"""Collision Rate metric for robotics policy safety evaluation.
Collision Rate quantifies the frequency of collisions during task execution,
serving as a primary safety indicator. This metric is particularly critical
for mobile robots and humanoids operating in human environments where safety
is paramount.
Reference:
M. Hoy, A. S. Matveev, and A. V. Savkin, "Algorithms for collision-free
navigation of mobile robots in complex cluttered environments: a survey,"
Robotica, vol. 33, pp. 463–497, Mar. 2015.
"""
from typing import Any, Callable, Optional
import torch
from torch import Tensor
from robometric_frame.safety.base import BaseSafetyMetric
[docs]
class CollisionRate(BaseSafetyMetric):
r"""Compute Collision Rate for robotics policy safety evaluation.
Collision Rate is calculated as:
.. math::
\text{CR} = \frac{N_{\text{collisions}}}{T_{\text{steps}}}
where :math:`N_{\text{collisions}}` is the total number of collision occurrences and
:math:`T_{\text{steps}}` is the total number of trajectory steps.
This metric uses a user-defined distance function to compute distances
to obstacles, then applies a threshold to detect collisions. A collision
is detected when the distance is less than or equal to the threshold.
Args:
distance_fn: User-defined function that computes distances to obstacles.
Signature: distance_fn(trajectory: Tensor, environment: Any) -> Tensor
- trajectory: Shape (..., L, D) where L is trajectory length, D is spatial dims
- environment: User-defined environment representation (optional)
- Returns: Tensor of shape (..., L) with distances to nearest obstacle
at each trajectory point (positive values)
collision_threshold: Distance threshold for collision detection. Distances
less than or equal to this value are considered collisions. Default: 0.0
**kwargs: Additional keyword arguments passed to the base Metric class.
Example:
>>> from robometric_frame.safety import CollisionRate
>>> import torch
>>> # Define a distance function
>>> def simple_distance_fn(trajectory, environment=None):
... # Distance to walls at ±5
... x_coords = trajectory[..., 0]
... dist_to_walls = torch.minimum(
... torch.abs(x_coords - 5),
... torch.abs(x_coords + 5)
... )
... return dist_to_walls
>>> metric = CollisionRate(distance_fn=simple_distance_fn, collision_threshold=0.5)
>>> # Trajectory with some points close to walls
>>> trajectory = torch.tensor([[0.0, 0.0], [3.0, 0.0], [4.8, 0.0], [1.0, 0.0]])
>>> metric.update(trajectory)
>>> result = metric.compute()
>>> result['collision_rate'].item() # One point at distance 0.2 <= 0.5
0.25
Example (with environment):
>>> # Define distance function with environment obstacles
>>> def obstacle_distance_fn(trajectory, environment):
... # environment is a dict with obstacle positions and radii
... min_distances = torch.full(trajectory.shape[:-1], float('inf'))
... for obs_pos, obs_radius in zip(environment['positions'], environment['radii']):
... # Compute distance to obstacle surface
... distances = torch.norm(trajectory - obs_pos, dim=-1) - obs_radius
... min_distances = torch.minimum(min_distances, distances)
... return torch.clamp(min_distances, min=0.0) # Ensure non-negative
>>> environment = {
... 'positions': [torch.tensor([2.0, 2.0]), torch.tensor([5.0, 5.0])],
... 'radii': [0.5, 0.5]
... }
>>> metric = CollisionRate(distance_fn=obstacle_distance_fn)
>>> trajectory = torch.tensor([[0.0, 0.0], [2.0, 2.0], [3.0, 3.0], [4.0, 4.0]])
>>> metric.update(trajectory, environment=environment)
>>> result = metric.compute()
Example (batched):
>>> # Batch of trajectories
>>> metric = CollisionRate(distance_fn=simple_distance_fn, collision_threshold=0.5)
>>> trajectories = torch.tensor([
... [[0.0, 0.0], [1.0, 0.0], [2.0, 0.0]],
... [[4.7, 0.0], [4.9, 0.0], [5.0, 0.0]]
... ])
>>> metric.update(trajectories)
>>> result = metric.compute()
"""
# Metric states that persist across updates
full_state_update: bool = False
is_differentiable: bool = False
higher_is_better: bool = False
# Dynamically added by add_state() in __init__
total_collisions: Tensor
total_steps: Tensor
[docs]
def __init__(
self,
distance_fn: Callable[[Tensor, Any], Tensor],
collision_threshold: float = 0.0,
**kwargs: Any,
) -> None:
"""Initialize the CollisionRate metric."""
super().__init__(distance_fn=distance_fn, **kwargs)
if collision_threshold < 0:
raise ValueError(f"collision_threshold must be non-negative, got {collision_threshold}")
self.collision_threshold = collision_threshold
# Add metric states for distributed computation
self.add_state("total_collisions", default=torch.tensor(0), dist_reduce_fx="sum")
self.add_state("total_steps", default=torch.tensor(0), dist_reduce_fx="sum")
[docs]
def update( # pylint: disable=arguments-differ
self,
trajectory: Tensor,
environment: Optional[Any] = None,
) -> None:
"""Update metric state with trajectory and collision information.
Args:
trajectory: Trajectory tensor of shape (..., L, D) where:
- ... represents any number of batch dimensions (can be empty)
- L is the number of trajectory points
- D is the spatial dimensionality (typically 2 or 3)
Examples of valid shapes:
- (L, D): Single trajectory
- (B, L, D): Batch of B trajectories
- (B, T, L, D): Batch with time/episode dimension
environment: Optional environment representation passed to distance_fn.
Can be any type (dict, object, tensor, etc.) that the user's
distance function expects.
Raises:
ValueError: If trajectory has invalid shape or distances are negative.
RuntimeError: If distance_fn returns invalid shape or type.
Example:
>>> metric = CollisionRate(distance_fn=my_distance_fn)
>>> trajectory = torch.randn(10, 2) # 10 points in 2D
>>> metric.update(trajectory)
>>> # With environment
>>> metric.update(trajectory, environment={'obstacles': [...]})
"""
# Validate trajectory shape
self._validate_trajectory(trajectory)
# Compute distances using base class method
distances = self._compute_distances(trajectory, environment)
# Detect collisions based on threshold
collisions = self._detect_collisions(distances, self.collision_threshold)
# Count collisions and steps
num_collisions = collisions.sum().long()
num_steps = collisions.numel()
# Update states
self.total_collisions += num_collisions # pylint: disable=no-member
self.total_steps += num_steps # pylint: disable=no-member
[docs]
def compute(self) -> dict[str, Tensor]:
"""Compute collision rate statistics.
Returns:
Dictionary containing:
- 'collision_rate': Ratio of collision steps to total steps
- 'total_collisions': Total number of collision occurrences
- 'total_steps': Total number of trajectory steps
- 'collision_percentage': Collision rate as percentage (0-100)
Raises:
RuntimeError: If no trajectories have been recorded.
Example:
>>> metric = CollisionRate(distance_fn=my_distance_fn)
>>> metric.update(trajectory)
>>> result = metric.compute()
>>> print(f"Collision rate: {result['collision_rate'].item():.2%}")
>>> print(f"Total collisions: {result['total_collisions'].item()}")
"""
if self.total_steps == 0: # pylint: disable=no-member
raise RuntimeError(
"Cannot compute collision rate: no trajectories have been recorded. "
"Call update() with trajectory data before compute()."
)
collision_rate = self.total_collisions.float() / self.total_steps # pylint: disable=no-member
return {
"collision_rate": collision_rate,
"total_collisions": self.total_collisions.float(), # pylint: disable=no-member
"total_steps": self.total_steps.float(), # pylint: disable=no-member
"collision_percentage": collision_rate * 100,
}