RAI Sim

RAI Sim is a package that provides an interface for connecting with various simulation environments. It is designed to be simulator-agnostic, allowing RAI to work with any simulation environment that implements the required interface.

Core Components

SimulationBridge

The SimulationBridge is an abstract base class that defines the interface for communicating with different simulation environments. It provides the following key functionalities:

  • Scene setup and management
  • Entity spawning and despawning
  • Object pose retrieval
  • Scene state monitoring

SceneConfig

The SceneConfig is a configuration class that specifies the entities to be spawned in the simulation.

SimulationConfig

The SimulationConfig is an abstract configuration class. Each simulation bridge can extend this with additional parameters specific to its implementation.

SceneState

The SceneState class maintains information about the current state of the simulation scene, including:

  • List of currently spawned entities
  • Current poses of all entities
  • Entity tracking and management

Implementation Details

Entity Management

The package provides two main entity classes:

  • Entity: Represents an entity that can be spawned in the simulation
  • SpawnedEntity: Represents an entity that has been successfully spawned

Tools

RAI Sim includes utility tools for working with simulations:

  • GetObjectPositionsGroundTruthTool: Retrieves accurate positional data for objects in the simulation

Usage

To use RAI Sim with a specific simulation environment:

  1. Create a custom SimulationBridge implementation for your simulator
  2. Extend SimulationConfig with simulator-specific parameters
  3. Implement the required abstract methods:
    • init_simulation
    • setup_scene
    • _spawn_entity
    • _despawn_entity
    • get_object_pose
    • get_scene_state
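The steps above can be sketched without a real simulator. The example below is a minimal, dependency-free illustration only: `SimulationBridgeSketch`, `InMemoryBridge`, the simplified `Pose` tuple, and every method signature are assumptions made for this sketch, not the actual `rai_sim` API. Only the six method names and the `Entity`/`SpawnedEntity` names come from this page.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Tuple

# Simplified pose for the sketch: translation only (assumption).
Pose = Tuple[float, float, float]


@dataclass(frozen=True)
class Entity:
    """An entity that can be spawned (fields are assumed for illustration)."""
    name: str
    prefab_name: str
    pose: Pose


@dataclass(frozen=True)
class SpawnedEntity(Entity):
    """An entity that was spawned; `id` is a hypothetical simulator handle."""
    id: str = ""


class SimulationBridgeSketch(ABC):
    """Hypothetical stand-in for SimulationBridge; signatures are assumed."""

    def __init__(self) -> None:
        self.spawned_entities: List[SpawnedEntity] = []

    @abstractmethod
    def init_simulation(self, simulation_config: dict) -> None: ...

    @abstractmethod
    def setup_scene(self, entities: List[Entity]) -> None: ...

    @abstractmethod
    def _spawn_entity(self, entity: Entity) -> None: ...

    @abstractmethod
    def _despawn_entity(self, entity: SpawnedEntity) -> None: ...

    @abstractmethod
    def get_object_pose(self, entity: SpawnedEntity) -> Pose: ...

    @abstractmethod
    def get_scene_state(self) -> Dict[str, Pose]: ...


class InMemoryBridge(SimulationBridgeSketch):
    """Toy bridge that keeps the 'simulation' in a dictionary."""

    def init_simulation(self, simulation_config: dict) -> None:
        self._world: Dict[str, Pose] = {}

    def setup_scene(self, entities: List[Entity]) -> None:
        # Clear whatever the previous scene spawned, then spawn the new scene.
        for spawned in list(self.spawned_entities):
            self._despawn_entity(spawned)
        for entity in entities:
            self._spawn_entity(entity)

    def _spawn_entity(self, entity: Entity) -> None:
        if entity.name in self._world:
            raise ValueError(f"duplicate entity name: {entity.name!r}")
        self._world[entity.name] = entity.pose
        self.spawned_entities.append(
            SpawnedEntity(
                entity.name,
                entity.prefab_name,
                entity.pose,
                id=f"{entity.prefab_name}/{entity.name}",
            )
        )

    def _despawn_entity(self, entity: SpawnedEntity) -> None:
        del self._world[entity.name]
        self.spawned_entities.remove(entity)

    def get_object_pose(self, entity: SpawnedEntity) -> Pose:
        return self._world[entity.name]

    def get_scene_state(self) -> Dict[str, Pose]:
        return dict(self._world)
```

A real bridge would replace the dictionary operations with calls into the simulator and would take the configuration classes described above instead of plain dictionaries and lists.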

Configuration

Simulation configurations are typically loaded from YAML files with the following structure:

frame_id: <reference_frame>
entities:
    - name: <unique_entity_name>
      prefab_name: <resource_name>
      pose:
          translation:
              x: <x_coordinate>
              y: <y_coordinate>
              z: <z_coordinate>
          rotation:
              x: <x_rotation>
              y: <y_rotation>
              z: <z_rotation>
              w: <w_rotation>
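
As a rough illustration of how this structure maps onto typed objects, the sketch below parses a dictionary shaped like the YAML above (for example, the result of a YAML loader) into dataclasses and rejects duplicate entity names. The class names `Translation`, `Rotation`, and `EntitySpec`, the function `parse_scene`, and the sample values are hypothetical and are not part of `rai_sim`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Translation:
    x: float
    y: float
    z: float


@dataclass(frozen=True)
class Rotation:
    # Quaternion components, matching the x/y/z/w keys in the YAML layout.
    x: float
    y: float
    z: float
    w: float


@dataclass(frozen=True)
class EntitySpec:
    name: str
    prefab_name: str
    frame_id: str
    translation: Translation
    rotation: Rotation


def parse_scene(raw: Dict[str, Any]) -> List[EntitySpec]:
    """Convert a loaded config dictionary into a list of entity specs."""
    frame_id = raw["frame_id"]
    seen = set()
    specs: List[EntitySpec] = []
    for item in raw["entities"]:
        name = item["name"]
        if name in seen:
            raise ValueError(f"duplicate entity name: {name!r}")
        seen.add(name)
        pose = item["pose"]
        specs.append(
            EntitySpec(
                name=name,
                prefab_name=item["prefab_name"],
                frame_id=frame_id,
                translation=Translation(**pose["translation"]),
                rotation=Rotation(**pose["rotation"]),
            )
        )
    return specs


# Sample input with made-up values, shaped like the YAML structure above.
sample = {
    "frame_id": "world",
    "entities": [
        {
            "name": "cube1",
            "prefab_name": "cube",
            "pose": {
                "translation": {"x": 0.5, "y": 0.0, "z": 0.05},
                "rotation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
            },
        }
    ],
}
specs = parse_scene(sample)
```

Checking names at load time is one possible place to catch duplicates early; where the package itself performs this check is not stated here.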

Error Handling

The package includes error handling for:

  • Duplicate entity names
  • Failed entity spawning/despawning
  • Invalid configurations
  • Simulation process management

Integration with RAI Bench

RAI Sim serves as the foundation for RAI Bench by providing:

  • A consistent interface for all simulation environments
  • Entity management and tracking
  • Scene state monitoring
  • Configuration management

This allows RAI Bench to focus on task definition and evaluation while remaining simulator-agnostic.

LaunchManager

RAI Sim also provides a ROS2LaunchManager class that manages the startup and shutdown of a ROS 2 LaunchDescription.

ROS2LaunchManager class definition

rai_sim.launch_manager.ROS2LaunchManager

Source code in rai_sim/launch_manager.py
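import asyncio
import multiprocessing
from multiprocessing.synchronize import Event
from typing import Optional

from launch import LaunchDescription, LaunchService


class ROS2LaunchManager: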
    def __init__(self) -> None:
        self._stop_event: Optional[Event] = None
        self._process: Optional[multiprocessing.Process] = None

    def start(self, launch_description: LaunchDescription) -> None:
        self._stop_event = multiprocessing.Event()
        self._process = multiprocessing.Process(
            target=self._run_process,
            args=(self._stop_event, launch_description),
            daemon=True,
        )
        self._process.start()

    def shutdown(self) -> None:
        if self._stop_event:
            self._stop_event.set()
        if self._process:
            self._process.join()

    def _run_process(
        self, stop_event: Event, launch_description: LaunchDescription
    ) -> None:
        loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)
        launch_service = LaunchService()
        launch_service.include_launch_description(launch_description)
        # launch description launched
        launch_task = loop.create_task(launch_service.run_async())
        # when stop event set
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not launch_task.done():
            # XXX (jmatejcz) the shutdown function sends a shutdown signal to all
            # nodes launched with the launch description, which should do the trick,
            # but some nodes are stubborn and there is a possibility
            # that they won't close. If this happens, sending PKILL to all
            # ROS nodes will be needed
            shutdown_task = loop.create_task(
                launch_service.shutdown(),
            )
            # shutdown task should complete when all nodes are closed
            # but wait also for launch task to close just to be sure
            loop.run_until_complete(asyncio.gather(shutdown_task, launch_task))
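
The stop-event pattern in `_run_process` can be tried without ROS 2 installed. The sketch below is a simplified, standard-library-only illustration: it runs in a thread instead of a separate process, and `fake_service` is a hypothetical stand-in for `LaunchService.run_async()` and `LaunchService.shutdown()`. It shows the same sequence: start the long-running task, block on the stop event in an executor thread so the event loop stays free, then request shutdown and wait for the task to finish.

```python
import asyncio
import threading
from typing import List


async def fake_service(log: List[str], shutdown_requested: asyncio.Event) -> None:
    """Hypothetical stand-in for a launch service: runs until asked to stop."""
    log.append("service started")
    await shutdown_requested.wait()
    log.append("service stopped")


def run_until_stopped(stop_event: threading.Event, log: List[str]) -> None:
    # Create a dedicated event loop for this worker.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        shutdown_requested = asyncio.Event()
        service_task = loop.create_task(fake_service(log, shutdown_requested))
        # stop_event.wait() blocks, so it runs in the default executor;
        # the loop keeps driving service_task in the meantime.
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not service_task.done():
            # Ask the service to stop, then wait for it to finish.
            shutdown_requested.set()
            loop.run_until_complete(service_task)
    finally:
        loop.close()


log: List[str] = []
stop = threading.Event()
worker = threading.Thread(target=run_until_stopped, args=(stop, log), daemon=True)
worker.start()   # analogous to ROS2LaunchManager.start()
stop.set()       # analogous to ROS2LaunchManager.shutdown() ...
worker.join(timeout=5)  # ... followed by waiting for the worker to exit
```

Running the blocking wait in an executor is what lets the caller signal shutdown from outside the loop without the loop itself being blocked.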