import os

# OpenCV's EXR codec is disabled by default; this flag must be set
# before cv2 is imported so the EXR depth maps below can be written.
os.environ["OPENCV_IO_ENABLE_OPENEXR"] = "1"

import argparse
import glob
import json
import sys

import cv2
import numpy as np
import torch
from PIL import Image
from scipy.spatial.transform import Rotation

# Import the bundled vggt package from its subdirectory, restoring the
# original working directory once the imports are done.
original_cwd = os.getcwd()
vggt_dir = os.path.join(original_cwd, 'vggt')
try:
    os.chdir(vggt_dir)
    if vggt_dir not in sys.path:
        sys.path.insert(0, vggt_dir)

    from vggt.models.vggt import VGGT
    from vggt.utils.load_fn import load_and_preprocess_images
    from vggt.utils.pose_enc import pose_encoding_to_extri_intri
    from vggt.utils.geometry import unproject_depth_map_to_point_map
finally:
    os.chdir(original_cwd)

def process_images_with_vggt(info, image_names, model, device):
    """
    Run VGGT on a batch of preprocessed images and post-process the outputs.

    Args:
        info (tuple): (original images list, original width, original height)
        image_names (list): Paths to the input images
        model: VGGT model instance
        device: PyTorch device (CPU/GPU)

    Returns:
        tuple: (predictions dict, image_names list)
    """
    original_images, original_width, original_height = info

    images = load_and_preprocess_images(image_names).to(device)

    with torch.no_grad():
        if device.type == "cuda":
            # bfloat16 on Ampere (compute capability >= 8) and newer GPUs,
            # float16 on older ones.
            dtype = torch.bfloat16 if torch.cuda.get_device_capability()[0] >= 8 else torch.float16
            with torch.cuda.amp.autocast(dtype=dtype):
                predictions = model(images)
        else:
            # No autocast on CPU; querying the CUDA device capability would fail here.
            predictions = model(images)

    # Decode the pose encoding into extrinsic and intrinsic camera matrices.
    extrinsic, intrinsic = pose_encoding_to_extri_intri(predictions["pose_enc"], images.shape[-2:])
    predictions["extrinsic"] = extrinsic
    predictions["intrinsic"] = intrinsic

    # Move all tensors to NumPy and drop the batch dimension.
    for key in predictions.keys():
        if isinstance(predictions[key], torch.Tensor):
            predictions[key] = predictions[key].cpu().numpy().squeeze(0)

    # Unproject the predicted depth maps into world-space point maps.
    depth_map = predictions["depth"]
    world_points = unproject_depth_map_to_point_map(depth_map, predictions["extrinsic"], predictions["intrinsic"])
    predictions["world_points_from_depth"] = world_points

    predictions["original_images"] = original_images

    # Resize the original images to the prediction resolution and normalize to [0, 1].
    S, H, W = world_points.shape[:3]
    normalized_images = np.zeros((S, H, W, 3), dtype=np.float32)
    for i, img in enumerate(original_images):
        resized_img = cv2.resize(img, (W, H))
        normalized_images[i] = resized_img / 255.0

    predictions["images"] = normalized_images
    predictions["original_width"] = original_width
    predictions["original_height"] = original_height

    return predictions, image_names
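
# For reference, after post-processing the predictions dict holds, for S
# frames at the model's working resolution H x W (shapes inferred from how
# the fields are indexed in this file; treat as a guide, not a spec):
#   "depth":                   (S, H, W, 1) depth maps
#   "extrinsic":               (S, 3, 4) world-to-camera matrices
#   "intrinsic":               (S, 3, 3) camera matrices
#   "world_points_from_depth": (S, H, W, 3) unprojected 3D points
#   "images":                  (S, H, W, 3) RGB normalized to [0, 1]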


def process_images(image_dir, model, device):
    """
    Load images from a directory and process them with the VGGT model to
    extract pose, depth, and camera parameters.

    Args:
        image_dir (str): Directory containing input images
        model: VGGT model instance
        device: PyTorch device (CPU/GPU)

    Returns:
        tuple: (predictions dict, image_names list)
    """
    image_names = glob.glob(os.path.join(image_dir, "*"))
    image_names = sorted([f for f in image_names if f.lower().endswith(('.png', '.jpg', '.jpeg'))])

    # Cap the number of frames to keep memory usage bounded.
    if len(image_names) > 400:
        image_names = image_names[:400]

    if len(image_names) == 0:
        raise ValueError(f"No images found in {image_dir}")

    # Record the original resolution; VGGT runs at its own preprocessed
    # resolution, and downstream code rescales the intrinsics back.
    first_image = Image.open(image_names[0])
    original_width, original_height = first_image.size

    original_images = []
    for img_path in image_names:
        img = Image.open(img_path).convert('RGB')
        original_images.append(np.array(img))

    return process_images_with_vggt((original_images, original_width, original_height), image_names, model, device)


def extrinsic_to_colmap_format(extrinsics):
    """
    Convert extrinsic matrices from VGGT format to COLMAP format.

    Both are world-to-camera transformations; VGGT outputs them as (R|t)
    matrices, while COLMAP stores a quaternion in [w, x, y, z] order plus
    a translation vector.

    Args:
        extrinsics (np.ndarray): Extrinsic matrices in shape (N, 3, 4)

    Returns:
        tuple: (quaternions array, translations array)
    """
    num_cameras = extrinsics.shape[0]
    quaternions = []
    translations = []

    for i in range(num_cameras):
        R = extrinsics[i, :3, :3]
        t = extrinsics[i, :3, 3]

        # scipy returns quaternions in [x, y, z, w] order; reorder to the
        # [w, x, y, z] convention COLMAP expects.
        rot = Rotation.from_matrix(R)
        quat = rot.as_quat()
        quat = np.array([quat[3], quat[0], quat[1], quat[2]])

        quaternions.append(quat)
        translations.append(t)

    return np.array(quaternions), np.array(translations)


def ToR(q):
    """
    Convert a unit quaternion to a rotation matrix.

    Args:
        q (np.ndarray): Unit quaternion in [w, x, y, z] format

    Returns:
        np.ndarray: 3x3 rotation matrix
    """
    return np.eye(3) + 2 * np.array((
        (-q[2] * q[2] - q[3] * q[3],
          q[1] * q[2] - q[3] * q[0],
          q[1] * q[3] + q[2] * q[0]),
        ( q[1] * q[2] + q[3] * q[0],
         -q[1] * q[1] - q[3] * q[3],
          q[2] * q[3] - q[1] * q[0]),
        ( q[1] * q[3] - q[2] * q[0],
          q[2] * q[3] + q[1] * q[0],
         -q[1] * q[1] - q[2] * q[2])))
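

# Illustrative sanity check (not called anywhere in the pipeline; the helper
# name is ours): converting a rotation to COLMAP format and back through ToR
# should recover the original matrix.
def _check_quaternion_roundtrip():
    R = Rotation.from_euler('xyz', [0.1, -0.2, 0.3]).as_matrix()
    extrinsic = np.concatenate([R, np.zeros((3, 1))], axis=1)[None]  # (1, 3, 4)
    quats, _ = extrinsic_to_colmap_format(extrinsic)
    assert np.allclose(ToR(quats[0]), R, atol=1e-6)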


def main(image_dir, output_dir):
    """
    Process images with VGGT and save the results in COLMAP format.

    Args:
        image_dir (str): Directory containing input images
        output_dir (str): Directory to save output files
    """
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'depth'), exist_ok=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = VGGT.from_pretrained("facebook/VGGT-1B").to(device)
    model.eval()

    predictions, image_names = process_images(image_dir, model, device)

    quaternions, translations = extrinsic_to_colmap_format(predictions["extrinsic"])

    save_dict = {}

    depth_maps = predictions["depth"]
    intrinsics = predictions["intrinsic"]
    height, width = depth_maps.shape[1:3]
    ori_height, ori_width = predictions["original_height"], predictions["original_width"]

    # Scale factors from the prediction resolution back to the original resolution.
    s_height, s_width = ori_height / height, ori_width / width

    for i, (image_name, depth, intrinsic, quaternion, translation) in \
            enumerate(zip(image_names, depth_maps, intrinsics, quaternions, translations)):
        # Rebuild the 4x4 world-to-camera matrix from quaternion + translation.
        qw, qx, qy, qz = quaternion
        rot = ToR(np.array([qw, qx, qy, qz]))
        trans = translation.reshape(3, 1)
        bottom = np.array([[0, 0, 0, 1]])
        w2c = np.concatenate([np.concatenate([rot, trans], 1), bottom], axis=0)

        # Rescale the intrinsics (in place) to the original image resolution.
        intrinsic[0, :] = intrinsic[0, :] * s_width
        intrinsic[1, :] = intrinsic[1, :] * s_height

        # Write the depth map as a float32 EXR; this needs the OpenEXR support
        # enabled via OPENCV_IO_ENABLE_OPENEXR at the top of this file.
        cv2.imwrite(os.path.join(output_dir, 'depth', f"frame_{(i+1):05d}.exr"), depth,
                    [cv2.IMWRITE_EXR_TYPE, cv2.IMWRITE_EXR_TYPE_FLOAT])

        save_dict[f"{(i+1):03d}"] = {
            'image_path': image_name,
            'depth_path': os.path.join(output_dir, 'depth', f"frame_{(i+1):05d}.exr"),
            'intrinsic': intrinsic.tolist(),
            'w2c': w2c.tolist()
        }

    with open(os.path.join(output_dir, "colmap_data.json"), "w") as f:
        json.dump(save_dict, f, indent=2, sort_keys=True)
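

# Illustrative consumer of the output files (not part of the pipeline; the
# helper name is ours). Assumes OpenCV was built with OpenEXR support, as
# the writer above requires.
def _load_colmap_data(output_dir):
    with open(os.path.join(output_dir, "colmap_data.json")) as f:
        data = json.load(f)
    for frame_id in sorted(data):
        entry = data[frame_id]
        depth = cv2.imread(entry['depth_path'], cv2.IMREAD_UNCHANGED)
        intrinsic = np.array(entry['intrinsic'])
        w2c = np.array(entry['w2c'])
        yield frame_id, depth, intrinsic, w2c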


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run VGGT data engine.")
    parser.add_argument('--image_dir', type=str, required=True, help='Path to input images directory')
    parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory')
    args = parser.parse_args()
    main(args.image_dir, args.output_dir)