Daily transcription opens multiple websockets instead of reusing the existing one after calling capture_participant_transcription

Open mwood23 opened this issue 8 months ago • 3 comments

pipecat version

0.0.67

Python version

3.11.11

Operating System

mac

Issue description

When a bot joins a Daily room with transcription_enabled=True and later calls update_transcription() without an instanceId, Daily creates a second Deepgram WebSocket instead of updating the already-running one. Because both WebSockets receive the same mixed audio, each utterance is returned twice and downstream applications receive duplicate transcription messages.

This happens because update_transcription is not called with the instance_id from the start_transcription call.

I see two paths forward:

  1. Cache the instance_id and add it if it's not provided:
diff --git forkSrcPrefix/src/pipecat/transports/services/daily.py forkDstPrefix/src/pipecat/transports/services/daily.py
index 3e43ddee12201c34a04fcc73b424db5cfaee0833..83e86b60cb6ac2b5c254089ef18e229485f891ca 100644
--- forkSrcPrefix/src/pipecat/transports/services/daily.py
+++ forkDstPrefix/src/pipecat/transports/services/daily.py
@@ -290,6 +290,7 @@ class DailyTransportClient(EventHandler):
         self._video_renderers = {}
         self._transcription_ids = []
         self._transcription_status = None
+        self._transcription_instance_id: Optional[str] = None
 
         self._joining = False
         self._joined = False
@@ -754,6 +755,17 @@ class DailyTransportClient(EventHandler):
         await future
 
     async def update_transcription(self, participants=None, instance_id=None):
+        """Update the running transcription instance.
+
+        If *instance_id* is omitted we automatically use the one that was
+        returned by Daily when the transcription was first started.  This
+        guarantees that we update the existing Deepgram WebSocket instead
+        of creating a new one.
+        """
+
+        if instance_id is None:
+            instance_id = self._transcription_instance_id
+
         future = self._get_event_loop().create_future()
         self._client.update_transcription(
             participants, instance_id, completion=completion_callback(future)
@@ -837,6 +849,12 @@ class DailyTransportClient(EventHandler):
     def on_transcription_started(self, status):
         logger.debug(f"Transcription started: {status}")
         self._transcription_status = status
+        # Save instanceId so we can update this exact transcription
+        # session later.
+        try:
+            self._transcription_instance_id = status.get("instanceId")
+        except Exception:
+            self._transcription_instance_id = None
         self._call_async_callback(self.update_transcription, self._transcription_ids)
 
     def on_transcription_stopped(self, stopped_by, stopped_by_error):

  2. Delay start_transcription until you know all participant ids.

I'm rolling with option one in the interim. I caught this because I transcribe and persist messages, and all of the user's messages were being stored twice. I also think the duplicate text flows into the LLM.
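
To make option one concrete outside of the diff, here is a toy model of the failure and the fix. It is a sketch under stated assumptions, not Daily's SDK: `FakeTranscriptionBackend` and `CachingClient` are hypothetical names, and the backend simply encodes the behavior described above (an update without an instance id opens a new session).

```python
import itertools
from typing import Dict, List, Optional


class FakeTranscriptionBackend:
    """Hypothetical stand-in for the server side; NOT Daily's real API."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        # Maps instance id -> participant ids that session listens to.
        self.sessions: Dict[str, List[str]] = {}

    def start(self) -> str:
        instance_id = f"instance-{next(self._ids)}"
        self.sessions[instance_id] = []
        return instance_id

    def update(self, participants: List[str], instance_id: Optional[str] = None) -> str:
        # Assumed behavior from this issue: no (or unknown) id opens a new session.
        if instance_id is None or instance_id not in self.sessions:
            instance_id = self.start()
        self.sessions[instance_id] = list(participants)
        return instance_id


class CachingClient:
    """Remembers the id of the running session and reuses it on update."""

    def __init__(self, backend: FakeTranscriptionBackend) -> None:
        self._backend = backend
        self._instance_id: Optional[str] = None

    def on_transcription_started(self, instance_id: str) -> None:
        self._instance_id = instance_id

    def update_transcription(
        self, participants: List[str], instance_id: Optional[str] = None
    ) -> str:
        # Fall back to the cached id so the existing session is updated.
        if instance_id is None:
            instance_id = self._instance_id
        return self._backend.update(participants, instance_id)
```

Calling `backend.update([...])` directly after `backend.start()` leaves two sessions in the toy backend, while going through `CachingClient` leaves one.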

Reproduction steps

I believe the example examples/chatbot-audio-recording/bot.py will reproduce the bug. You need:

            DailyParams(
                audio_out_enabled=True,
                audio_in_enabled=True,
                video_out_enabled=False,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_enabled=True,
                #
                # Spanish
                #
                # transcription_settings=DailyTranscriptionSettings(
                #     language="es",
                #     tier="nova",
                #     model="2-general"
                # )
            ),

And then this:

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await audiobuffer.start_recording()
            await transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([context_aggregator.user().get_context_frame()])

Expected behavior

One websocket is created and STT transcribes user audio exactly once.

Actual behavior

Multiple websockets are created, so each utterance is transcribed twice and you get duplicate transcription messages.

Logs

2025-05-05 15:56:03.798 | DEBUG    | pipecat.transports.services.daily:_on_transcription_message:1517 - Transcription (from: 67b3f6bd-eaae-4c57-9556-71e0be0680bc): [How are you?]
2025-05-05 15:56:03.801 | DEBUG    | pipecat.transports.services.daily:_on_transcription_message:1517 - Transcription (from: 67b3f6bd-eaae-4c57-9556-71e0be0680bc): [How are you?]
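
Until the root cause is fixed, an application that persists transcripts could drop the second copy downstream. The following is only a stopgap sketch, not part of pipecat: `TranscriptDeduplicator` and its 0.5 second window are assumptions chosen for illustration, based on the two log lines above arriving about 3 ms apart. It would also drop a genuine repeat of the same sentence inside the window.

```python
import time
from typing import Callable, Dict, Tuple


class TranscriptDeduplicator:
    """Flags a transcript that repeats the previous one from the same participant."""

    def __init__(
        self,
        window_secs: float = 0.5,  # assumed value, tune for your application
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = window_secs
        self._clock = clock
        # Maps participant id -> (last text, time it was seen).
        self._last: Dict[str, Tuple[str, float]] = {}

    def is_duplicate(self, participant_id: str, text: str) -> bool:
        now = self._clock()
        previous = self._last.get(participant_id)
        self._last[participant_id] = (text, now)
        if previous is None:
            return False
        prev_text, prev_time = previous
        # Same text from the same participant within the window counts as a duplicate.
        return prev_text == text and (now - prev_time) <= self._window
```

A transcription handler would call `is_duplicate(participant_id, text)` first and skip persisting or forwarding the message when it returns `True`.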

mwood23, May 05 '25 20:05

Hmmm, I'm not sure that fix above totally solves it. Here's a patch that fixed it for me:

 src/pipecat/transports/services/daily.py | 43 ++++++++++++++++++++++--
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py
index 3e43ddee..0b3d2493 100644
--- a/src/pipecat/transports/services/daily.py
+++ b/src/pipecat/transports/services/daily.py
@@ -290,6 +290,20 @@ class DailyTransportClient(EventHandler):
         self._video_renderers = {}
         self._transcription_ids = []
         self._transcription_status = None
+        # -------------------------------------------------------------
+        #  DUPLICATE-TRANSCRIPTION BUG NOTE
+        #
+        #  Daily automatically lets you start a room-wide transcription as
+        #  soon as the bot joins.  If you *also* call
+        #  `capture_participant_transcription()` later, the SDK will issue a
+        #  SECOND `start_transcription` RPC to Deepgram → two WebSockets
+        #  listening to the same audio, so every utterance shows up twice.
+        #
+        #  The minimal fix is: only ever send a *single* `start_transcription`
+        #  request, triggered lazily when we first know which participant we
+        #  care about.  We track that with _transcription_started.
+        # -------------------------------------------------------------
+        self._transcription_started: bool = False
 
         self._joining = False
         self._joined = False
@@ -473,7 +487,13 @@ class DailyTransportClient(EventHandler):
 
                 logger.info(f"Joined {self._room_url}")
 
-                if self._params.transcription_enabled:
+                # If we already have target participants (e.g. they were
+                # queued before the join finished) start transcription now.
+                if (
+                    self._params.transcription_enabled
+                    and self._transcription_ids
+                    and not self._transcription_started
+                ):
                     await self._start_transcription()
 
                 await self._callbacks.on_joined(data)
@@ -490,10 +510,17 @@ class DailyTransportClient(EventHandler):
             await self._callbacks.on_error(error_msg)
 
     async def _start_transcription(self):
+        # Start only once.
+        if self._transcription_started:
+            logger.debug("transcription already active – skipping extra start")
+            return
+
         if not self._token:
             logger.warning("Transcription can't be started without a room token")
             return
 
+        self._transcription_started = True
+
         logger.info(f"Enabling transcription with settings {self._params.transcription_settings}")
 
         future = self._get_event_loop().create_future()
@@ -680,8 +707,16 @@ class DailyTransportClient(EventHandler):
         if not self._params.transcription_enabled:
             return
 
-        self._transcription_ids.append(participant_id)
-        if self._joined and self._transcription_status:
+        if participant_id not in self._transcription_ids:
+            self._transcription_ids.append(participant_id)
+
+        if not self._joined:
+            # Room not ready yet; on_joined() will start transcription.
+            return
+
+        if not self._transcription_started:
+            await self._start_transcription()
+        else:
             await self.update_transcription(self._transcription_ids)
 
     async def capture_participant_audio(
@@ -837,6 +872,7 @@ class DailyTransportClient(EventHandler):
     def on_transcription_started(self, status):
         logger.debug(f"Transcription started: {status}")
         self._transcription_status = status
+        self._transcription_started = True
         self._call_async_callback(self.update_transcription, self._transcription_ids)
 
     def on_transcription_stopped(self, stopped_by, stopped_by_error):
@@ -844,6 +880,7 @@ class DailyTransportClient(EventHandler):
 
     def on_transcription_error(self, message):
         logger.error(f"Transcription error: {message}")
+        self._transcription_started = False
 
     def on_transcription_message(self, message):
         self._call_async_callback(self._callbacks.on_transcription_message, message)

mwood23, May 05 '25 22:05

FYI: There is a duplicate transcription bug in Daily's server-side code. We're working on a fix.

markbackman, May 05 '25 22:05

Do you still see this issue?

markbackman, May 13 '25 21:05