# Create Your Own Plugin

Build custom plugins to connect Vision Agents to any AI provider. A plugin wraps the provider's API in one of the framework's base classes, so the agent can use it the same way as a built-in integration.

<Info>
  Vision Agents requires a [Stream](https://getstream.io/try-for-free/) account for real-time transport.
</Info>

## Before You Build

Many providers expose OpenAI-compatible APIs. Before writing a custom plugin, check whether an existing plugin works with a custom `base_url`:

```python theme={null}
from vision_agents.plugins import openai

# OpenAI-compatible LLM (Ollama, LM Studio, vLLM, etc.)
llm = openai.ChatCompletionsLLM(
    model="llama-3.1-8b",
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # Some servers require a placeholder
)

# OpenAI-compatible VLM
vlm = openai.ChatCompletionsVLM(
    model="llava-1.5-7b",
    base_url="http://localhost:8000/v1",
    fps=1,
)

# OpenAI-compatible Realtime
realtime = openai.Realtime(
    model="gpt-realtime",
    base_url="wss://custom-endpoint.com/v1/realtime",
)
```

**Build a custom plugin when:**

* The provider uses a proprietary API format
* You need provider-specific features not exposed through Chat Completions
* The provider requires custom authentication or connection handling

## Plugin Categories

| Category           | Base Class                                        | Abstract Methods                                     | Example     |
| ------------------ | ------------------------------------------------- | ---------------------------------------------------- | ----------- |
| **STT**            | `STT`                                             | `process_audio()`                                    | Deepgram    |
| **TTS**            | `TTS`                                             | `stream_audio()`, `stop_audio()`                     | ElevenLabs  |
| **LLM**            | `LLM`                                             | `simple_response()`                                  | OpenRouter  |
| **VLM**            | `VideoLLM`                                        | `watch_video_track()`, `stop_watching_video_track()` | NVIDIA      |
| **Realtime**       | `Realtime`                                        | `connect()`, `simple_audio_response()`               | Gemini Live |
| **Turn Detection** | `TurnDetector`                                    | `process_audio()`                                    | SmartTurn   |
| **Processor**      | `Processor` / `VideoProcessor` / `AudioProcessor` | `close()`                                            | Ultralytics |

<Note>
  The TTS and Realtime base classes also provide a built-in `interrupt()` method and `epoch` property for barge-in handling. You do not need to override these: the Agent calls `interrupt()` automatically when a user interruption is detected.
</Note>
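
To make the role of an epoch counter concrete, here is a minimal sketch of how a counter of this kind can drop audio that was queued before a barge-in. It is an illustration only: `EpochPlayer`, `emit()`, and `played` are hypothetical names, and the real base classes may implement `interrupt()` and `epoch` differently.

```python
class EpochPlayer:
    """Hypothetical stand-in, NOT the Vision Agents TTS or Realtime base class."""

    def __init__(self) -> None:
        self.epoch = 0
        self.played: list[str] = []

    def interrupt(self) -> None:
        # Bumping the epoch marks everything produced earlier as stale.
        self.epoch += 1

    def emit(self, chunk: str, epoch: int) -> bool:
        # Drop chunks that were produced under an older epoch.
        if epoch != self.epoch:
            return False
        self.played.append(chunk)
        return True


player = EpochPlayer()
started_in = player.epoch            # remember the epoch when synthesis began
player.emit("Hello, ", started_in)
player.interrupt()                   # the user starts talking
player.emit("how are", started_in)   # stale, dropped
player.emit("Sorry?", player.epoch)  # new response under the new epoch
print(player.played)                 # ['Hello, ', 'Sorry?']
```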

## Quickstart Template

Create your plugin in `plugins/acme/`:

```
plugins/acme/
├── pyproject.toml
├── README.md
├── vision_agents/
│   └── plugins/
│       └── acme/
│           ├── __init__.py
│           ├── stt.py
│           └── events.py  # Optional custom events
└── tests/
    └── test_stt.py
```

### pyproject.toml

```toml theme={null}
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "vision-agents-plugins-acme"
version = "0.1.0"
description = "Acme STT integration for Vision Agents"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "vision-agents",
    "acme-sdk>=1.0",
]

[tool.hatch.build.targets.wheel]
packages = [".", "vision_agents"]

[tool.uv.sources]
vision-agents = { workspace = true }

[dependency-groups]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=1.0",
]
```

### `__init__.py`

```python theme={null}
from .stt import STT

__all__ = ["STT"]
```

### stt.py

```python theme={null}
from vision_agents.core import stt
from getstream.video.rtc.track_util import PcmData
import acme_sdk

class STT(stt.STT):
    """Acme speech-to-text integration."""

    def __init__(self, api_key: str | None = None, model: str = "default"):
        super().__init__()
        self.client = acme_sdk.Client(api_key=api_key)
        self.model = model

    async def process_audio(self, pcm: PcmData, participant=None):
        resampled = pcm.resample(16000, 1)
        result = await self.client.transcribe(
            audio=resampled.data,
            sample_rate=resampled.sample_rate,
            model=self.model,
        )

        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        await self.client.close()
```

## Base Class Interfaces

### STT

```python theme={null}
from vision_agents.core import stt
from getstream.video.rtc.track_util import PcmData

class MySTT(stt.STT):
    async def process_audio(self, pcm: PcmData, participant=None):
        """Process audio and emit transcripts."""
        # Resample to provider's expected format
        resampled = pcm.resample(16000, 1)

        result = await self.provider.transcribe(resampled.data)

        # Emit transcript using helper method
        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        """Cleanup connections."""
        await self.provider.close()
```

### TTS

```python theme={null}
from vision_agents.core import tts

class MyTTS(tts.TTS):
    async def stream_audio(self, text: str, **kwargs):
        """Convert text to audio. Return bytes, PcmData, or async iterator."""
        async for chunk in self.provider.synthesize(text):
            yield chunk  # bytes or PcmData

    async def stop_audio(self):
        """Stop current synthesis."""
        await self.provider.cancel()

    async def close(self):
        await self.provider.close()
```

### LLM

```python theme={null}
from vision_agents.core import llm

class MyLLM(llm.LLM):
    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate response to text input."""
        messages = self._build_messages(text)

        # Include tools if function calling is supported
        tools = self._convert_tools_to_provider_format(self.get_available_functions())

        response = await self.provider.chat(messages, tools=tools)

        # Handle tool calls if present
        tool_calls = self._extract_tool_calls_from_response(response)
        if tool_calls:
            results = await self._execute_tools(tool_calls)
            # Continue conversation with tool results...

        return self._create_response_event(response)
```

### VLM (Video Language Model)

VLM plugins process video frames alongside text. The framework provides `VideoForwarder` for frame management.

```python theme={null}
from vision_agents.core.llm import VideoLLM
from vision_agents.core.utils import VideoForwarder
from collections import deque
import base64
from PIL import Image
import io

class MyVLM(VideoLLM):
    def __init__(self, model: str, fps: int = 1, frame_buffer_seconds: int = 10):
        super().__init__()
        self.model = model
        self.fps = fps
        self._frame_buffer = deque(maxlen=fps * frame_buffer_seconds)
        self._forwarder = None
        self._owns_forwarder = False  # True only when this plugin created the forwarder
        self._handler_id = None

    async def watch_video_track(self, track, shared_forwarder=None):
        """Subscribe to video frames from the call."""
        # Use shared forwarder if provided, otherwise create own
        if shared_forwarder is not None:
            self._forwarder = shared_forwarder
            self._owns_forwarder = False
        else:
            self._forwarder = VideoForwarder(track)
            self._owns_forwarder = True
            await self._forwarder.start()

        # Register frame handler at desired FPS
        self._handler_id = self._forwarder.add_frame_handler(
            on_frame=self._on_frame,
            fps=self.fps,
            name="my_vlm",
        )

    def _on_frame(self, frame):
        """Called for each video frame."""
        # Convert to PIL Image
        img = frame.to_image()
        self._frame_buffer.append(img)

    async def stop_watching_video_track(self):
        """Unsubscribe from video frames."""
        if self._forwarder is not None and self._handler_id is not None:
            self._forwarder.remove_frame_handler(self._handler_id)
            self._handler_id = None

    async def simple_response(self, text: str, processors=None, participant=None):
        """Generate response using buffered frames."""
        # Encode frames as base64 for API
        images = []
        for img in self._frame_buffer:
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)
            images.append(base64.b64encode(buffer.getvalue()).decode())

        # Call provider with images
        response = await self.provider.chat(
            messages=[{"role": "user", "content": text}],
            images=images,
        )
        return self._create_response_event(response)

    async def close(self):
        await self.stop_watching_video_track()
        # Stop the forwarder only if this plugin created it; a shared one belongs to the caller
        if self._forwarder is not None and self._owns_forwarder:
            await self._forwarder.stop()
        self._forwarder = None
```

**Key VLM concepts:**

* **VideoForwarder**: Manages frame buffering and distributes to multiple handlers at different FPS
* **Shared forwarder**: Multiple plugins can share one forwarder to avoid duplicate frame processing
* **Frame buffer**: Store recent frames for context (configurable size)
* **FPS control**: Request frames at the rate your model needs (1-30 fps typical)
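
The frame buffer in `MyVLM` is a `deque` whose `maxlen` is `fps * frame_buffer_seconds`, so it always holds the most recent window of frames and silently discards older ones. A small sketch of that sizing, using integers in place of real frames (`make_frame_buffer` is a hypothetical helper, not part of the framework):

```python
from collections import deque


def make_frame_buffer(fps: int, seconds: int) -> deque:
    """Bounded buffer that keeps only the newest fps * seconds frames."""
    return deque(maxlen=fps * seconds)


buffer = make_frame_buffer(fps=2, seconds=3)  # room for 6 frames
for frame_index in range(10):                 # integers stand in for frames
    buffer.append(frame_index)

print(list(buffer))  # [4, 5, 6, 7, 8, 9]
```

With the defaults in the example above (`fps=1`, `frame_buffer_seconds=10`), each request would carry at most 10 images.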

### Realtime (Speech-to-Speech)

```python theme={null}
from vision_agents.core.llm import Realtime

class MyRealtime(Realtime):
    async def connect(self):
        """Establish WebSocket/WebRTC connection."""
        self.ws = await self.provider.connect()
        self._emit_connected_event()

    async def simple_audio_response(self, pcm, participant=None):
        """Process incoming audio."""
        await self.ws.send_audio(pcm.data)

    async def close(self):
        await self.ws.close()
        self._emit_disconnected_event()
```

## Audio Utilities

Vision Agents provides utilities to simplify audio handling in STT and TTS plugins.

### PcmData Resampling

Most STT providers expect **16kHz mono** audio. Use the built-in resampling:

```python theme={null}
async def process_audio(self, pcm: PcmData, participant=None):
    # Resample to 16kHz mono (standard for most STT APIs)
    resampled = pcm.resample(target_sample_rate=16000, target_channels=1)

    # Access audio properties
    audio_bytes = resampled.data
    sample_rate = resampled.sample_rate  # 16000
    channels = resampled.channels         # 1
```

### TTS Output Format

The TTS base class handles output format conversion automatically:

```python theme={null}
class MyTTS(tts.TTS):
    def __init__(self):
        super().__init__()
        # The output format is set by the framework based on call requirements
        # (default: 16kHz mono PCM_S16), so nothing needs to be configured here

    async def stream_audio(self, text: str, **kwargs):
        # Return audio in any format - base class resamples automatically
        async for chunk in self.provider.synthesize(text):
            yield chunk  # Resampled to _desired_sample_rate before emission
```

### AudioQueue for Buffering

For plugins that need to buffer audio (e.g., accumulating before processing):

```python theme={null}
from vision_agents.core.utils import AudioQueue

queue = AudioQueue(max_duration_ms=5000)  # 5 second buffer

# Add audio
queue.put_nowait(pcm_chunk)

# Get specific duration
audio = queue.get_duration(duration_ms=1000)  # Get 1 second

# Get specific sample count
audio = queue.get_samples(num_samples=16000)  # Get 16k samples
```

## Function Calling

To support function calling in your LLM plugin, override these methods:

```python theme={null}
class MyLLM(llm.LLM):
    def _convert_tools_to_provider_format(self, tools):
        """Convert ToolSchema list to provider's format."""
        return [
            {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t["parameters_schema"],
            }
            for t in tools
        ]

    def _extract_tool_calls_from_response(self, response):
        """Extract tool calls from provider response."""
        return [
            {
                "type": "tool_call",
                "name": call.function_name,
                "arguments_json": call.arguments,
                "id": call.id,
            }
            for call in response.tool_calls
        ]

    def _create_tool_result_message(self, tool_calls, results):
        """Format tool results for follow-up request."""
        return [
            {"role": "tool", "tool_call_id": tc["id"], "content": str(result)}
            for tc, result in zip(tool_calls, results)
        ]
```

The base class handles:

* Function registration via `@llm.register_function()`
* Tool execution with `_execute_tools()` (concurrent, with timeout)
* Tool call deduplication by (name, arguments)
* Multi-round tool calling (configurable via `max_tool_rounds`)
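
To illustrate what deduplication by (name, arguments) and concurrent execution with a timeout mean in practice, here is a standalone sketch. It is not the base class implementation: `dedupe_tool_calls`, `run_tools`, and the error payload shape are hypothetical, and it assumes `arguments_json` holds a JSON string.

```python
import asyncio
import json


def dedupe_tool_calls(tool_calls: list[dict]) -> list[dict]:
    """Keep the first occurrence of each (name, arguments) pair."""
    seen = set()
    unique = []
    for call in tool_calls:
        # Normalize the JSON so '{"a":1}' and '{"a": 1}' count as the same call
        args_key = json.dumps(json.loads(call["arguments_json"]), sort_keys=True)
        key = (call["name"], args_key)
        if key not in seen:
            seen.add(key)
            unique.append(call)
    return unique


async def run_tools(tool_calls: list[dict], functions: dict, timeout_s: float = 5.0) -> list:
    """Run the unique calls concurrently; a slow tool yields an error payload."""

    async def run_one(call: dict):
        kwargs = json.loads(call["arguments_json"])
        try:
            return await asyncio.wait_for(functions[call["name"]](**kwargs), timeout=timeout_s)
        except asyncio.TimeoutError:
            return {"error": f"{call['name']} timed out"}

    return await asyncio.gather(*(run_one(c) for c in dedupe_tool_calls(tool_calls)))


async def get_weather(city: str) -> str:
    return f"sunny in {city}"


async def slow_tool() -> str:
    await asyncio.sleep(1)
    return "done"


calls = [
    {"name": "get_weather", "arguments_json": '{"city": "Oslo"}', "id": "1"},
    {"name": "get_weather", "arguments_json": '{"city":"Oslo"}', "id": "2"},  # duplicate
    {"name": "slow_tool", "arguments_json": "{}", "id": "3"},
]
functions = {"get_weather": get_weather, "slow_tool": slow_tool}
print(asyncio.run(run_tools(calls, functions, timeout_s=0.05)))
# ['sunny in Oslo', {'error': 'slow_tool timed out'}]
```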

## Event Emission

Base classes provide helper methods for common events:

| Base Class   | Helper Methods                                                                                                           |
| ------------ | ------------------------------------------------------------------------------------------------------------------------ |
| **STT**      | `_emit_transcript_event()`, `_emit_partial_transcript_event()`, `_emit_turn_started_event()`, `_emit_turn_ended_event()` |
| **TTS**      | `_emit_chunk()` (called automatically by `send()`)                                                                       |
| **Realtime** | `_emit_connected_event()`, `_emit_disconnected_event()`, `_emit_audio_output_event()`                                    |

For custom events:

```python theme={null}
from dataclasses import dataclass
from vision_agents.core.events import PluginBaseEvent

@dataclass
class AcmeCustomEvent(PluginBaseEvent):
    type: str = "acme.custom"
    custom_field: str = ""

# In your plugin:
self.events.send(AcmeCustomEvent(custom_field="value"))
```

## Gotchas & Best Practices

### Connection Lifecycle

**Owned vs shared clients**: Track whether your plugin created the client or received it, so that `close()` only closes a client the plugin owns:

```python theme={null}
import aiohttp

def __init__(self, client=None):
    self._own_client = client is None
    self._client = client or aiohttp.ClientSession()

async def close(self):
    if self._own_client and self._client:
        await self._client.close()
```
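
The effect of the ownership flag can be checked without any network code. The sketch below uses a hypothetical `FakeClient` in place of `aiohttp.ClientSession`, and `PluginSketch` is a stand-in rather than a real base class:

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an HTTP session or SDK client."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class PluginSketch:
    def __init__(self, client=None) -> None:
        self._own_client = client is None
        self._client = client if client is not None else FakeClient()

    async def close(self) -> None:
        # Only close a client this plugin created
        if self._own_client:
            await self._client.close()


async def demo() -> tuple:
    shared = FakeClient()
    borrowing = PluginSketch(client=shared)  # received the client
    owning = PluginSketch()                  # created its own
    await borrowing.close()
    await owning.close()
    return shared.closed, owning._client.closed


print(asyncio.run(demo()))  # (False, True)
```

The shared client stays open for its other users, while the plugin-created one is released.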

**Connection timeouts**: Always use timeouts for connection setup:

```python theme={null}
self.connection = await asyncio.wait_for(
    self.provider.connect(),
    timeout=10.0
)
```
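
A runnable sketch of the same pattern, with the timeout converted into a `ConnectionError` so callers handle a single exception type. `connect_with_timeout`, `fast_connect`, and `hanging_connect` are hypothetical names used only for this example:

```python
import asyncio


async def connect_with_timeout(connect, timeout_s: float = 10.0):
    """Await connect() but give up after timeout_s seconds."""
    try:
        return await asyncio.wait_for(connect(), timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        raise ConnectionError(f"connect timed out after {timeout_s}s") from exc


async def fast_connect() -> str:
    return "connected"


async def hanging_connect() -> str:
    await asyncio.sleep(60)  # simulates a server that never answers
    return "connected"


print(asyncio.run(connect_with_timeout(fast_connect, timeout_s=0.5)))  # connected

try:
    asyncio.run(connect_with_timeout(hanging_connect, timeout_s=0.05))
except ConnectionError as exc:
    print(exc)  # connect timed out after 0.05s
```

`asyncio.wait_for` cancels the pending `connect()` call when the timeout expires, so the hung attempt does not linger in the background.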

### Cleanup Order

Follow this order in `close()` to prevent deadlocks:

```python theme={null}
async def close(self):
    # 1. Set closed flag first (prevents new work)
    self.closed = True

    # 2. Cancel background tasks
    if self._task:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass

    # 3. Close connections in try-finally
    if self.connection:
        try:
            await self.connection.close()
        finally:
            self.connection = None
```
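
The ordering can be exercised end to end with stand-ins. In this sketch, `WorkerSketch`, `FakeConnection`, and the `log` list are hypothetical and exist only to make the three steps observable:

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a provider connection."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class WorkerSketch:
    def __init__(self) -> None:
        self.closed = False
        self.connection = FakeConnection()
        self._task = None
        self.log: list[str] = []

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while not self.closed:
            await asyncio.sleep(0.01)  # placeholder for background work

    async def close(self) -> None:
        self.closed = True                  # 1. no new work
        self.log.append("flag set")
        if self._task:
            self._task.cancel()             # 2. stop the background task
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self.log.append("task cancelled")
        if self.connection:
            try:
                await self.connection.close()  # 3. release the connection
            finally:
                self.connection = None
            self.log.append("connection closed")


async def demo() -> list:
    worker = WorkerSketch()
    worker.start()
    await asyncio.sleep(0.02)  # let the background task run briefly
    await worker.close()
    return worker.log


print(asyncio.run(demo()))  # ['flag set', 'task cancelled', 'connection closed']
```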

### Error Handling

**Temporary errors** (network timeouts, transient API errors): Emit and continue:

```python theme={null}
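try:
    result = await self.provider.transcribe(pcm.data)
except (TimeoutError, asyncio.TimeoutError) as e:  # distinct classes before Python 3.11
    self._emit_error_event(e, context="transcription timeout")
    # Plugin continues operating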
```

**Permanent errors** (invalid API key, unsupported model): Raise directly:

```python theme={null}
if not api_key:
    raise ValueError("API key required")
```
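
Both rules can be seen side by side in a standalone sketch. `TranscriberSketch` is hypothetical, its `errors` list stands in for emitted error events, and the 50 ms timeout is chosen only to keep the example fast:

```python
import asyncio
from typing import Optional


class TranscriberSketch:
    """Hypothetical plugin-like class, not a Vision Agents base class."""

    def __init__(self, api_key: Optional[str], transcribe) -> None:
        if not api_key:
            # Permanent: retrying cannot fix a missing key, so fail loudly
            raise ValueError("API key required")
        self._transcribe = transcribe
        self.transcripts: list[str] = []
        self.errors: list[str] = []

    async def process_audio(self, chunk: bytes) -> None:
        try:
            text = await asyncio.wait_for(self._transcribe(chunk), timeout=0.05)
        except (TimeoutError, asyncio.TimeoutError):
            # Temporary: record the failure and stay alive for the next chunk
            self.errors.append("transcription timeout")
            return
        self.transcripts.append(text)


async def flaky_transcribe(chunk: bytes) -> str:
    if chunk == b"slow":
        await asyncio.sleep(1)  # simulates a stalled request
    return chunk.decode()


async def demo() -> tuple:
    stt = TranscriberSketch("key", flaky_transcribe)
    for chunk in (b"hello", b"slow", b"world"):
        await stt.process_audio(chunk)
    return stt.transcripts, stt.errors


print(asyncio.run(demo()))  # (['hello', 'world'], ['transcription timeout'])
```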

### Threading for Blocking Operations

Some SDKs have blocking calls. Use a thread pool:

```python theme={null}
import asyncio
from concurrent.futures import ThreadPoolExecutor

class MySTT(stt.STT):
    def __init__(self):
        super().__init__()
        self._executor = ThreadPoolExecutor(max_workers=1)

    async def process_audio(self, pcm, participant=None):
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            self._executor,
            self.blocking_transcribe,
            pcm.data
        )
        self._emit_transcript_event(
            text=result.text,
            participant=participant,
            response=result,
        )

    async def close(self):
        self._executor.shutdown(wait=False)
```
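
When a dedicated pool is not needed, `asyncio.to_thread()` (Python 3.9+) is a shorter alternative that runs the call on the event loop's default executor. The sketch below uses a hypothetical `blocking_transcribe` and a heartbeat coroutine to show that the loop keeps running while the blocking call is in progress:

```python
import asyncio
import time


def blocking_transcribe(audio: bytes) -> str:
    """Hypothetical blocking SDK call."""
    time.sleep(0.05)
    return f"{len(audio)} bytes transcribed"


async def heartbeat(ticks: list) -> None:
    # Runs on the event loop while the blocking call sits in a worker thread
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def demo() -> tuple:
    ticks: list = []
    text, _ = await asyncio.gather(
        asyncio.to_thread(blocking_transcribe, b"\x00" * 320),
        heartbeat(ticks),
    )
    return text, len(ticks)


print(asyncio.run(demo()))  # ('320 bytes transcribed', 3)
```

A dedicated `ThreadPoolExecutor(max_workers=1)`, as in the example above, remains the better fit when calls into the SDK must not overlap.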

### Concurrency Control

Prevent concurrent processing when your provider doesn't support it:

```python theme={null}
def __init__(self):
    self._lock = asyncio.Lock()

async def process_audio(self, pcm, participant=None):
    async with self._lock:
        # Only one request at a time
        result = await self.provider.transcribe(pcm.data)
```
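
A quick way to confirm that the lock serializes requests is to count how many are in flight at once. `SerializedProvider` and its counters are hypothetical and exist only for this check:

```python
import asyncio


class SerializedProvider:
    """Hypothetical class that tracks how many requests overlap."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._in_flight = 0
        self.max_in_flight = 0

    async def process_audio(self, chunk: str) -> str:
        async with self._lock:
            self._in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self._in_flight)
            await asyncio.sleep(0.01)  # simulated provider request
            self._in_flight -= 1
            return chunk.upper()


async def demo() -> tuple:
    provider = SerializedProvider()
    results = await asyncio.gather(
        *(provider.process_audio(c) for c in ("a", "b", "c"))
    )
    return results, provider.max_in_flight


print(asyncio.run(demo()))  # (['A', 'B', 'C'], 1)
```

Even though three calls are started together, at most one is ever inside the locked section.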

### Sample Rate Requirements

| Plugin Type | Expected Input      | Standard Rate                      |
| ----------- | ------------------- | ---------------------------------- |
| STT         | Resampled audio     | **16kHz mono**                     |
| TTS         | Output configurable | 16-48kHz                           |
| Realtime    | Raw PCM             | 24kHz or 48kHz (provider-specific) |
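
The rates in the table translate directly into buffer sizes. A small worked example, assuming 16-bit PCM (2 bytes per sample); `pcm_size` is a hypothetical helper, not a framework function:

```python
def pcm_size(duration_ms: int, sample_rate: int, channels: int = 1,
             bytes_per_sample: int = 2) -> tuple:
    """Return (samples per channel, total bytes) for a PCM chunk."""
    samples_per_channel = sample_rate * duration_ms // 1000
    return samples_per_channel, samples_per_channel * channels * bytes_per_sample


print(pcm_size(20, 16000))              # (320, 640)     20 ms of 16kHz mono
print(pcm_size(1000, 16000))            # (16000, 32000) 1 s of 16kHz mono
print(pcm_size(20, 48000, channels=2))  # (960, 3840)    20 ms of 48kHz stereo
```

This is why 16,000 samples of 16kHz audio correspond to one second.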

### Reconnection with Backoff

For WebSocket-based plugins:

```python theme={null}
import asyncio
import logging

logger = logging.getLogger(__name__)

async def _reconnect(self):
    for attempt in range(3):
        try:
            await asyncio.sleep(2 ** attempt)  # 1, 2, 4 seconds
            await self.connect()
            return
        except Exception as e:
            logger.warning(f"Reconnect attempt {attempt + 1} failed: {e}")
    raise ConnectionError("Failed to reconnect after 3 attempts")
```
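
A standalone variant of the same backoff loop that can be run and tested without waiting for real delays. The injectable `sleep` parameter, the returned attempt count, and the narrower `OSError` catch are choices made for this sketch, not part of the framework:

```python
import asyncio


async def reconnect(connect, attempts: int = 3, base_delay: float = 1.0,
                    sleep=asyncio.sleep) -> int:
    """Retry connect() with exponential backoff; return the attempt that succeeded."""
    for attempt in range(attempts):
        await sleep(base_delay * 2 ** attempt)  # 1, 2, 4 seconds by default
        try:
            await connect()
            return attempt + 1
        except OSError:  # includes ConnectionError and its subclasses
            continue
    raise ConnectionError(f"Failed to reconnect after {attempts} attempts")


delays: list = []
calls = {"count": 0}


async def fake_sleep(seconds: float) -> None:
    delays.append(seconds)  # record the delay instead of waiting


async def flaky_connect() -> None:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionResetError("dropped")


print(asyncio.run(reconnect(flaky_connect, sleep=fake_sleep)))  # 3
print(delays)  # [1.0, 2.0, 4.0]
```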

## Testing

```bash theme={null}
cd plugins/acme
uv sync
uv run pytest -v
```
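
A test for `tests/test_stt.py` can swap the provider client for a fake so that no network access is needed. The sketch below shows the pattern with stand-ins only: `FakeAcmeClient` and `SttUnderTest` are hypothetical, and a real test would import the plugin's `STT` class and assert on the transcript events it emits.

```python
import asyncio
from types import SimpleNamespace


class FakeAcmeClient:
    """Hypothetical fake for the provider client; records each request."""

    def __init__(self, text: str) -> None:
        self.text = text
        self.calls: list = []
        self.closed = False

    async def transcribe(self, audio: bytes, sample_rate: int, model: str):
        self.calls.append((len(audio), sample_rate, model))
        return SimpleNamespace(text=self.text)

    async def close(self) -> None:
        self.closed = True


class SttUnderTest:
    """Simplified stand-in for the plugin's STT class (no Vision Agents imports)."""

    def __init__(self, client, model: str = "default") -> None:
        self.client = client
        self.model = model
        self.transcripts: list[str] = []

    async def process_audio(self, audio: bytes, sample_rate: int = 16000) -> None:
        result = await self.client.transcribe(
            audio=audio, sample_rate=sample_rate, model=self.model
        )
        self.transcripts.append(result.text)

    async def close(self) -> None:
        await self.client.close()


def test_process_audio_records_transcript() -> None:
    async def scenario():
        client = FakeAcmeClient(text="hello world")
        stt = SttUnderTest(client)
        await stt.process_audio(b"\x00\x00" * 160)  # 10 ms of 16kHz mono silence
        await stt.close()
        return client, stt

    client, stt = asyncio.run(scenario())
    assert stt.transcripts == ["hello world"]
    assert client.calls == [(320, 16000, "default")]
    assert client.closed
```

With `pytest-asyncio` from the dev dependencies, the test body could be written as an `async def` instead of calling `asyncio.run()` directly.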

## Contribution Checklist

1. Implement required abstract methods
2. Add tests with reasonable coverage
3. Pass `uv run pre-commit run --all-files`
4. Add `README.md` documenting usage and events
5. Open a PR to the [Vision Agents repo](https://github.com/GetStream/vision-agents)

## Next Steps

<CardGroup cols={2}>
  <Card title="Event System" icon="bolt" href="/guides/event-system">
    Learn about events
  </Card>

  <Card title="Function Calling" icon="wrench" href="/guides/mcp-tool-calling">
    Add tool support
  </Card>
</CardGroup>
