Skip to content

Runners

There are two ways to manage the agents in RAI:

  1. AgentRunner - a class for starting and stopping the agents
  2. wait_for_shutdown - a function for waiting for interruption signals

Usage of Runners is optional

You can start and stop the agents manually for more control over the agents lifecycle.

AgentRunner class definition

rai.agents.runner.AgentRunner

Runs the agents in the background.

Parameters:

Name Type Description Default
agents List[BaseAgent]

List of agent instances

required
Source code in src/rai_core/rai/agents/runner.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class AgentRunner:
    """Runs the agents in the background.

    Parameters
    ----------
    agents : List[BaseAgent]
        List of agent instances
    """

    def __init__(self, agents: List[BaseAgent]):
        """Initialize the AgentRunner with a list of agents.

        Parameters
        ----------
        agents : List[BaseAgent]
            List of agent instances to be managed by the runner.
        """
        self.agents = agents
        self.logger = logging.getLogger(__name__)

    def run(self):
        """Run all agents in the background.

        Notes
        -----
        This method starts all agents by calling their `run` method.
        It is experimental; if agents do not run properly, consider running them in separate processes.
        """
        self.logger.info(
            f"{self.__class__.__name__}.{self.run.__name__} is an experimental function. \
                            If you believe that your agents are not running properly, \
                            please run them separately (in different processes)."
        )
        for agent in self.agents:
            agent.run()

    def run_and_wait_for_shutdown(self):
        """Run all agents and block until a shutdown signal is received.

        Notes
        -----
        This method starts all agents and waits for a shutdown signal (SIGINT or SIGTERM), ensuring graceful termination.
        """
        self.run()
        self.wait_for_shutdown()

    def wait_for_shutdown(self):
        """Block until a shutdown signal (SIGINT or SIGTERM) is received, ensuring graceful termination.

        Notes
        -----
        Installs signal handlers to capture SIGINT and SIGTERM. On receiving a signal, stops all managed agents.
        """
        shutdown_event = Event()

        def signal_handler(signum: int, frame: Optional[FrameType]):
            shutdown_event.set()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        try:
            shutdown_event.wait()
        finally:
            for agent in self.agents:
                agent.stop()

    def stop(self):
        """Stop all managed agents by calling their `stop` method."""
        for agent in self.agents:
            agent.stop()

__init__(agents)

Initialize the AgentRunner with a list of agents.

Parameters:

Name Type Description Default
agents List[BaseAgent]

List of agent instances to be managed by the runner.

required
Source code in src/rai_core/rai/agents/runner.py
79
80
81
82
83
84
85
86
87
88
def __init__(self, agents: List[BaseAgent]):
    """Initialize the AgentRunner with a list of agents.

    Parameters
    ----------
    agents : List[BaseAgent]
        List of agent instances to be managed by the runner.
    """
    self.agents = agents
    self.logger = logging.getLogger(__name__)

run()

Run all agents in the background.

Notes

This method starts all agents by calling their run method. It is experimental; if agents do not run properly, consider running them in separate processes.

Source code in src/rai_core/rai/agents/runner.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def run(self):
    """Run all agents in the background.

    Notes
    -----
    This method starts all agents by calling their `run` method.
    It is experimental; if agents do not run properly, consider running them in separate processes.
    """
    self.logger.info(
        f"{self.__class__.__name__}.{self.run.__name__} is an experimental function. \
                        If you believe that your agents are not running properly, \
                        please run them separately (in different processes)."
    )
    for agent in self.agents:
        agent.run()

run_and_wait_for_shutdown()

Run all agents and block until a shutdown signal is received.

Notes

This method starts all agents and waits for a shutdown signal (SIGINT or SIGTERM), ensuring graceful termination.

Source code in src/rai_core/rai/agents/runner.py
106
107
108
109
110
111
112
113
114
def run_and_wait_for_shutdown(self):
    """Run all agents and block until a shutdown signal is received.

    Notes
    -----
    This method starts all agents and waits for a shutdown signal (SIGINT or SIGTERM), ensuring graceful termination.
    """
    self.run()
    self.wait_for_shutdown()

stop()

Stop all managed agents by calling their stop method.

Source code in src/rai_core/rai/agents/runner.py
137
138
139
140
def stop(self):
    """Stop all managed agents by calling their `stop` method."""
    for agent in self.agents:
        agent.stop()

wait_for_shutdown()

Block until a shutdown signal (SIGINT or SIGTERM) is received, ensuring graceful termination.

Notes

Installs signal handlers to capture SIGINT and SIGTERM. On receiving a signal, stops all managed agents.

Source code in src/rai_core/rai/agents/runner.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def wait_for_shutdown(self):
    """Block until a shutdown signal (SIGINT or SIGTERM) is received, ensuring graceful termination.

    Notes
    -----
    Installs signal handlers to capture SIGINT and SIGTERM. On receiving a signal, stops all managed agents.
    """
    shutdown_event = Event()

    def signal_handler(signum: int, frame: Optional[FrameType]):
        shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        shutdown_event.wait()
    finally:
        for agent in self.agents:
            agent.stop()

wait_for_shutdown function definition

rai.agents.runner.wait_for_shutdown(agents)

Block until a shutdown signal (SIGINT or SIGTERM) is received, ensuring graceful termination.

Parameters:

Name Type Description Default
agents List[BaseAgent]

List of running agents to be stopped on shutdown.

required
Notes

This method ensures a graceful shutdown of both the agent and the ROS2 node upon receiving an interrupt signal (SIGINT, e.g., Ctrl+C) or SIGTERM. It installs signal handlers to capture these events and invokes the agent's stop() method as part of the shutdown process.

Source code in src/rai_core/rai/agents/runner.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def wait_for_shutdown(agents: List[BaseAgent]):
    """Block until a shutdown signal (SIGINT or SIGTERM) is received, ensuring graceful termination.

    Parameters
    ----------
    agents : List[BaseAgent]
        List of running agents to be stopped on shutdown.

    Notes
    -----
    This method ensures a graceful shutdown of both the agent and the ROS2 node upon receiving
    an interrupt signal (SIGINT, e.g., Ctrl+C) or SIGTERM. It installs signal handlers to
    capture these events and invokes the agent's ``stop()`` method as part of the shutdown process.
    """
    shutdown_event = Event()

    def signal_handler(signum, frame):
        shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        shutdown_event.wait()
    finally:
        for agent in agents:
            agent.stop()

Usage examples

AgentRunner

from rai.agents import AgentRunner
from rai.agents.ros2 import ROS2StateBasedAgent
from rai.agents import ReActAgent

state_based_agent = ROS2StateBasedAgent()
react_agent = ReActAgent()

runner = AgentRunner([state_based_agent, react_agent])

runner.run_and_wait_for_shutdown() # starts the agents and blocks until the shutdown signal (Ctrl+C or SIGTERM)

wait_for_shutdown

from rai.agents import wait_for_shutdown
from rai.agents.ros2 import ROS2StateBasedAgent
from rai.agents import ReActAgent

state_based_agent = ROS2StateBasedAgent()
react_agent = ReActAgent()

# start the agents manually
state_based_agent.run()
react_agent.run()

# blocks until the shutdown signal (Ctrl+C or SIGTERM)
wait_for_shutdown([state_based_agent, react_agent])

See Also

  • Agents: For more information on the different types of agents in RAI
  • Aggregators: For more information on the different types of aggregators in RAI
  • Connectors: For more information on the different types of connectors in RAI
  • Langchain Integration: For more information on the LangChain integration within RAI
  • Multimodal messages: For more information on the multimodal LangChain messages in RAI