TextToSpeechAgent

Overview

The TextToSpeechAgent in the RAI framework is a modular agent responsible for converting incoming text into audio using a text-to-speech (TTS) model and playing it through a configured audio output device. It supports real-time playback control through ROS2 messages and handles asynchronous speech processing using threads and queues.
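Internally, the agent pairs a text queue with an audio queue per transcription ID and services them from a background thread. A minimal stdlib sketch of that producer/consumer pattern (the `synthesize` stand-in for the TTS model is hypothetical):

```python
from queue import Queue, Empty
from threading import Event, Thread
from uuid import uuid4

# Hypothetical stand-in for TTSModel.get_speech: text in, audio bytes out.
def synthesize(text: str) -> bytes:
    return text.encode("utf-8")

terminate = Event()
transcription_id = str(uuid4())[:8]          # the agent keys its queues by a short ID
text_queues = {transcription_id: Queue()}
audio_queues = {transcription_id: Queue()}

def transcription_worker():
    # Mirrors _transcription_thread: poll for text, synthesize, enqueue audio.
    while not terminate.wait(timeout=0.01):
        try:
            text = text_queues[transcription_id].get(block=False)
        except Empty:
            continue
        audio_queues[transcription_id].put(synthesize(text))

worker = Thread(target=transcription_worker)
worker.start()
text_queues[transcription_id].put("Hello, world")
audio = audio_queues[transcription_id].get(timeout=2.0)
terminate.set()
worker.join()
```

Decoupling synthesis from playback this way keeps the audio callback non-blocking: the speaker callback only ever does a non-blocking `get` on the audio queue.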

Class Definition

TextToSpeechAgent class definition

rai_s2s.tts.agents.TextToSpeechAgent

Bases: BaseAgent

Agent responsible for converting text to speech and handling audio playback.

Parameters:

Name Type Description Default
speaker_config SoundDeviceConfig

Configuration for the sound device used for playback.

required
ros2_name str

Name of the ROS2 node.

required
tts TTSModel

Text-to-speech model used for generating audio.

required
logger Optional[Logger]

Logger instance for logging messages, by default None.

None
max_speech_history int

Maximum number of speech IDs to remember, by default 64.

64
Source code in rai_s2s/tts/agents/tts_agent.py
class TextToSpeechAgent(BaseAgent):
    """
    Agent responsible for converting text to speech and handling audio playback.

    Parameters
    ----------
    speaker_config : SoundDeviceConfig
        Configuration for the sound device used for playback.
    ros2_name : str
        Name of the ROS2 node.
    tts : TTSModel
        Text-to-speech model used for generating audio.
    logger : Optional[logging.Logger], optional
        Logger instance for logging messages, by default None.
    max_speech_history : int, optional
        Maximum number of speech IDs to remember, by default 64
    """

    def __init__(
        self,
        speaker_config: SoundDeviceConfig,
        ros2_name: str,
        tts: TTSModel,
        logger: Optional[logging.Logger] = None,
        max_speech_history=64,
    ):
        if logger is None:
            self.logger = logging.getLogger(__name__)
        else:
            self.logger = logger

        self.speaker = SoundDeviceConnector(
            targets=[("speaker", speaker_config)], sources=[]
        )
        sample_rate, _, out_channels = self.speaker.get_audio_params("speaker")
        tts.sample_rate = sample_rate
        tts.channels = out_channels

        self.node_base_name = ros2_name
        self.model = tts
        self.ros2_connector = self._setup_ros2_connector()
        super().__init__()

        self.current_transcription_id = str(uuid4())[0:8]
        self.current_speech_id = None
        self.text_queues: dict[str, Queue] = {self.current_transcription_id: Queue()}
        self.audio_queues: dict[str, Queue] = {self.current_transcription_id: Queue()}
        self.remembered_speech_ids: list[str] = []

        self.tog_play_event = Event()
        self.stop_event = Event()
        self.current_audio = None

        self.terminate_agent = Event()
        self.transcription_thread = None
        self.running = False

        self.playback_data = PlayData()

    @classmethod
    def from_config(cls, cfg_path: Optional[str] = None) -> Self:
        cfg = load_config(cfg_path)
        config = SoundDeviceConfig(
            stream=True,
            is_output=True,
            device_name=cfg.speaker.device_name,
        )
        match cfg.text_to_speech.model_type:
            case "ElevenLabs":
                from rai_s2s.tts.models import ElevenLabsTTS

                if cfg.text_to_speech.voice != "":
                    model = ElevenLabsTTS(voice=cfg.text_to_speech.voice)
                else:
                    raise ValueError("ElevenLabs [tts] vendor requires voice to be set")
            case "OpenTTS":
                from rai_s2s.tts.models import OpenTTS

                if cfg.text_to_speech.voice != "":
                    model = OpenTTS(voice=cfg.text_to_speech.voice)
                else:
                    model = OpenTTS()
            case _:
                raise ValueError(f"Unknown model_type: {cfg.text_to_speech.model_type}")
        return cls(config, "rai_auto_tts", model)

    def __call__(self):
        self.run()

    def run(self):
        """
        Start the text-to-speech agent, initializing playback and launching the transcription thread.
        """
        self.running = True
        self.logger.info("TextToSpeechAgent started")
        self.transcription_thread = Thread(target=self._transcription_thread)
        self.transcription_thread.start()

        msg = SoundDeviceMessage(read=False)
        self.speaker.start_action(
            msg,
            "speaker",
            on_feedback=self._speaker_callback,
            on_done=lambda: None,
        )

    def _speaker_callback(self, outdata, frames, time, status_dict):
        set_flags = [flag for flag, status in status_dict.items() if status]

        if set_flags:
            self.logger.warning("Flags set: " + ", ".join(set_flags))
        if self.playback_data.playing:
            if self.playback_data.current_segment is None:
                try:
                    self.playback_data.current_segment = self.audio_queues[
                        self.current_transcription_id
                    ].get(block=False)
                    self.playback_data.data = np.array(
                        self.playback_data.current_segment.get_array_of_samples()  # type: ignore
                    ).reshape(-1, self.playback_data.channels)
                except Empty:
                    pass
                except KeyError:
                    pass
            if self.playback_data.data is not None:
                current_frame = self.playback_data.current_frame
                chunksize = min(len(self.playback_data.data) - current_frame, frames)
                outdata[:chunksize] = self.playback_data.data[
                    current_frame : current_frame + chunksize
                ]
                if chunksize < frames:
                    outdata[chunksize:] = 0
                    self.playback_data.current_frame = 0
                    self.playback_data.current_segment = None
                    self.playback_data.data = None
                else:
                    self.playback_data.current_frame += chunksize

        if not self.playback_data.playing:
            outdata[:] = np.zeros(outdata.size).reshape(outdata.shape)

    def stop(self):
        """
        Cleanly exit the text-to-speech agent, terminating playback and joining the transcription thread.
        """
        self.logger.info("Stopping TextToSpeechAgent")
        self.terminate_agent.set()
        if self.transcription_thread is not None:
            self.transcription_thread.join()

    def _transcription_thread(self):
        while not self.terminate_agent.wait(timeout=0.01):
            if self.current_transcription_id in self.text_queues:
                try:
                    data = self.text_queues[self.current_transcription_id].get(
                        block=False
                    )
                except Empty:
                    continue
                audio = self.model.get_speech(data)
                try:
                    self.audio_queues[self.current_transcription_id].put(audio)
                except KeyError as e:
                    self.logger.error(
                        f"Could not find queue for {self.current_transcription_id}: queues: {self.audio_queues.keys()}"
                    )
                    raise e

    def _setup_ros2_connector(self):
        self.hri_ros2_connector = ROS2HRIConnector(
            self.node_base_name  # , "single_threaded"
        )
        self.hri_ros2_connector.register_callback(
            "/to_human", self._on_to_human_message
        )
        self.ros2_connector = ROS2Connector(
            self.node_base_name  # , False, "single_threaded"
        )
        self.ros2_connector.register_callback(
            "/voice_commands", self._on_command_message, msg_type="std_msgs/msg/String"
        )

    def _on_to_human_message(self, msg: ROS2HRIMessage):
        self.logger.debug(f"Received message from human: {msg.text}")
        self.logger.warning(
            f"Starting playback, current id: {self.current_transcription_id}"
        )
        if (
            self.current_speech_id is None
            and msg.communication_id is not None
            and msg.communication_id not in self.remembered_speech_ids
        ):
            self.current_speech_id = msg.communication_id
            self.remembered_speech_ids.append(self.current_speech_id)
            if len(self.remembered_speech_ids) > 64:
                self.remembered_speech_ids.pop(0)
        if self.current_speech_id == msg.communication_id:
            self.text_queues[self.current_transcription_id].put(msg.text)
        self.playback_data.playing = True

    def _on_command_message(self, message: ROS2Message):
        self.logger.info(f"Received status message: {message}")
        if message.payload.data == "tog_play":
            self.playback_data.playing = not self.playback_data.playing
        elif message.payload.data == "play":
            self.playback_data.playing = True
        elif message.payload.data == "pause":
            self.playback_data.playing = False
        elif message.payload.data == "stop":
            self.current_speech_id = None
            self.playback_data.playing = False
            previous_id = self.current_transcription_id
            self.logger.warning(f"Stopping playback, previous id: {previous_id}")
            self.current_transcription_id = str(uuid4())[0:8]
            self.audio_queues[self.current_transcription_id] = Queue()
            self.text_queues[self.current_transcription_id] = Queue()
            try:
                del self.audio_queues[previous_id]
                del self.text_queues[previous_id]
            except KeyError:
                pass
            self.playback_data.data = None
            self.playback_data.current_frame = 0
            self.playback_data.current_segment = None

        self.logger.debug(f"Current status is: {self.playback_data.playing}")

run()

Start the text-to-speech agent, initializing playback and launching the transcription thread.

Source code in rai_s2s/tts/agents/tts_agent.py
def run(self):
    """
    Start the text-to-speech agent, initializing playback and launching the transcription thread.
    """
    self.running = True
    self.logger.info("TextToSpeechAgent started")
    self.transcription_thread = Thread(target=self._transcription_thread)
    self.transcription_thread.start()

    msg = SoundDeviceMessage(read=False)
    self.speaker.start_action(
        msg,
        "speaker",
        on_feedback=self._speaker_callback,
        on_done=lambda: None,
    )

stop()

Cleanly exit the text-to-speech agent, terminating playback and joining the transcription thread.

Source code in rai_s2s/tts/agents/tts_agent.py
def stop(self):
    """
    Cleanly exit the text-to-speech agent, terminating playback and joining the transcription thread.
    """
    self.logger.info("Stopping TextToSpeechAgent")
    self.terminate_agent.set()
    if self.transcription_thread is not None:
        self.transcription_thread.join()

Purpose

The TextToSpeechAgent enables:

  • Real-time conversion of text to speech
  • Playback control (play/pause/stop) via ROS2 messages
  • Dynamic loading of TTS models from configuration
  • Robust audio handling using queues and event-driven logic
  • Integration with human-robot interaction topics (HRI)

Initialization Parameters

Parameter Type Description
speaker_config SoundDeviceConfig Configuration for the audio output (speaker).
ros2_name str Name of the ROS2 node.
tts TTSModel Text-to-speech model instance.
logger Optional[logging.Logger] Logger instance, or default logger if None.
max_speech_history int Number of speech message IDs to remember (default: 64).

Key Methods

from_config(cfg_path: Optional[str])

Instantiates the agent from a configuration file, dynamically selecting the TTS model and setting up audio output.
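The section and key names below follow the attributes read in `from_config` (`cfg.speaker.device_name`, `cfg.text_to_speech.model_type`, `cfg.text_to_speech.voice`); the exact file format and layout expected by `load_config` are an assumption, so treat this as an illustrative sketch rather than a canonical config:

```toml
# Hypothetical config fragment; key names mirror the attributes
# accessed in from_config.
[speaker]
device_name = "default"

[text_to_speech]
model_type = "OpenTTS"   # or "ElevenLabs", which requires voice to be set
voice = ""
```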

run()

Initializes the agent:

  • Starts a thread to handle queued text-to-speech conversion
  • Launches speaker playback via SoundDeviceConnector

stop()

Gracefully stops the agent by setting the termination flag and joining the transcription thread.
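The run/stop lifecycle is an ordinary flag-and-join pattern. A stdlib sketch of what the two methods do with the termination event and the transcription thread:

```python
from threading import Event, Thread

terminate_agent = Event()

def transcription_loop():
    # Mirrors _transcription_thread's loop condition: wake every 10 ms,
    # exit promptly once the termination flag is set.
    while not terminate_agent.wait(timeout=0.01):
        pass  # poll text queue, synthesize, enqueue audio

thread = Thread(target=transcription_loop)
thread.start()          # what run() does
# ... agent converts and plays speech ...
terminate_agent.set()   # what stop() does
thread.join()           # stop() blocks until the loop has exited
```

Because the loop checks the event on every iteration, `join()` returns within roughly one poll interval of calling `stop()`.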

Communication

The agent uses the ROS2HRIConnector and ROS2Connector to communicate over two ROS2 topics:

  • /to_human: Incoming text messages to convert. Uses rai_interfaces/msg/HRIMessage.
  • /voice_commands: Playback control via ROS2 std_msgs/msg/String messages. Valid values: "play", "pause", "tog_play" (toggle play/pause), "stop"
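The command handling in `_on_command_message` amounts to a small state machine over the playback flag. A stdlib sketch of that dispatch (the `PlaybackState` holder is hypothetical; in the agent the flag lives on `self.playback_data`):

```python
from dataclasses import dataclass

@dataclass
class PlaybackState:
    playing: bool = False

def on_command(state: PlaybackState, command: str) -> None:
    # Mirrors the dispatch in _on_command_message.
    if command == "tog_play":
        state.playing = not state.playing
    elif command == "play":
        state.playing = True
    elif command == "pause":
        state.playing = False
    elif command == "stop":
        # The agent additionally rotates to a fresh transcription ID
        # and discards the old text/audio queues here.
        state.playing = False

state = PlaybackState()
on_command(state, "play")      # playing -> True
on_command(state, "tog_play")  # playing -> False
```

At runtime these strings arrive as std_msgs/msg/String on /voice_commands, e.g. via `ros2 topic pub /voice_commands std_msgs/msg/String "{data: 'pause'}"`.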

Best Practices

  1. Queue Management: Properly track transcription IDs to avoid queue collisions or memory leaks.
  2. Playback Sync: Ensure audio queues are flushed on stop to avoid replaying outdated speech.
  3. Graceful Shutdown: Always call stop() to terminate threads cleanly.
  4. Model Configuration: Ensure model-specific settings (e.g., voice selection for ElevenLabs) are defined in config files.

Architecture

The TextToSpeechAgent interacts with the following core components:

  • TTSModel: Converts text into audio (e.g., ElevenLabsTTS, OpenTTS)
  • SoundDeviceConnector: Sends synthesized audio to output hardware
  • ROS2HRIConnector: Handles incoming HRI and command messages
  • Queues and Threads: Enable asynchronous and buffered audio processing
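The chunking logic in `_speaker_callback` can be isolated from the sound device. A sketch of that block-fill step (function name and return convention are illustrative, not part of the agent's API):

```python
import numpy as np

def fill_outdata(outdata, frames, data, current_frame):
    """Copy up to `frames` frames from `data` into `outdata`, zero-padding
    when the segment runs out; returns (next_frame, segment_finished).
    Mirrors the chunking logic in _speaker_callback."""
    chunk = min(len(data) - current_frame, frames)
    outdata[:chunk] = data[current_frame:current_frame + chunk]
    if chunk < frames:
        outdata[chunk:] = 0          # pad the tail of the final block
        return 0, True               # segment done; reset the frame cursor
    return current_frame + chunk, False

# One mono segment of 10 samples, played back in blocks of 4 frames.
data = np.arange(10, dtype=np.float32).reshape(-1, 1)
out = np.empty((4, 1), dtype=np.float32)
frame, done = 0, False
blocks = []
while not done:
    frame, done = fill_outdata(out, 4, data, frame)
    blocks.append(out.copy())
```

When the segment is exhausted mid-block, the remainder is zero-filled so the device always receives a full buffer, and the callback then pulls the next segment from the audio queue.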

See Also