Preface
2025-04-23: I wanted to add noise to a GT point cloud and randomly rotate it, then extract three axes from the PCA directions and rotate the cloud back into the same pose. This sometimes worked and sometimes failed; the code is here for future reference: https://paste.org.cn/lYYK49ZwIn
In the end, SVD decomposition solved it...
1. Extracting Principal Directions from a Point Cloud with PCA
Mainly used for point-cloud alignment. Input: a point cloud; output: three orthogonal directions.
import math
import numpy as np
from sklearn.decomposition import PCA

def get_principle_dirs(pts):
    pts_pca = PCA(n_components=3)
    pts_pca.fit(pts)
    # Rows of components_ are the principal directions; normalize each row.
    principle_dirs = pts_pca.components_
    principle_dirs /= np.linalg.norm(principle_dirs, axis=1, keepdims=True)
    return principle_dirs
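As a quick sanity check (the toy data below is mine, not from the original snippet), the returned matrix should contain one unit direction per row, and the rows should be orthonormal:

pts = np.random.randn(1000, 3) * np.array([5.0, 2.0, 0.5])  # anisotropic test cloud
dirs = get_principle_dirs(pts)
print(dirs.shape)                             # (3, 3): one principal direction per row
print(np.allclose(dirs @ dirs.T, np.eye(3)))  # True: the rows are orthonormal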
2. Aligning a Point Cloud to the Canonical Frame
Input: a point cloud.
Output: the transformed point cloud and the inverse rotation matrix.
def pca_alignment(pts, random_flag=False):
    pca_dirs = get_principle_dirs(pts)  # principal directions (eigenvectors)
    if random_flag:
        # Randomly flip the sign of all three directions together.
        pca_dirs *= np.random.choice([-1, 1], 1)
    # Rotate the third principal direction onto the z-axis ...
    rotate_1 = compute_rotation_matrix(pca_dirs[2], [0, 0, 1], pca_dirs[1])
    pca_dirs = np.array(rotate_1 * pca_dirs.T).T
    # ... then the second one onto the x-axis.
    rotate_2 = compute_rotation_matrix(pca_dirs[1], [1, 0, 0], pca_dirs[2])
    pts = np.array(rotate_2 * rotate_1 * np.matrix(pts.T)).T
    inv_rotation = np.array(np.linalg.inv(rotate_2 * rotate_1))  # inverse rotation matrix
    return pts, inv_rotation
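A round-trip sketch (reusing the test cloud `pts` from above): applying `inv_rotation` to the aligned cloud should restore the original orientation exactly:

aligned_pts, inv_rot = pca_alignment(pts)
restored = (inv_rot @ aligned_pts.T).T
print(np.allclose(restored, pts))  # True: the inverse rotation undoes the alignment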
3. Alignment Rotations
Two more helpers are needed here: one computes the rotation matrix from `sour_vec` to `dest_vec` (used to build the alignment transform), and the other rotates about a given axis by `theta` radians.
def compute_rotation_matrix(sour_vec, dest_vec, sour_vertical_vec=None):
    # http://immersivemath.com/forum/question/rotation-matrix-from-one-vector-to-another/
    # Degenerate case: the two vectors are parallel or anti-parallel.
    if np.linalg.norm(np.cross(sour_vec, dest_vec), 2) == 0 or np.abs(np.dot(sour_vec, dest_vec)) >= 1.0:
        if np.dot(sour_vec, dest_vec) < 0:
            # Anti-parallel: rotate by pi around a vector perpendicular to sour_vec.
            return rotation_matrix(sour_vertical_vec, np.pi)
        return np.identity(3)
    # Rodrigues' formula: rotate by alpha around the unit axis a.
    alpha = np.arccos(np.dot(sour_vec, dest_vec))
    a = np.cross(sour_vec, dest_vec) / np.linalg.norm(np.cross(sour_vec, dest_vec), 2)
    c = np.cos(alpha)
    s = np.sin(alpha)
    R1 = [a[0] * a[0] * (1.0 - c) + c,
          a[0] * a[1] * (1.0 - c) - s * a[2],
          a[0] * a[2] * (1.0 - c) + s * a[1]]
    R2 = [a[0] * a[1] * (1.0 - c) + s * a[2],
          a[1] * a[1] * (1.0 - c) + c,
          a[1] * a[2] * (1.0 - c) - s * a[0]]
    R3 = [a[0] * a[2] * (1.0 - c) - s * a[1],
          a[1] * a[2] * (1.0 - c) + s * a[0],
          a[2] * a[2] * (1.0 - c) + c]
    R = np.matrix([R1, R2, R3])
    return R
def rotation_matrix(axis, theta):
    # Return the rotation matrix associated with counterclockwise rotation
    # about the given axis by theta radians (Euler-Rodrigues / quaternion form).
    axis = np.asarray(axis)
    axis = axis / math.sqrt(np.dot(axis, axis))
    a = math.cos(theta / 2.0)
    b, c, d = -axis * math.sin(theta / 2.0)
    aa, bb, cc, dd = a * a, b * b, c * c, d * d
    bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
    return np.matrix(np.array([[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
                               [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
                               [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc]]))
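Two quick checks with my own test vectors (not from the original post): the first helper should map `sour_vec` exactly onto `dest_vec`, and `rotation_matrix` should rotate x onto y counterclockwise about z:

sour = np.array([1.0, 0.0, 0.0])
dest = np.array([0.0, 1.0, 0.0])
R = np.asarray(compute_rotation_matrix(sour, dest))
print(np.allclose(R @ sour, dest))  # True

Rz = np.asarray(rotation_matrix([0, 0, 1], np.pi / 2))
print(np.allclose(Rz @ sour, dest))  # True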
4. SVD-Based Rigid Point-Cloud Alignment (Procrustes)
In point-cloud registration we often need to rigidly align a source point cloud onto a target point cloud, so that after rotation and translation the two coincide. Below is the classic, exact Procrustes (a.k.a. SVD) alignment method.
Suppose we have two point clouds:

- GT (ground truth, the target) $\mathbf{X} = \{\,\mathbf{x}_i\in\mathbb{R}^3\}_{i=1}^N$,
- Noisy (the source) $\mathbf{Y} = \{\,\mathbf{y}_i\in\mathbb{R}^3\}_{i=1}^N$,

whose points are in one-to-one correspondence.

1. Compute the centroids:
$$\mathbf{\bar x} = \frac{1}{N}\sum_{i=1}^N \mathbf{x}_i,\qquad \mathbf{\bar y} = \frac{1}{N}\sum_{i=1}^N \mathbf{y}_i.$$

2. Center the points (stacking the centered points as rows gives $N \times 3$ matrices):
$$\mathbf{\tilde X} = [\,\mathbf{x}_1 - \mathbf{\bar x},\ \mathbf{x}_2 - \mathbf{\bar x},\ \dots\,],\qquad \mathbf{\tilde Y} = [\,\mathbf{y}_1 - \mathbf{\bar y},\ \mathbf{y}_2 - \mathbf{\bar y},\ \dots\,].$$

3. Build the covariance matrix:
$$\mathbf{H} = \mathbf{\tilde Y}^\top \,\mathbf{\tilde X} \qquad (3\times 3).$$

4. Take the SVD:
$$\mathbf{H} = \mathbf{U}\,\boldsymbol\Sigma\,\mathbf{V}^\top.$$

5. Compute the rotation matrix:
$$\mathbf{R} = \mathbf{V}\,\mathbf{U}^\top.$$
If $\det(\mathbf{R}) < 0$, a reflection component has appeared; negate the last column of $\mathbf{V}$ and recompute:
$$\mathbf{V}[:,2]\;\leftarrow\;-\,\mathbf{V}[:,2],\qquad \mathbf{R} = \mathbf{V}\,\mathbf{U}^\top.$$

6. Compute the translation vector:
$$\mathbf{t} = \mathbf{\bar x} - \mathbf{R}\,\mathbf{\bar y}.$$

7. Apply the alignment: each source point $\mathbf{y}_i$ receives the rigid transform
$$\mathbf{y}_i' = \mathbf{R}\,\mathbf{y}_i + \mathbf{t},\qquad i = 1,\dots,N.$$
Python code example
import numpy as np

def align_via_svd(src_pts, dst_pts):
    # 1. Centroids
    c_src = src_pts.mean(axis=0)
    c_dst = dst_pts.mean(axis=0)
    # 2. Center the points
    S = src_pts - c_src
    D = dst_pts - c_dst
    # 3. Covariance matrix
    H = S.T @ D
    # 4. SVD
    U, _, Vt = np.linalg.svd(H)
    # 5. Rotation matrix (fix reflections so that det(R) = +1)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:
        Vt[2, :] *= -1
        R = Vt.T @ U.T
    # 6. Translation vector
    t = c_dst - R @ c_src
    # 7. Apply the rigid transform
    aligned = (R @ src_pts.T).T + t
    return aligned, R, t
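A usage sketch with synthetic data (my own; it assumes exact one-to-one correspondences, as the derivation above does): build a random rigid transform, apply it to the target, and check that `align_via_svd` recovers the alignment:

rng = np.random.default_rng(0)
dst = rng.standard_normal((500, 3))               # target cloud

Q, _ = np.linalg.qr(rng.standard_normal((3, 3)))  # random orthogonal matrix
if np.linalg.det(Q) < 0:
    Q[:, 0] *= -1                                 # force a proper rotation
t_true = rng.standard_normal(3)
src = (Q @ dst.T).T + t_true                      # source = rigidly moved target

aligned, R, t = align_via_svd(src, dst)
print(np.allclose(aligned, dst))                  # True: exact recovery (noise-free)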
Visualization code: https://paste.org.cn/iPiXMuBnoF
sample_sdf.py — a data-preparation script that converts ShapeNet / Objaverse meshes into SDF training samples: it voxelizes each mesh to an SDF grid (mesh2sdf on CPU or torchcumesh2sdf on CUDA), samples surface points with normals, and samples octree-based SDF values with gradients.
import trimesh
import mesh2sdf
import os
import sys
import ocnn
import numpy as np
import torch
import diso
import mcubes
import traceback
from functools import partial
import torchcumesh2sdf
import multiprocessing as mp
import objaverse.xl as oxl
from tqdm import tqdm
import pandas as pd
import glob
import argparse
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.utils import scale_to_unit_cube
parser = argparse.ArgumentParser()
parser.add_argument('--dataset', default="ShapeNet", type=str)
parser.add_argument('--mode', default="cpu", type=str)
parser.add_argument('--debug', action='store_true')
parser.add_argument('--batch_size', type=int, default=256)
parser.add_argument('--mesh_scale', type=float, default=0.8)
# parser.add_argument('--device_list', type=str, default="0,1,2,3")
parser.add_argument('--size', type=int, default=256)
parser.add_argument('--level', type=float, default=1/256)
parser.add_argument('--band', type=float, default=0.05)
parser.add_argument('--num_samples', type=int, default=200000)
parser.add_argument('--num_processes', type=int, default=32)
parser.add_argument('--depth', type=int, default=8)
parser.add_argument('--full_depth', type=int, default=4)
args = parser.parse_args()
args.size = 2 ** args.depth
args.level = 1 / args.size
if args.debug:
    args.num_processes = 1
else:
    args.num_processes = 32
def check_folder(filenames: list):
    for filename in filenames:
        folder = os.path.dirname(filename)
        if not os.path.exists(folder):
            os.makedirs(folder)
def sample_pts(mesh, filename_pts):
    points, idx = trimesh.sample.sample_surface(mesh, args.num_samples)
    normals = mesh.face_normals[idx]
    np.savez(filename_pts, points=points.astype(np.float16),
             normals=normals.astype(np.float16))
    return {'points': points, 'normals': normals}
def sample_sdf(sdf, filename_out, pts=None):
    # constants
    depth, full_depth = args.depth, args.full_depth
    # Number of samples in each octree node: as described in the paper,
    # sample 4 points per node and compute their corresponding SDF values.
    sample_num = 4
    grid = np.array([[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1],
                     [1, 0, 0], [1, 0, 1], [1, 1, 0], [1, 1, 1]])
    grid = torch.tensor(grid, device=sdf.device)

    points = pts['points'].astype(np.float32)
    normals = pts['normals'].astype(np.float32)

    # Build the octree. The input point cloud is continuous in [-1, 1],
    # i.e. independent of the voxel resolution.
    points = ocnn.octree.Points(torch.from_numpy(
        points), torch.from_numpy(normals)).to(sdf.device)
    octree = ocnn.octree.Octree(depth=depth, full_depth=full_depth)
    octree.build_octree(points)

    # sample points and grads according to the xyz
    xyzs, grads, sdfs, valids = [], [], [], []
    for d in range(full_depth, depth + 1):
        xyzb = octree.xyzb(d)
        x, y, z, b = xyzb
        xyz = torch.stack((x, y, z), dim=1).float()

        # sample k points in each octree node
        xyz = xyz.unsqueeze(
            1) + torch.rand(xyz.shape[0], sample_num, 3, device=sdf.device)
        xyz = xyz.view(-1, 3)  # (N, 3)

        # Normalize to [0, 2^depth]: scale the coordinates up to
        # [0, args.size], where args.size is the SDF sampling resolution.
        xyz = xyz * (args.size / (2 ** d))

        # remove out-of-bound points
        xyz = xyz[(xyz < (args.size - 1)).all(dim=1)]
        xyzs.append(xyz)

        # interpolate the sdf values (trilinear, over the 8 cell corners)
        xyzi = torch.floor(xyz)               # the integer part (N, 3)
        corners = xyzi.unsqueeze(1) + grid    # (N, 8, 3)
        coordsf = xyz.unsqueeze(1) - corners  # (N, 8, 3), in [-1.0, 1.0]
        weights = (1 - coordsf.abs()).prod(dim=-1)  # (N, 8)
        corners = corners.long().view(-1, 3)
        x, y, z = corners[:, 0], corners[:, 1], corners[:, 2]
        s = sdf[x, y, z].view(-1, 8)
        sw = torch.sum(s * weights, dim=1)
        sdfs.append(sw)

        # test if sdf is in range
        valid = (s.abs() <= args.band).all(dim=1)
        valids.append(valid)

        # calc the gradient via finite differences across the cell corners
        gx = s[:, 4] - s[:, 0] + s[:, 5] - s[:, 1] + \
             s[:, 6] - s[:, 2] + s[:, 7] - s[:, 3]  # noqa
        gy = s[:, 2] - s[:, 0] + s[:, 3] - s[:, 1] + \
             s[:, 6] - s[:, 4] + s[:, 7] - s[:, 5]  # noqa
        gz = s[:, 1] - s[:, 0] + s[:, 3] - s[:, 2] + \
             s[:, 5] - s[:, 4] + s[:, 7] - s[:, 6]  # noqa
        grad = torch.stack([gx, gy, gz], dim=-1)
        norm = torch.sqrt(torch.sum(grad ** 2, dim=-1, keepdim=True))
        grad = grad / (norm + 1.0e-8)
        grads.append(grad)

    # concat the results
    xyzs = torch.cat(xyzs, dim=0)
    points = (xyzs / (args.size / 2) - 1)
    grads = torch.cat(grads, dim=0)
    sdfs = torch.cat(sdfs, dim=0)
    valids = torch.cat(valids, dim=0)

    # handle invalid points: clamp them instead of removing them
    if args.mode == "cuda":
        # points = points[valids]
        # grads = grads[valids]
        # sdfs = sdfs[valids]
        sdfs[~valids] = (sdfs[~valids] > 0).float() * args.band
        grads[~valids] = 0.0

    # save results
    random_idx = torch.randperm(points.shape[0])[:min(400000, points.shape[0])]
    points = points[random_idx].cpu().numpy().astype(np.float16)
    grads = grads[random_idx].cpu().numpy().astype(np.float16)
    sdfs = sdfs[random_idx].cpu().numpy().astype(np.float16)
    np.savez(filename_out, points=points, grad=grads, sdf=sdfs)

    # visualize
    if args.debug:
        valids = valids[random_idx].cpu().numpy()
        surf_points = points[valids] - sdfs[valids].reshape(-1, 1) * grads[valids]
        pointcloud = trimesh.PointCloud(surf_points)
        pointcloud.export("mytools/sdf_surf.ply")
        pointcloud = trimesh.PointCloud(points)
        pointcloud.export("mytools/sdf.ply")
def get_sdf(mesh, filename_obj):
    vertices = mesh.vertices
    # run mesh2sdf
    voxel_sdf, mesh_new = mesh2sdf.compute(
        vertices, mesh.faces, args.size, fix=True, level=args.level, return_mesh=True)
    if args.debug:
        mesh_new.export(filename_obj)
    return mesh_new, voxel_sdf
def get_sdf_cu(mesh, device, filename_obj):
    tris = np.array(mesh.triangles, dtype=np.float32, subok=False)
    tris = torch.tensor(tris, dtype=torch.float32).to(device)
    tris = (tris + 1.0) / 2.0
    voxel_sdf = torchcumesh2sdf.get_sdf(tris, args.size, args.band, B=args.batch_size)
    if args.debug:
        vertices, faces = diso.DiffMC().to(device).forward(voxel_sdf, isovalue=args.level)
        mcubes.export_obj(vertices.cpu().numpy(),
                          faces.cpu().numpy(), filename_obj)
    torchcumesh2sdf.free_cached_memory()
    torch.cuda.empty_cache()
    return mesh, voxel_sdf
def process(index, filenames, load_paths, save_paths):
    filename = filenames[index]
    load_path = load_paths[index]
    save_path = save_paths[index]
    if not os.path.exists(load_path):
        print(f"{filename} does not exist")
        return

    filename_input = load_path
    filename_obj = os.path.join(save_path, "mesh.obj")
    filename_pointcloud = os.path.join(save_path, 'pointcloud.npz')
    filename_sdf = os.path.join(save_path, 'sdf.npz')
    if os.path.exists(filename_sdf) and os.path.exists(filename_pointcloud):
        print(f"{filename} already exists")
        return

    try:
        mesh = trimesh.load(filename_input, force='mesh')
        mesh = scale_to_unit_cube(mesh)
        mesh.vertices *= args.mesh_scale
    except Exception:
        # os.remove(filename_input)
        print(f"Trimesh load mesh {filename} error")
        return

    check_folder([filename_obj, filename_pointcloud, filename_sdf])
    # device = torch.device(f'cuda:{device_list[index % len(device_list)]}')
    device = torch.device('cuda:0')
    if args.mode == "cuda":
        mesh, voxel_sdf = get_sdf_cu(mesh, device, filename_obj)
    elif args.mode == "cpu":
        mesh, voxel_sdf = get_sdf(mesh, filename_obj)
        voxel_sdf = torch.tensor(voxel_sdf)
    pointcloud = sample_pts(mesh, filename_pointcloud)
    sample_sdf(voxel_sdf, filename_sdf, pointcloud)
    print(f"Mesh {index}/{len(filenames)} {filename} done")
def get_filelist(filelist_path):
    with open(filelist_path, 'r') as f:
        lines = f.readlines()
    lines = [line.strip() for line in lines]
    return lines
def get_shapenet_path():
    load_path = 'data/ShapeNet/ShapeNetCore.v1'
    save_path = 'data/ShapeNet/datasets_256_test'
    existing_files = set(glob.glob(f"{save_path}/*/*.npz"))
    lines = get_filelist('data/ShapeNet/filelist/train_im_5.txt') + \
        get_filelist("data/ShapeNet/filelist/test_im_5.txt")
    load_paths, save_paths, filenames = [], [], []
    for line in lines:
        filename = line
        if not isinstance(filename, str):
            continue
        if os.path.join(save_path, f"{filename}", "sdf.npz") in existing_files and \
                os.path.join(save_path, f"{filename}", "pointcloud.npz") in existing_files:
            continue
        filenames.append(filename)
        load_paths.append(os.path.join(load_path, line, "model.obj"))
        save_paths.append(os.path.join(save_path, f"{filename}"))
    return filenames, load_paths, save_paths
def get_objaverse_path():
    load_path = 'data/Objaverse/raw/glbs'
    save_path = 'data/Objaverse/datasets_512'
    existing_files = set(glob.glob(f"{save_path}/*/*.npz"))
    filelist_path = 'data/Objaverse/filelist/objaverse_5w.txt'
    with open(filelist_path, 'r') as f:
        lines = f.readlines()
    lines = [line.strip() for line in lines]
    load_paths, save_paths, filenames = [], [], []
    for line in lines:
        filename = line
        if not isinstance(filename, str):
            continue
        if os.path.join(save_path, f"{filename}", "sdf.npz") in existing_files and \
                os.path.join(save_path, f"{filename}", "pointcloud.npz") in existing_files:
            continue
        filenames.append(filename)
        load_paths.append(os.path.join(load_path, line + ".glb"))
        save_paths.append(os.path.join(save_path, f"{filename}"))
    return filenames, load_paths, save_paths
if __name__ == "__main__":
    if args.dataset == "ShapeNet":
        filenames, load_paths, save_paths = get_shapenet_path()
    elif args.dataset == "Objaverse":
        filenames, load_paths, save_paths = get_objaverse_path()
    indices = list(range(len(filenames)))
    if args.num_processes > 1:
        func = partial(process, filenames=filenames, load_paths=load_paths, save_paths=save_paths)
        with mp.Pool(processes=args.num_processes) as pool:
            list(tqdm(pool.imap_unordered(func, indices), total=len(filenames)))
    else:
        for i in range(len(filenames)):
            process(i, filenames, load_paths, save_paths)