Source code for robometric_frame.trajectory_quality.path_smoothness

"""Path Smoothness metric for robotics policy trajectory evaluation.

Path Smoothness evaluates the rate of change in trajectory direction, detecting
oscillations that may arise from velocity changes or directional adjustments.

Reference:
    M. Dobiš, M. Dekan, P. Beňo, F. Duchoň, and A. Babinec, "Evaluation criteria
    for trajectories of robotic arms," Robotics, vol. 11, p. 29, Feb. 2022.

    S. Guillén Ruiz, L. V. Calderita, A. Hidalgo-Paniagua, and J. P. Bandera Rubio,
    "Measuring smoothness as a factor for efficient and socially accepted robot
    motion," Sensors (Basel), vol. 20, p. 6822, Nov. 2020.
"""

from typing import Any

import torch
from torch import Tensor
from torchmetrics import Metric


[docs] class PathSmoothness(Metric): r"""Compute Path Smoothness for robotics policy trajectory evaluation. Path Smoothness is calculated as: PS = (1/PL) * Σ(i=1 to L-2) \|(p_{i+2} - p_{i+1}) - (p_{i+1} - p_i)\|_2 where p_i are trajectory points in D-dimensional space, L is the length of the trajectory, and PL is the path length. This metric measures the rate of change in trajectory direction, with lower values indicating smoother paths. The metric calculates the difference between consecutive displacement vectors, effectively measuring the second derivative (acceleration) of the path. It is normalized by the total path length to make it scale-invariant. This metric accumulates smoothness values across multiple trajectories and returns the average path smoothness when compute() is called. Args: **kwargs: Additional keyword arguments passed to the base Metric class. Example: >>> from robometric_frame.trajectory_quality import PathSmoothness >>> import torch >>> metric = PathSmoothness() >>> # Smooth straight line (perfect smoothness = 0) >>> trajectory = torch.tensor([ ... [0.0, 0.0], ... [1.0, 0.0], ... [2.0, 0.0], ... [3.0, 0.0] ... ]) >>> metric.update(trajectory) >>> metric.compute() tensor(0.0000) Example (with direction change): >>> # Path with a turn (higher smoothness value) >>> metric = PathSmoothness() >>> trajectory = torch.tensor([ ... [0.0, 0.0], ... [1.0, 0.0], ... [2.0, 0.0], ... [2.0, 1.0] ... ]) >>> metric.update(trajectory) >>> result = metric.compute() >>> result > 0 # Non-zero smoothness due to direction change tensor(True) Example (batched): >>> # Batch of trajectories - shape (B, L, D) >>> metric = PathSmoothness() >>> batch = torch.tensor([ ... [[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [3.0, 0.0]], # smooth ... [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [2.0, 1.0]] # has turn ... ]) >>> metric.update(batch) >>> result = metric.compute() # Average smoothness Example (3D trajectories): >>> # 3D trajectory >>> metric = PathSmoothness() >>> trajectory_3d = torch.tensor([ ... [0.0, 0.0, 0.0], ... [1.0, 0.0, 0.0], ... [2.0, 0.0, 0.0], ... [3.0, 0.0, 0.0] ... ]) >>> metric.update(trajectory_3d) >>> metric.compute() # Perfect smoothness for straight line tensor(0.0000) Example (distributed): >>> # In distributed training, metrics are automatically synced >>> metric = PathSmoothness() >>> # On GPU 0 >>> traj_gpu0 = torch.tensor([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0]]) >>> metric.update(traj_gpu0) >>> # On GPU 1 >>> traj_gpu1 = torch.tensor([[0.0, 0.0], [0.0, 1.0], [0.0, 2.0]]) >>> metric.update(traj_gpu1) >>> # Final result aggregates across all GPUs >>> result = metric.compute() # Returns aggregated average smoothness """ # Metric states that persist across updates full_state_update: bool = False # Dynamically added by add_state() in __init__ total_smoothness: Tensor num_trajectories: Tensor
[docs] def __init__( self, **kwargs: Any, ) -> None: """Initialize the PathSmoothness metric.""" super().__init__(**kwargs) # Add metric states for distributed computation self.add_state("total_smoothness", default=torch.tensor(0.0), dist_reduce_fx="sum") self.add_state("num_trajectories", default=torch.tensor(0), dist_reduce_fx="sum")
[docs] def update(self, trajectory: Tensor) -> None: # pylint: disable=arguments-differ """Update metric state with new trajectory or batch of trajectories. Args: trajectory: Tensor of shape (..., L, D) where: - ... represents any number of batch dimensions (can be empty) - L is the number of points (must be >= 3) - D is the spatial dimensionality (e.g., 2 for 2D, 3 for 3D) Examples of valid shapes: - (L, D): Single trajectory - (B, L, D): Batch of B trajectories - (B, T, L, D): Batch of B sequences with T slices each Points should be ordered chronologically along the L dimension. Raises: ValueError: If trajectory has invalid shape or insufficient points. """ if trajectory.ndim < 2: raise ValueError( f"Trajectory must have at least 2 dimensions (..., L, D), " f"got {trajectory.ndim}D tensor with shape {trajectory.shape}" ) num_points = trajectory.shape[-2] # L is the second-to-last dimension if num_points < 3: raise ValueError( f"Trajectory must have at least 3 points along dimension -2 " f"to compute smoothness, got {num_points} point(s)" ) # Convert to float for numerical operations trajectory = trajectory.float() # Calculate displacement vectors between consecutive points # Shape: (..., L-1, D) displacements = trajectory[..., 1:, :] - trajectory[..., :-1, :] # Calculate the difference between consecutive displacement vectors # This measures the change in direction (second derivative) # Shape: (..., L-2, D) direction_changes = displacements[..., 1:, :] - displacements[..., :-1, :] # Calculate Euclidean distances (L2 norm) along the D dimension # Shape: (..., L-2) change_magnitudes = torch.norm(direction_changes, p=2, dim=-1) # Sum along the L-2 dimension to get total direction change # Shape: (...) total_change = change_magnitudes.sum(dim=-1) # Calculate path lengths for normalization # Shape: (..., L-1) segment_lengths = torch.norm(displacements, p=2, dim=-1) # Shape: (...) path_lengths = segment_lengths.sum(dim=-1) # Normalize by path length to get smoothness metric # Add epsilon to avoid division by zero for degenerate trajectories eps = torch.finfo(trajectory.dtype).eps smoothness_values = total_change / (path_lengths + eps) # Count total number of trajectories (product of all batch dimensions) num_trajectories = smoothness_values.numel() # Update states self.total_smoothness += smoothness_values.sum() # pylint: disable=no-member self.num_trajectories += num_trajectories # pylint: disable=no-member
[docs] def compute(self) -> Tensor: """Compute the average Path Smoothness across all trajectories. Returns: Average path smoothness as a scalar tensor. Lower values indicate smoother trajectories with less direction changes. Raises: RuntimeError: If no trajectories have been recorded. """ if self.num_trajectories == 0: # pylint: disable=no-member raise RuntimeError( "Cannot compute path smoothness: no trajectories have been recorded. " "Call update() with trajectory data before compute()." ) return self.total_smoothness / self.num_trajectories # pylint: disable=no-member