Problems with tracking multiple people and attaching an obj to the chest
Hello, and thanks for your project. I have two problems: showing multiple objs on screen at the same time, and placing an obj on the image correctly relative to a person's chest. More than one obj never shows up at all. The obj does land near the chest, but the farther the person is from the camera, the farther the obj drifts away from the chest.
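What I am trying to do per detected person is roughly the following (a minimal sketch of the intent only, with hypothetical names; I take joint indices 2 and 5 to be the right and left shoulders in this output layout, and `cube_verts` is the cube template loaded from the .obj):

```python
# Intended behaviour (sketch, not the real code below): one cube per
# detected person, translated to that person's chest.
for i in range(num_people):                     # num_people = len(outputs['joints'])
    chest = (joints[i, 5] + joints[i, 2]) / 2   # midpoint of the two shoulder joints
    cube_i = cube_verts + chest                 # move the template cube to the chest
    # ... render cube_i together with person i's body mesh ...
```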
Here is my modified `romp/main.py`:

```python
import argparse
import os
import os.path as osp
import sys

import cv2
import numpy as np
import torch
from scipy.spatial.transform import Rotation
from torch import nn
from vis_human import setup_renderer, rendering_romp_bev_results

from .model import ROMPv1
from .post_parser import CenterMap, SMPL_parser, body_mesh_projection2image, parsing_outputs
from .utils import (transform_rot_representation, img_preprocess, create_OneEuroFilter, euclidean_distance,
                    check_filter_state, time_cost, download_model, determine_device, ResultSaver,
                    WebcamVideoStream, convert_cam_to_3d_trans, wait_func, collect_frame_path, progress_bar,
                    get_tracked_ids, smooth_results, convert_tensor2numpy, save_video_results)

def romp_settings(input_args=sys.argv[1:]):
    parser = argparse.ArgumentParser(description='ROMP: Monocular, One-stage, Regression of Multiple 3D People')
    parser.add_argument('-m', '--mode', type=str, default='image', help='Inference mode, including image, video, webcam')
    parser.add_argument('-i', '--input', type=str, default=None, help='Path to the input image / video')
    parser.add_argument('-o', '--save_path', type=str, default=osp.join(osp.expanduser("~"), 'ROMP_results'), help='Path to save the results')
    parser.add_argument('--GPU', type=int, default=0, help='The gpu device number to run the inference on. If GPU=-1, run in cpu mode')
    parser.add_argument('--onnx', action='store_true', help='Whether to use ONNX for acceleration.')
    parser.add_argument('-t', '--temporal_optimize', action='store_true', help='Whether to use the OneEuro filter to smooth the results')
    parser.add_argument('--center_thresh', type=float, default=0.25, help='The confidence threshold of positive detection in the 2D human body center heatmap.')
    parser.add_argument('--show_largest', action='store_true', help='Whether to show the largest person only')
    parser.add_argument('-sc', '--smooth_coeff', type=float, default=3., help='The smoothness coeff of the OneEuro filter; the smaller, the smoother.')
    parser.add_argument('--calc_smpl', action='store_false', help='Whether to calculate the smpl mesh from estimated SMPL parameters')
    parser.add_argument('--render_mesh', action='store_true', help='Whether to render the estimated 3D mesh to the image')
    parser.add_argument('--renderer', type=str, default='sim3dr', help='Choose the renderer for visualization: pyrender (great but slow), sim3dr (fine but fast)')
    parser.add_argument('--show', action='store_true', help='Whether to show the rendered results')
    parser.add_argument('--show_items', type=str, default='mesh', help='The items to visualize, including mesh, pj2d, j3d, mesh_bird_view, mesh_side_view, center_conf; split with ","')
    parser.add_argument('--save_video', action='store_true', help='Whether to save the video results')
    parser.add_argument('--frame_rate', type=int, default=24, help='The frame_rate of saved video results')
    parser.add_argument('--smpl_path', type=str, default=osp.join(osp.expanduser("~"), '.romp', 'SMPL_NEUTRAL.pth'), help='The path of the smpl model file')
    parser.add_argument('--obj_path', type=str, default=osp.join(osp.expanduser("~"), '.romp', 'cube3d.obj'), help='The path of the .obj model file to attach to the body')
    parser.add_argument('--mtl_path', type=str, default=osp.join(osp.expanduser("~"), '.romp', 'mtl_path.mtl'), help='The path of the .mtl material file for the attached obj')
    parser.add_argument('--model_path', type=str, default=osp.join(osp.expanduser("~"), '.romp', 'ROMP.pkl'), help='The path of the ROMP checkpoint')
    parser.add_argument('--model_onnx_path', type=str, default=osp.join(osp.expanduser("~"), '.romp', 'ROMP.onnx'), help='The path of the ROMP onnx checkpoint')
    parser.add_argument('--root_align', type=bool, default=False, help='Set this config to True to use ROMP checkpoints trained by yourself.')
    parser.add_argument('--webcam_id', type=int, default=0, help='The webcam ID.')
    args = parser.parse_args(input_args)
    if not torch.cuda.is_available():
        args.GPU = -1
        args.temporal_optimize = False
    if args.show:
        args.render_mesh = True
    if args.render_mesh or args.show_largest:
        args.calc_smpl = True
    if not os.path.exists(args.smpl_path):
        if os.path.exists(args.smpl_path.replace('SMPL_NEUTRAL.pth', 'smpl_packed_info.pth')):
            args.smpl_path = args.smpl_path.replace('SMPL_NEUTRAL.pth', 'smpl_packed_info.pth')
        print('please prepare SMPL model files following instructions at https://github.com/Arthur151/ROMP/blob/master/simple_romp/README.md#installation')
    if not os.path.exists(args.model_path):
        romp_url = 'https://github.com/Arthur151/ROMP/releases/download/V2.0/ROMP.pkl'
        download_model(romp_url, args.model_path, 'ROMP')
    if not os.path.exists(args.model_onnx_path) and args.onnx:
        romp_onnx_url = 'https://github.com/Arthur151/ROMP/releases/download/V2.0/ROMP.onnx'
        download_model(romp_onnx_url, args.model_onnx_path, 'ROMP')
    return args
default_settings = romp_settings(input_args=[])

class ROMP(nn.Module):
    def __init__(self, romp_settings):
        super(ROMP, self).__init__()
        self.settings = romp_settings
        self.tdevice = determine_device(self.settings.GPU)
        self._build_model_()
        self._initilization_()
        # My additions: load the cube mesh and its material once at start-up.
        self.verts, self.faces = create_mesh_from_obj(obj_path=self.settings.obj_path)
        self.materials = load_mtl(mtl_path=self.settings.mtl_path)
    def _build_model_(self):
        if not self.settings.onnx:
            model = ROMPv1().eval()
            model.load_state_dict(torch.load(self.settings.model_path, map_location=self.tdevice))
            model = model.to(self.tdevice)
            self.model = nn.DataParallel(model)
        else:
            try:
                import onnxruntime
            except ImportError:
                print('To use the onnx model, the onnxruntime python package is needed. Please install it yourself if this fails!')
                if not torch.cuda.is_available():
                    os.system('pip install onnxruntime')
                else:
                    os.system('pip install onnxruntime-gpu')
                import onnxruntime
            print('creating onnx model')
            self.ort_session = onnxruntime.InferenceSession(self.settings.model_onnx_path,
                                                            providers=['TensorrtExecutionProvider',
                                                                       'CUDAExecutionProvider',
                                                                       'CPUExecutionProvider'])
            print('created!')
    def _initilization_(self):
        self.centermap_parser = CenterMap(conf_thresh=self.settings.center_thresh)
        if self.settings.calc_smpl:
            self.smpl_parser = SMPL_parser(self.settings.smpl_path).to(self.tdevice)
        if self.settings.temporal_optimize:
            self._initialize_optimization_tools_()
        if self.settings.render_mesh:
            self.visualize_items = self.settings.show_items.split(',')
            self.renderer = setup_renderer(name=self.settings.renderer)
    def single_image_forward(self, image):
        input_image, image_pad_info = img_preprocess(image)
        if self.settings.onnx:
            center_maps, params_maps = self.ort_session.run(None, {'image': input_image.numpy().astype(np.float32)})
            center_maps = torch.from_numpy(center_maps).to(self.tdevice)
            params_maps = torch.from_numpy(params_maps).to(self.tdevice)
        else:
            center_maps, params_maps = self.model(input_image.to(self.tdevice))
        params_maps[:, 0] = torch.pow(1.1, params_maps[:, 0])
        parsed_results = parsing_outputs(center_maps, params_maps, self.centermap_parser)
        return parsed_results, image_pad_info
    def _initialize_optimization_tools_(self):
        self.OE_filters = {}
        if not self.settings.show_largest:
            try:
                from norfair import Tracker
            except ImportError:
                print('To perform temporal optimization, installing norfair for tracking.')
                os.system('pip install norfair')
                from norfair import Tracker
            self.tracker = Tracker(distance_function=euclidean_distance, distance_threshold=200)  # 120
            self.tracker_initialized = False
    def temporal_optimization(self, outputs, signal_ID):
        check_filter_state(self.OE_filters, signal_ID, self.settings.show_largest, self.settings.smooth_coeff)
        if self.settings.show_largest:
            max_id = torch.argmax(outputs['cam'][:, 0])
            outputs['smpl_thetas'], outputs['smpl_betas'], outputs['cam'] = smooth_results(
                self.OE_filters[signal_ID],
                outputs['smpl_thetas'][max_id], outputs['smpl_betas'][max_id], outputs['cam'][max_id])
            outputs['smpl_thetas'], outputs['smpl_betas'], outputs['cam'] = \
                outputs['smpl_thetas'].unsqueeze(0), outputs['smpl_betas'].unsqueeze(0), outputs['cam'].unsqueeze(0)
        else:
            pred_cams = outputs['cam']
            from norfair import Detection
            detections = [Detection(points=cam[[2, 1]] * 512) for cam in pred_cams.cpu().numpy()]
            if not self.tracker_initialized:
                for _ in range(8):
                    tracked_objects = self.tracker.update(detections=detections)
            tracked_objects = self.tracker.update(detections=detections)
            if len(tracked_objects) == 0:
                return outputs
            tracked_ids = get_tracked_ids(detections, tracked_objects)
            for ind, tid in enumerate(tracked_ids):
                if tid not in self.OE_filters[signal_ID]:
                    self.OE_filters[signal_ID][tid] = create_OneEuroFilter(self.settings.smooth_coeff)
                outputs['smpl_thetas'][ind], outputs['smpl_betas'][ind], outputs['cam'][ind] = smooth_results(
                    self.OE_filters[signal_ID][tid],
                    outputs['smpl_thetas'][ind], outputs['smpl_betas'][ind], outputs['cam'][ind])
            outputs['track_ids'] = np.array(tracked_ids).astype(np.int32)
        return outputs
    def get_chest_position_and_orientation(self, outputs):
        # Chest position = middle point between the left and right shoulders
        shoulder_left = outputs['joints'][0, 5]
        shoulder_right = outputs['joints'][0, 2]
        chest_position = (shoulder_left + shoulder_right) / 2
        return chest_position
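        # NOTE: outputs['joints'][0, ...] always indexes person 0, so even with
        # several people detected, every cube is computed from the first person's
        # chest. I suspect this is related to why only one obj ever shows up.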
    def attach_obj_to_chest(self, outputs, obj_verts, obj_faces):
        chest_position = self.get_chest_position_and_orientation(outputs)
        obj_verts_transformed = obj_verts + chest_position
        # Replace the original vertices with the transformed ones
        outputs['verts'] = obj_verts_transformed
        outputs['faces'] = obj_faces
        return outputs
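        # NOTE: this overwrites outputs['verts'] / outputs['faces'] for the whole
        # batch with a single cube, so the SMPL body meshes are dropped and the
        # outputs never contain more than one object to render.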
    def combine_rendered_results(self, outputs_list):
        combined_outputs = {}
        # Combine the parameters by concatenating / stacking along the batch dim
        for key in outputs_list[0].keys():
            if isinstance(outputs_list[0][key], torch.Tensor):
                combined_outputs[key] = torch.cat([outputs[key] for outputs in outputs_list], dim=0)
            else:
                combined_outputs[key] = np.stack([outputs[key] for outputs in outputs_list], axis=0)
        return combined_outputs
    @time_cost('ROMP')
    def forward(self, image, signal_ID=0, **kwargs):
        outputs, image_pad_info = self.single_image_forward(image)
        if outputs is None:
            return None
        if self.settings.temporal_optimize:
            outputs = self.temporal_optimization(outputs, signal_ID)
        outputs['cam_trans'] = convert_cam_to_3d_trans(outputs['cam'])
        if self.settings.calc_smpl:
            outputs = self.smpl_parser(outputs, root_align=self.settings.root_align)
            outputs.update(body_mesh_projection2image(outputs['joints'], outputs['cam'], vertices=outputs['verts'],
                                                      input2org_offsets=image_pad_info))
        if self.settings.render_mesh:
            rendering_cfgs = {'mesh_color': 'identity', 'items': self.visualize_items,
                              'renderer': self.settings.renderer}
            rendered_results = []
            for i in range(len(outputs['cam_trans'])):
                # Swap the body mesh for the cube (always at person 0's chest, see above).
                outputs = self.attach_obj_to_chest(outputs, self.verts, self.faces)
                cam_trans = outputs['cam_trans'][i].detach().cpu().numpy()
                # Hand-tuned, depth-dependent screen offsets; these constants are
                # probably why the cube drifts off the chest at larger distances.
                depth_scaling_factor_x = 0.05  # scaling factor for the X-axis
                depth_scaling_factor_y = 0.05  # scaling factor for the Y-axis
                depth = cam_trans[2]
                offset_x = depth_scaling_factor_x * depth + 0.75
                offset_y = depth_scaling_factor_y * depth - 0.1
                cam_trans[0] += offset_x
                cam_trans[1] += offset_y
                smpl_thetas = outputs['smpl_thetas'][i, :3]
                smpl_thetas *= -1
                rot = transform_rot_representation(smpl_thetas.detach().cpu().numpy(), input_type='vec', out_type='mat')
                # Dividing the rotation diagonal by depth shrinks the cube with distance.
                scaled_rot = rot.copy()
                scaled_rot[0, 0] /= depth
                scaled_rot[1, 1] /= depth
                scaled_rot[2, 2] /= depth
                scaled_rot = torch.from_numpy(scaled_rot).float()
                outputs['smpl_face'] = outputs['faces']
                outputs['verts_camed_org'] = (torch.matmul(outputs['verts'], scaled_rot) + cam_trans)[None].repeat(
                    len(outputs['verts_camed_org']), 1, 1) * 1000
                # NOTE: the same `outputs` dict is appended on every iteration, so the
                # list ends up holding N references to one object in its final state.
                rendered_results.append(outputs)
            outputs = self.combine_rendered_results(rendered_results)
            print("pre rendering outputs %s" % outputs)  # debug print
            outputs = rendering_romp_bev_results(self.renderer, outputs, image, rendering_cfgs)
            print("outputs %s" % outputs)  # debug print
        if self.settings.show:
            cv2.imshow('rendered', outputs['rendered_image'])
            wait_func(self.settings.mode)
        return convert_tensor2numpy(outputs)

def create_mesh_from_obj(obj_path):
    # Minimal Wavefront OBJ loader: reads vertex ('v') and face ('f') records
    # and fan-triangulates polygonal faces.
    with open(obj_path, 'r') as f:
        lines = f.readlines()
    verts = []
    faces = []
    for line in lines:
        if line.startswith('v '):
            vertex = list(map(float, line.split()[1:]))
            verts.append(vertex)
        elif line.startswith('f '):
            face = line.split()[1:]
            # Fan-triangulation: triangle (0, i, i+1) for each i; OBJ indices are 1-based.
            for i in range(1, len(face) - 1):
                vertex_idx1 = int(face[0].split('/')[0]) - 1
                vertex_idx2 = int(face[i].split('/')[0]) - 1
                vertex_idx3 = int(face[i + 1].split('/')[0]) - 1
                faces.append((vertex_idx1, vertex_idx2, vertex_idx3))
    verts = torch.from_numpy(np.array(verts, dtype=np.float32))
    faces = torch.from_numpy(np.array(faces, dtype=np.int32))
    return verts, faces
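# NOTE: only 'v' and 'f' records are read; 'vt'/'vn' and any usemtl binding are
# ignored, so this loader keeps geometry only.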

def load_mtl(mtl_path):
    materials = {}
    with open(mtl_path, 'r') as file:
        lines = file.readlines()
    for line in lines:
        components = line.strip().split()
        if not components:
            continue
        if components[0] == 'newmtl':
            material_name = components[1]
            materials[material_name] = {}
        elif components[0] in ['Ka', 'Kd', 'Ks']:
            # Ka, Kd and Ks are the ambient, diffuse and specular color coefficients,
            # each followed by three floats representing red, green and blue
            assert len(components[1:]) == 3
            materials[material_name][components[0]] = list(map(float, components[1:]))
    return materials
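# NOTE: the parsed materials are stored as self.materials in __init__ but, as far
# as I can tell, they never reach the renderer.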

def main():
    args = romp_settings()
    romp = ROMP(args)
    if args.mode == 'image':
        saver = ResultSaver(args.mode, args.save_path)
        image = cv2.imread(args.input)
        outputs = romp(image)
        saver(outputs, args.input)
    if args.mode == 'video':
        frame_paths, video_save_path = collect_frame_path(args.input, args.save_path)
        saver = ResultSaver(args.mode, args.save_path)
        for frame_path in progress_bar(frame_paths):
            image = cv2.imread(frame_path)
            outputs = romp(image)
            saver(outputs, frame_path)
        save_video_results(saver.frame_save_paths)
        if args.save_video:
            saver.save_video(video_save_path, frame_rate=args.frame_rate)
    if args.mode == 'webcam':
        cap = WebcamVideoStream(args.webcam_id)
        cap.start()
        while True:
            frame = cap.read()
            outputs = romp(frame)
        cap.stop()


if __name__ == '__main__':
    main()
```
@Arthur151 Can you help, please?