"""
1. Fix a tensor contiguous issue in curobo 0.7.4
2. Speed up CudaRobotGenerator with a large number of spheres without collision checking
Maybe consider submitting a PR to curobo to fix the issue.
"""
from typing import Tuple
import torch
from curobo.cuda_robot_model.cuda_robot_generator import CudaRobotGenerator, CudaRobotGeneratorConfig
from curobo.cuda_robot_model.cuda_robot_model import CudaRobotModel, CudaRobotModelConfig
from curobo.curobolib.kinematics import KinematicsFusedFunction
from curobo.util.logger import log_error, log_info, log_warn
from torch import Tensor
[docs]
class CustomKinematicsFusedFunction(KinematicsFusedFunction):
@staticmethod
[docs]
def backward(ctx, grad_out_link_pos, grad_out_link_quat, grad_out_spheres):
return KinematicsFusedFunction.backward(
ctx,
grad_out_link_pos.contiguous(), # NOTE a fix here
grad_out_link_quat.contiguous(), # NOTE a fix here
grad_out_spheres,
)
[docs]
def custom_get_cuda_kinematics(
link_pos_seq,
link_quat_seq,
batch_robot_spheres,
global_cumul_mat,
q_in,
fixed_transform,
link_spheres_tensor,
link_map, # tells which link is attached to which link i
joint_map, # tells which joint is attached to a link i
joint_map_type, # joint type
store_link_map,
link_sphere_idx_map, # sphere idx map
link_chain_map,
joint_offset_map,
grad_out_q,
use_global_cumul: bool = True,
):
# if not q_in.is_contiguous():
# q_in = q_in.contiguous()
link_pos, link_quat, robot_spheres = CustomKinematicsFusedFunction.apply( # type: ignore
link_pos_seq,
link_quat_seq,
batch_robot_spheres,
global_cumul_mat,
q_in,
fixed_transform,
link_spheres_tensor,
link_map, # tells which link is attached to which link i
joint_map, # tells which joint is attached to a link i
joint_map_type, # joint type
store_link_map,
link_sphere_idx_map, # sphere idx map
link_chain_map,
joint_offset_map,
grad_out_q,
use_global_cumul,
)
return link_pos, link_quat, robot_spheres
[docs]
class CustomCudaRobotGenerator(CudaRobotGenerator):
def _create_self_collision_thread_data( # type: ignore
self, collision_threshold: torch.Tensor
) -> Tuple[torch.Tensor, int, bool, int]:
"""Create thread data for self collision checks.
Args:
collision_threshold: Collision distance between spheres of the robot. Used to
skip self collision checks when distance is -inf.
Returns:
Tuple[torch.Tensor, int, bool, int]: Thread location for self collision checks,
number of self collision checks, if thread calculation was successful,
and number of checks per thread.
"""
coll_cpu = collision_threshold.cpu()
max_checks_per_thread = 512
thread_loc = torch.zeros((2 * 32 * max_checks_per_thread), dtype=torch.int16) - 1
n_spheres = coll_cpu.shape[0]
sl_idx = 0
skip_count = 0
all_val = 0
valid_data = True
for i in range(n_spheres):
if not valid_data:
break
if torch.max(coll_cpu[i]) == -torch.inf:
log_info("skip" + str(i))
skip_count += n_spheres - i
all_val += n_spheres - i
continue # NOTE a speed up here
for j in range(i + 1, n_spheres):
if sl_idx > thread_loc.shape[0] - 1:
valid_data = False
log_warn(
"Self Collision checks are greater than "
+ str(32 * max_checks_per_thread)
+ ", using slower kernel"
)
break
if coll_cpu[i, j] != -torch.inf:
thread_loc[sl_idx] = i
sl_idx += 1
thread_loc[sl_idx] = j
sl_idx += 1
else:
skip_count += 1
all_val += 1
log_info("Self Collision threads, skipped %: " + str(100 * float(skip_count) / all_val))
log_info("Self Collision count: " + str(sl_idx / (2)))
log_info("Self Collision per thread: " + str(sl_idx / (2 * 1024)))
max_checks_per_thread = 512
val = sl_idx / (2 * 1024)
if val < 1:
max_checks_per_thread = 1
elif val < 2:
max_checks_per_thread = 2
elif val < 4:
max_checks_per_thread = 4
elif val < 8:
max_checks_per_thread = 8
elif val < 32:
max_checks_per_thread = 32
elif val < 64:
max_checks_per_thread = 64
elif val < 128:
max_checks_per_thread = 128
elif val < 512:
max_checks_per_thread = 512
else:
log_error(
"Self Collision not supported as checks are greater than 32 * 512, \
reduce number of spheres used to approximate the robot."
)
if max_checks_per_thread < 2:
max_checks_per_thread = 2
log_info("Self Collision using: " + str(max_checks_per_thread))
return (
thread_loc.to(device=collision_threshold.device),
sl_idx,
valid_data,
max_checks_per_thread,
)
[docs]
class CustomCudaRobotModelConfig(CudaRobotModelConfig):
@staticmethod
[docs]
def from_config(config: CudaRobotGeneratorConfig) -> CudaRobotModelConfig:
"""Create a robot model configuration from a generator configuration.
Args:
config: Input robot generator configuration.
Returns:
CudaRobotModelConfig: robot model configuration.
"""
# create a config generator and load all values
generator = CustomCudaRobotGenerator(config)
return CudaRobotModelConfig(
tensor_args=generator.tensor_args,
link_names=generator.link_names, # type: ignore
kinematics_config=generator.kinematics_config,
self_collision_config=generator.self_collision_config,
kinematics_parser=generator.kinematics_parser,
use_global_cumul=generator.use_global_cumul,
compute_jacobian=generator.compute_jacobian,
generator_config=config,
)
[docs]
class CustomCudaRobotModel(CudaRobotModel):
def _cuda_forward(self, q: torch.Tensor) -> Tuple[Tensor, Tensor, Tensor]:
link_pos, link_quat, robot_spheres = custom_get_cuda_kinematics(
self._link_pos_seq,
self._link_quat_seq,
self._batch_robot_spheres,
self._global_cumul_mat,
q,
self.kinematics_config.fixed_transforms,
self.kinematics_config.link_spheres,
self.kinematics_config.link_map, # tells which link is attached to which link i
self.kinematics_config.joint_map, # tells which joint is attached to a link i
self.kinematics_config.joint_map_type, # joint type
self.kinematics_config.store_link_map,
self.kinematics_config.link_sphere_idx_map, # sphere idx map
self.kinematics_config.link_chain_map,
self.kinematics_config.joint_offset_map,
self._grad_out_q,
self.use_global_cumul,
)
return link_pos, link_quat, robot_spheres