## Building Custom Strategies

Subclass `BaseUserMuteStrategy` when none of the built-in strategies fit. A strategy only needs to answer one question per frame: should the user be muted right now?

### The base class

`BaseUserMuteStrategy` (in `pipecat.turns.user_mute`) exposes the following interface. The signatures below are a simplified view of what your subclass can override:

```python
# Simplified interface; see the source for the full definition.
class BaseUserMuteStrategy:
    async def setup(self, task_manager): ...
    async def cleanup(self): ...
    async def reset(self): ...

    async def process_frame(self, frame: Frame) -> bool:
        """Return True if the user should be muted after this frame."""
        return False
```

Override `process_frame` to update internal state and return the current mute decision. Override `reset` if your strategy tracks turn-based state that should clear between conversations.

### Which frames reach a strategy

Each strategy's `process_frame` is called for every frame that passes through the user aggregator, **except** `StartFrame`, `EndFrame`, and `CancelFrame`. This includes:

- User-direction frames from the input transport and STT: `TranscriptionFrame`, `InterimTranscriptionFrame`, `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`, `VADUserStartedSpeakingFrame`, `VADUserStoppedSpeakingFrame`, `InputAudioRawFrame`, `InterruptionFrame`
- `SystemFrame` broadcasts from elsewhere in the pipeline: `BotStartedSpeakingFrame`, `BotStoppedSpeakingFrame`, `FunctionCallsStartedFrame`, `FunctionCallResultFrame`, `FunctionCallCancelFrame`

<Note>
Frames that don't naturally reach the user aggregator (for example
`LLMTextFrame` or `TTSTextFrame`, which flow downstream from the LLM or TTS)
won't be seen by a strategy directly. To react to those signals, place a
companion `FrameProcessor` where the frames do flow and have it toggle state
on your strategy. See [Toggling a strategy at
runtime](#toggling-a-strategy-at-runtime) below.
</Note>

### Which frames get suppressed when muted

Returning `True` from your strategy sets the aggregator's mute state. While muted, only these frame types are actually dropped:

- `InterruptionFrame`
- `VADUserStartedSpeakingFrame`, `VADUserStoppedSpeakingFrame`
- `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`
- `InputAudioRawFrame`
- `InterimTranscriptionFrame`, `TranscriptionFrame`

All other frames continue to flow so the rest of the pipeline keeps functioning.

### Example: a simple custom strategy

Mute the user whenever the bot is speaking, but only after a specific number of bot turns:

```python
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.user_mute import BaseUserMuteStrategy


class AfterNTurnsUserMuteStrategy(BaseUserMuteStrategy):
    def __init__(self, mute_after_turn: int = 3):
        super().__init__()
        self._mute_after_turn = mute_after_turn
        self._bot_turns = 0
        self._bot_speaking = False

    async def reset(self):
        self._bot_turns = 0
        self._bot_speaking = False

    async def process_frame(self, frame: Frame) -> bool:
        await super().process_frame(frame)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False
            self._bot_turns += 1

        return self._bot_speaking and self._bot_turns >= self._mute_after_turn
```

### Toggling a strategy at runtime

Strategies are plain Python objects. Anything that holds a reference to one can flip its state between frames, which means a companion processor placed elsewhere in the pipeline can drive the mute decision based on signals the strategy can't observe directly (LLM text, tool results, external events).

This example strategy adds its own `enable`/`disable` methods (not part of the base contract) and returns their state from `process_frame`:

```python
from pipecat.frames.frames import Frame
from pipecat.turns.user_mute import BaseUserMuteStrategy


class ToggleableUserMuteStrategy(BaseUserMuteStrategy):
    def __init__(self):
        super().__init__()
        self._muted = False

    def enable(self):
        self._muted = True

    def disable(self):
        self._muted = False

    async def reset(self):
        self._muted = False

    async def process_frame(self, frame: Frame) -> bool:
        await super().process_frame(frame)
        return self._muted
```

A companion processor watches for the trigger and toggles the strategy:

```python
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
    LLMTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class DisclaimerGuardProcessor(FrameProcessor):
    def __init__(self, strategy: ToggleableUserMuteStrategy, trigger_phrase: str, **kwargs):
        super().__init__(**kwargs)
        self._strategy = strategy
        self._trigger = trigger_phrase
        # Keep a small sliding window so cross-frame matches work without
        # the buffer growing unbounded if the trigger never appears.
        self._max_buffer = max(len(trigger_phrase) * 4, 512)
        self._buffer = ""
        self._active = False

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStartedSpeakingFrame):
            # Start each bot turn with a fresh buffer.
            self._buffer = ""
        elif isinstance(frame, LLMTextFrame) and direction == FrameDirection.DOWNSTREAM:
            self._buffer = (self._buffer + frame.text)[-self._max_buffer :]
            if not self._active and self._trigger in self._buffer:
                self._active = True
                self._strategy.enable()
        elif isinstance(frame, BotStoppedSpeakingFrame) and self._active:
            self._active = False
            self._buffer = ""
            self._strategy.disable()

        await self.push_frame(frame, direction)
```

Wire them together by passing the same strategy instance to both the aggregator and the processor:

```python
mute_strategy = ToggleableUserMuteStrategy()

user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
    context,
    user_params=LLMUserAggregatorParams(user_mute_strategies=[mute_strategy]),
)

disclaimer_guard = DisclaimerGuardProcessor(
    strategy=mute_strategy,
    trigger_phrase="Please read the following disclosure",
)

pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    llm,
    disclaimer_guard,  # positioned where LLMTextFrame flows downstream
    tts,
    transport.output(),
    assistant_aggregator,
])
```

### Example: mute for the first N words of the bot speaking

Count words as the LLM streams text and keep the user muted until the threshold is reached. The strategy owns the counter and resets it each turn; a companion processor feeds it text:

```python
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.user_mute import BaseUserMuteStrategy


class FirstNWordsUserMuteStrategy(BaseUserMuteStrategy):
    def __init__(self, word_count: int = 10):
        super().__init__()
        self._threshold = word_count
        self._words_seen = 0
        self._bot_speaking = False

    def add_words(self, text: str):
        self._words_seen += len(text.split())

    async def reset(self):
        self._words_seen = 0
        self._bot_speaking = False

    async def process_frame(self, frame: Frame) -> bool:
        await super().process_frame(frame)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True
            self._words_seen = 0
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False

        return self._bot_speaking and self._words_seen < self._threshold
```

```python
from pipecat.frames.frames import Frame, LLMTextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class LLMTextWordCounter(FrameProcessor):
    def __init__(self, strategy: FirstNWordsUserMuteStrategy, **kwargs):
        super().__init__(**kwargs)
        self._strategy = strategy

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMTextFrame) and direction == FrameDirection.DOWNSTREAM:
            self._strategy.add_words(frame.text)

        await self.push_frame(frame, direction)
```
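
As with the disclaimer example, wire the pair together by passing the same strategy instance to both sides; the counter sits after the LLM so it sees `LLMTextFrame`s. This sketch reuses the `context`, transport, and service objects from the surrounding examples:

```python
mute_strategy = FirstNWordsUserMuteStrategy(word_count=10)

user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
    context,
    user_params=LLMUserAggregatorParams(user_mute_strategies=[mute_strategy]),
)

word_counter = LLMTextWordCounter(strategy=mute_strategy)

pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    llm,
    word_counter,  # positioned where LLMTextFrame flows downstream
    tts,
    transport.output(),
    assistant_aggregator,
])
```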

<Tip>
Because mute decisions are only re-evaluated when frames pass through the
aggregator, the unmute point here aligns with the next user or bot frame after
the threshold is crossed, not the exact word boundary. For tighter control,
drop the word count and gate on a sentinel phrase the LLM emits at the end of
the protected section, as shown in the disclaimer example above.
</Tip>

## Event Handlers

You can register event handlers to be notified when user muting starts or stops. This is useful for observability or providing feedback to users.
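
A sketch of the registration pattern, assuming the user aggregator exposes Pipecat's usual `@obj.event_handler(...)` decorator; the event names `on_user_mute_started` and `on_user_mute_stopped` are illustrative assumptions, not confirmed API:

```python
from loguru import logger

# Hypothetical event names, shown only to illustrate the registration pattern.
@user_aggregator.event_handler("on_user_mute_started")
async def on_mute_started(aggregator):
    logger.info("User muted")


@user_aggregator.event_handler("on_user_mute_stopped")
async def on_mute_stopped(aggregator):
    logger.info("User unmuted")
```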