[Continuously Updated] Point Cloud Utils

April 19, 2025, 3:08 PM · Research

Preface

20250423: I wanted to add noise to a GT point cloud and rotate it randomly, then extract three axes via the PCA directions and finally rotate the cloud back to its original pose. It sometimes worked and sometimes failed, presumably because PCA axes are only defined up to sign, so the recovered frame can flip. Marking the code here: https://paste.org.cn/lYYK49ZwIn
In the end I solved it with an SVD decomposition (section 4 below)...

1. Extracting a Point Cloud's Principal Directions with PCA

Mainly used for point cloud alignment. Input: a point cloud; output: three orthogonal directions.

import math
import numpy as np
from sklearn.decomposition import PCA

def get_principle_dirs(pts):
    pts_pca = PCA(n_components=3)
    pts_pca.fit(pts)
    principle_dirs = pts_pca.components_  # each row is a principal direction, sorted by explained variance
    principle_dirs /= np.linalg.norm(principle_dirs, 2, axis=0)

    return principle_dirs
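
A minimal usage sketch (my own illustration, not from the post): the first principal axis of an elongated Gaussian cloud should land near its long axis.

# Synthetic elongated cloud with its long axis along x.
pts = np.random.randn(2000, 3) * np.array([5.0, 1.0, 0.2])
dirs = get_principle_dirs(pts)
print(dirs.shape)  # (3, 3), one direction per row
print(dirs[0])     # approximately ±[1, 0, 0] (the sign is arbitrary)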

2. Aligning a Point Cloud to the Canonical Frame

Input: a point cloud

Output: the transformed point cloud and the inverse rotation matrix

def pca_alignment(pts, random_flag=False):

    pca_dirs = get_principle_dirs(pts)  # principal directions (eigenvectors)

    if random_flag:
        pca_dirs *= np.random.choice([-1, 1], 1)

    # First rotate the third principal axis onto z, then the second onto x.
    rotate_1 = compute_roatation_matrix(pca_dirs[2], [0, 0, 1], pca_dirs[1])
    pca_dirs = np.array(rotate_1 * pca_dirs.T).T
    rotate_2 = compute_roatation_matrix(pca_dirs[1], [1, 0, 0], pca_dirs[2])
    pts = np.array(rotate_2 * rotate_1 * np.matrix(pts.T)).T

    inv_rotation = np.array(np.linalg.inv(rotate_2 * rotate_1))  # inverse rotation matrix

    return pts, inv_rotation
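
A quick round-trip check (again my own sketch, reusing the synthetic pts above): align the cloud, then map it back with the returned inverse rotation.

# Align to the canonical frame, then undo the rotation.
aligned, inv_rot = pca_alignment(pts)
restored = (inv_rot @ aligned.T).T
print(np.abs(restored - pts).max())  # ~0 up to floating-point error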

3. Alignment Rotations

Two more helpers are needed here: one computes the rotation matrix taking sour_vec to dest_vec (used to build the alignment transform above), and one rotates around a given axis by theta radians.

def compute_roatation_matrix(sour_vec, dest_vec, sour_vertical_vec=None):
    # http://immersivemath.com/forum/question/rotation-matrix-from-one-vector-to-another/
    # Degenerate case: sour_vec and dest_vec are parallel or anti-parallel.
    if np.linalg.norm(np.cross(sour_vec, dest_vec), 2) == 0 or np.abs(np.dot(sour_vec, dest_vec)) >= 1.0:
        if np.dot(sour_vec, dest_vec) < 0:
            return rotation_matrix(sour_vertical_vec, np.pi)
        return np.identity(3)
    alpha = np.arccos(np.dot(sour_vec, dest_vec))
    a = np.cross(sour_vec, dest_vec) / np.linalg.norm(np.cross(sour_vec, dest_vec), 2)
    c = np.cos(alpha)
    s = np.sin(alpha)
    R1 = [a[0] * a[0] * (1.0 - c) + c,
          a[0] * a[1] * (1.0 - c) - s * a[2],
          a[0] * a[2] * (1.0 - c) + s * a[1]]

    R2 = [a[0] * a[1] * (1.0 - c) + s * a[2],
          a[1] * a[1] * (1.0 - c) + c,
          a[1] * a[2] * (1.0 - c) - s * a[0]]

    R3 = [a[0] * a[2] * (1.0 - c) - s * a[1],
          a[1] * a[2] * (1.0 - c) + s * a[0],
          a[2] * a[2] * (1.0 - c) + c]

    R = np.matrix([R1, R2, R3])

    return R

def rotation_matrix(axis, theta):

    # Return the rotation matrix associated with counterclockwise rotation about the given axis by theta radians.

    axis = np.asarray(axis)
    axis = axis / math.sqrt(np.dot(axis, axis))
    a = math.cos(theta / 2.0)
    b, c, d = -axis * math.sin(theta / 2.0)
    aa, bb, cc, dd = a * a, b * b, c * c, d * d
    bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
    return np.matrix(np.array([[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
                     [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
                     [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc]]))
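
A small sanity check (my own, not from the post): the matrix returned for z → x should indeed rotate the z-axis onto the x-axis.

R = compute_roatation_matrix(np.array([0.0, 0.0, 1.0]), np.array([1.0, 0.0, 0.0]))
print(np.asarray(R) @ np.array([0.0, 0.0, 1.0]))  # ~[1, 0, 0]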

4. Rigid Point Cloud Alignment via SVD (Procrustes Analysis)

In point cloud registration we often need to rigidly align a source point cloud to a target point cloud, so that after rotation and translation the two coincide. Below is the classic, exact Procrustes decomposition (also known as SVD alignment) method.


Suppose we have two point clouds:

  • GT (ground truth, the target), written \mathbf{X} = \{\,\mathbf{x}_i\in\mathbb{R}^3\}_{i=1}^N
  • Noisy (the source), written \mathbf{Y} = \{\,\mathbf{y}_i\in\mathbb{R}^3\}_{i=1}^N

The two clouds are in one-to-one point correspondence.

  1. Compute the centroids

     \mathbf{\bar x} = \frac{1}{N}\sum_{i=1}^N \mathbf{x}_i,\quad
     \mathbf{\bar y} = \frac{1}{N}\sum_{i=1}^N \mathbf{y}_i.

  2. Center the points

     \mathbf{\tilde X} = [\,\mathbf{x}_1 - \mathbf{\bar x},\ \mathbf{x}_2 - \mathbf{\bar x},\ \dots\,],\quad
     \mathbf{\tilde Y} = [\,\mathbf{y}_1 - \mathbf{\bar y},\ \mathbf{y}_2 - \mathbf{\bar y},\ \dots\,].

  3. Build the covariance matrix

     \mathbf{H} = \mathbf{\tilde Y}^\top \,\mathbf{\tilde X}\quad \bigl(3\times 3\bigr).

  4. Compute the SVD

     \mathbf{H} = \mathbf{U}\,\boldsymbol\Sigma\,\mathbf{V}^\top.

  5. Compute the rotation matrix

     \mathbf{R} = \mathbf{V}\,\mathbf{U}^\top.

     If \det(\mathbf{R})<0, a reflection component has appeared; negate the last column of \mathbf{V} and recompute:

     \mathbf{V}[:,2]\;\leftarrow\;-\,\mathbf{V}[:,2],\quad
     \mathbf{R} \;=\; \mathbf{V}\,\mathbf{U}^\top.

  6. Compute the translation vector

     \mathbf{t} = \mathbf{\bar x} - \mathbf{R}\,\mathbf{\bar y}.

  7. Apply the alignment
     Apply the rigid transform to every point \mathbf{y}_i of the source cloud:

     \mathbf{y}_i' = \mathbf{R}\,\mathbf{y}_i + \mathbf{t},
     \quad i = 1,\dots,N.

Python code example

import numpy as np

def align_via_svd(src_pts, dst_pts):
    # 1. centroids
    c_src = src_pts.mean(axis=0)
    c_dst = dst_pts.mean(axis=0)

    # 2. center the points
    S = src_pts - c_src
    D = dst_pts - c_dst

    # 3. covariance matrix
    H = S.T @ D

    # 4. SVD
    U, _, Vt = np.linalg.svd(H)

    # 5. rotation matrix (flip the last column of V if a reflection appears)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:
        Vt[2, :] *= -1
        R = Vt.T @ U.T

    # 6. translation vector
    t = c_dst - R @ c_src

    # 7. apply the transform
    aligned = (R @ src_pts.T).T + t
    return aligned, R, t
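
An end-to-end check in the spirit of the preface (a sketch of mine; the noise level 0.01 is arbitrary, and rotation_matrix is the helper from section 3): randomly rotate and perturb a GT cloud, then recover the pose with align_via_svd.

# GT -> random rotation + Gaussian noise -> align back to GT.
gt = np.random.randn(2000, 3)
R_true = np.asarray(rotation_matrix([1.0, 2.0, 3.0], 0.7))
noisy = gt @ R_true.T + np.random.randn(2000, 3) * 0.01
aligned, R_est, t_est = align_via_svd(noisy, gt)
print(np.abs(aligned - gt).max())  # small residual: pose recovered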

Visualization code: https://paste.org.cn/iPiXMuBnoF

sample_sdf.py

A preprocessing script that converts ShapeNet / Objaverse meshes into surface point-cloud samples and octree-based SDF samples.

import trimesh
import mesh2sdf
import os
import sys
import ocnn
import numpy as np
import torch
import diso
import mcubes
import traceback
from functools import partial
import torchcumesh2sdf
import multiprocessing as mp
import objaverse.xl as oxl
from tqdm import tqdm
import pandas as pd
import glob
import argparse
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.utils import scale_to_unit_cube

parser = argparse.ArgumentParser()
parser.add_argument('--dataset', default="ShapeNet", type=str)
parser.add_argument('--mode', default="cpu", type=str)
parser.add_argument('--debug', action='store_true')
parser.add_argument('--batch_size', type=int, default=256)
parser.add_argument('--mesh_scale', type=float, default=0.8)
# parser.add_argument('--device_list', type=str, default="0,1,2,3")
parser.add_argument('--size', type=int, default=256)
parser.add_argument('--level', type=float, default=1/256)
parser.add_argument('--band', type=float, default=0.05)
parser.add_argument('--num_samples', type=int, default=200000)
parser.add_argument('--num_processes', type=int, default=32)
parser.add_argument('--depth', type=int, default=8)
parser.add_argument('--full_depth', type=int, default=4)
args = parser.parse_args()

args.size = 2 ** args.depth
args.level = 1 / args.size
if args.debug:
  args.num_processes = 1
else:
  args.num_processes = 32

def check_folder(filenames: list):
  for filename in filenames:
    folder = os.path.dirname(filename)
    if not os.path.exists(folder):
      os.makedirs(folder)

def sample_pts(mesh, filename_pts):
  points, idx = trimesh.sample.sample_surface(mesh, args.num_samples)
  normals = mesh.face_normals[idx]
  np.savez(filename_pts, points=points.astype(np.float16),
          normals=normals.astype(np.float16))
  return {'points': points, 'normals': normals}

def sample_sdf(sdf, filename_out, pts=None):
  # constants
  depth, full_depth = args.depth, args.full_depth
  sample_num = 4  # number of samples in each octree node: as described in the paper, sample 4 points per node and compute their SDF values
  grid = np.array([[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1],
                  [1, 0, 0], [1, 0, 1], [1, 1, 0], [1, 1, 1]])
  grid = torch.tensor(grid, device=sdf.device)

  points = pts['points'].astype(np.float32)
  normals = pts['normals'].astype(np.float32)

  # build octree; the input points here are continuous in [-1, 1], i.e., unaffected by the grid resolution
  points = ocnn.octree.Points(torch.from_numpy(
      points), torch.from_numpy(normals)).to(sdf.device)
  octree = ocnn.octree.Octree(depth=depth, full_depth=full_depth)
  octree.build_octree(points)

  # sample points and grads according to the xyz
  xyzs, grads, sdfs, valids = [], [], [], []
  for d in range(full_depth, depth + 1):
    xyzb = octree.xyzb(d)
    x, y, z, b = xyzb
    xyz = torch.stack((x, y, z), dim=1).float()

    # sample k points in each octree node
    xyz = xyz.unsqueeze(
        1) + torch.rand(xyz.shape[0], sample_num, 3, device=sdf.device)
    xyz = xyz.view(-1, 3)                  # (N, 3)
    # normalize to [0, 2^sdf_depth], i.e., scale coordinates up to [0, args.size], the SDF sampling resolution
    xyz = xyz * (args.size / (2 ** d))
    # remove out-of-bound points
    xyz = xyz[(xyz < (args.size - 1)).all(dim=1)]
    xyzs.append(xyz)

    # interpolate the sdf values
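    # Each sample reads its 8 surrounding grid corners; a corner's weight is
    # the product of (1 - |offset|) over the three axes, so the 8 weights
    # sum to 1 (standard trilinear interpolation).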
    xyzi = torch.floor(xyz)                # the integer part (N, 3)
    corners = xyzi.unsqueeze(1) + grid     # (N, 8, 3)
    coordsf = xyz.unsqueeze(1) - corners   # (N, 8, 3), in [-1.0, 1.0]
    weights = (1 - coordsf.abs()).prod(dim=-1)  # (N, 8)
    corners = corners.long().view(-1, 3)
    x, y, z = corners[:, 0], corners[:, 1], corners[:, 2]
    s = sdf[x, y, z].view(-1, 8)
    sw = torch.sum(s * weights, dim=1)
    sdfs.append(sw)

    # test if sdf is in range
    valid = (s.abs() <= args.band).all(dim=1)
    valids.append(valid)

    # calc the gradient
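    # Finite-difference gradient from the 8 corner values: for each axis,
    # sum the differences across the four corner pairs along that axis,
    # then normalize to a unit direction.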
    gx = s[:, 4] - s[:, 0] + s[:, 5] - s[:, 1] + \
        s[:, 6] - s[:, 2] + s[:, 7] - s[:, 3]  # noqa
    gy = s[:, 2] - s[:, 0] + s[:, 3] - s[:, 1] + \
        s[:, 6] - s[:, 4] + s[:, 7] - s[:, 5]  # noqa
    gz = s[:, 1] - s[:, 0] + s[:, 3] - s[:, 2] + \
        s[:, 5] - s[:, 4] + s[:, 7] - s[:, 6]  # noqa
    grad = torch.stack([gx, gy, gz], dim=-1)
    norm = torch.sqrt(torch.sum(grad ** 2, dim=-1, keepdim=True))
    grad = grad / (norm + 1.0e-8)
    grads.append(grad)

  # concat the results
  xyzs = torch.cat(xyzs, dim=0)
  points = (xyzs / (args.size/2) - 1)
  grads = torch.cat(grads, dim=0)
  sdfs = torch.cat(sdfs, dim=0)
  valids = torch.cat(valids, dim=0)

  # fix invalid (out-of-band) samples instead of removing them
  if args.mode == "cuda":
    # points = points[valids]
    # grads = grads[valids]
    # sdfs = sdfs[valids]
    sdfs[~valids] = (sdfs[~valids] > 0).float() * args.band
    grads[~valids] = 0.0

  # save results
  random_idx = torch.randperm(points.shape[0])[:min(400000, points.shape[0])]
  points = points[random_idx].cpu().numpy().astype(np.float16)
  grads = grads[random_idx].cpu().numpy().astype(np.float16)
  sdfs = sdfs[random_idx].cpu().numpy().astype(np.float16)
  np.savez(filename_out, points=points, grad=grads, sdf=sdfs)

  # visualize
  if args.debug:
    valids = valids[random_idx].cpu().numpy()
    surf_points = points[valids] - sdfs[valids].reshape(-1, 1) * grads[valids]
    pointcloud = trimesh.PointCloud(surf_points)
    pointcloud.export("mytools/sdf_surf.ply")
    pointcloud = trimesh.PointCloud(points)
    pointcloud.export("mytools/sdf.ply")

def get_sdf(mesh, filename_obj):
  vertices = mesh.vertices
  # run mesh2sdf
  voxel_sdf, mesh_new = mesh2sdf.compute(
      vertices, mesh.faces, args.size, fix=True, level=args.level, return_mesh=True)
  if args.debug:
    mesh_new.export(filename_obj)
  return mesh_new, voxel_sdf

def get_sdf_cu(mesh, device, filename_obj):
  tris = np.array(mesh.triangles, dtype=np.float32, subok=False)
  tris = torch.tensor(tris, dtype=torch.float32).to(device)
  tris = (tris + 1.0) / 2.0
  voxel_sdf = torchcumesh2sdf.get_sdf(tris, args.size, args.band, B=args.batch_size)
  if args.debug:
    vertices, faces = diso.DiffMC().to(device).forward(voxel_sdf, isovalue=args.level)
    mcubes.export_obj(vertices.cpu().numpy(),
                      faces.cpu().numpy(), filename_obj)
  torchcumesh2sdf.free_cached_memory()
  torch.cuda.empty_cache()
  return mesh, voxel_sdf

def process(index, filenames, load_paths, save_paths):
  filename = filenames[index]
  load_path = load_paths[index]
  save_path = save_paths[index]
  if not os.path.exists(load_path):
    print(f"{filename} not exists")
    return

  filename_input = load_path
  filename_obj = os.path.join(save_path, "mesh.obj")
  filename_pointcloud = os.path.join(save_path, 'pointcloud.npz')
  filename_sdf = os.path.join(save_path, 'sdf.npz')
  if os.path.exists(filename_sdf) and os.path.exists(filename_pointcloud):
    print(f"{filename} already exists")
    return

  try:
    mesh = trimesh.load(filename_input, force='mesh')
    mesh = scale_to_unit_cube(mesh)
    mesh.vertices *= args.mesh_scale
  except Exception:
    # os.remove(filename_input)
    print(f"Trimesh load mesh {filename} error")
    return

  check_folder([filename_obj, filename_pointcloud, filename_sdf])
  # device = torch.device(f'cuda:{device_list[index % len(device_list)]}')
  device = torch.device('cuda:0')
  if args.mode == "cuda":
    mesh, voxel_sdf = get_sdf_cu(mesh, device, filename_obj)
  elif args.mode == "cpu":
    mesh, voxel_sdf = get_sdf(mesh, filename_obj)
    voxel_sdf = torch.tensor(voxel_sdf)
  pointcloud = sample_pts(mesh, filename_pointcloud)
  sample_sdf(voxel_sdf, filename_sdf, pointcloud)
  print(f"Mesh {index}/{len(filenames)} {filename} done")

def get_filelist(filelist_path):
  with open(filelist_path, 'r') as f:
    lines = f.readlines()
  lines = [line.strip() for line in lines]
  return lines

def get_shapenet_path():
  load_path = f'data/ShapeNet/ShapeNetCore.v1'
  save_path = f'data/ShapeNet/datasets_256_test'
  existing_files = set(glob.glob(f"{save_path}/*/*.npz"))
  lines = get_filelist('data/ShapeNet/filelist/train_im_5.txt') + get_filelist("data/ShapeNet/filelist/test_im_5.txt")

  load_paths, save_paths, filenames = [], [], []
  for line in lines:
    filename = line
    if not isinstance(filename, str):
      continue
    if os.path.join(save_path, f"{filename}", "sdf.npz") in existing_files and \
       os.path.join(save_path, f"{filename}", "pointcloud.npz") in existing_files:
      continue
    filenames.append(filename)
    load_paths.append(os.path.join(load_path, line, "model.obj"))
    save_paths.append(os.path.join(save_path, f"{filename}"))
  return filenames, load_paths, save_paths

def get_objaverse_path():
  load_path = f'data/Objaverse/raw/glbs'
  save_path = f'data/Objaverse/datasets_512'
  existing_files = set(glob.glob(f"{save_path}/*/*.npz"))
  filelist_path = f'data/Objaverse/filelist/objaverse_5w.txt'
  with open(filelist_path, 'r') as f:
    lines = f.readlines()
  lines = [line.strip() for line in lines]
  load_paths, save_paths, filenames = [], [], []
  for line in lines:
    filename = line
    if not isinstance(filename, str):
      continue
    if os.path.join(save_path, f"{filename}", "sdf.npz") in existing_files and \
       os.path.join(save_path, f"{filename}", "pointcloud.npz") in existing_files:
      continue
    filenames.append(filename)
    load_paths.append(os.path.join(load_path, line + ".glb"))
    save_paths.append(os.path.join(save_path, f"{filename}"))
  return filenames, load_paths, save_paths

if __name__ == "__main__":
  if args.dataset == "ShapeNet":
    filenames, load_paths, save_paths = get_shapenet_path()
  elif args.dataset == "Objaverse":
    filenames, load_paths, save_paths = get_objaverse_path()
  indices = list(range(len(filenames)))
  if args.num_processes > 1:
    func = partial(process, filenames=filenames, load_paths=load_paths, save_paths=save_paths)
    with mp.Pool(processes=args.num_processes) as pool:
      list(tqdm(pool.imap_unordered(func, indices), total=len(filenames)))
  else:
    for i in range(len(filenames)):
      process(i, filenames, load_paths, save_paths)
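
A typical invocation (flags as defined by the argparse block above; paths are the ones hard-coded in get_shapenet_path / get_objaverse_path):

python sample_sdf.py --dataset ShapeNet --mode cpu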
