> ## Documentation Index
> Fetch the complete documentation index at: https://visionagents.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Building Video Processors

Processors analyze, transform, or publish audio and video streams in real-time. Build custom processors or use built-in ones like YOLO pose detection and HeyGen avatars.

<Note>
  For the base class API reference, see [Processors Class](/core/processors-core).
</Note>

## How Video Processing Works

When a participant publishes video, the agent creates a `VideoForwarder` that reads frames from the track. This forwarder is shared with all video processors via the `process_video()` method. Each processor registers a frame handler with its desired FPS.

```
Participant Video Track
         │
         ▼
   VideoForwarder (shared)
         │
    ┌────┼────┐
    ▼    ▼    ▼
 Proc1 Proc2 Proc3  (each at independent FPS)
```

## Building a Custom Processor

A minimal video processor that logs frames:

```python theme={null}
import aiortc
import av
from typing import Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class FrameLogger(VideoProcessor):
    name = "frame_logger"

    def __init__(self, fps: int = 5):
        self.fps = fps
        self.frame_count = 0
        self._forwarder: Optional[VideoForwarder] = None

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._log_frame,
            fps=float(self.fps),
            name="frame_logger",
        )

    async def _log_frame(self, frame: av.VideoFrame):
        self.frame_count += 1
        print(f"Frame {self.frame_count} ({frame.width}x{frame.height})")

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._log_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
```

**Key patterns:**

* Set `name` as a class attribute
* Store a reference to `shared_forwarder` to remove handlers later
* Register handlers with `add_frame_handler(callback, fps, name)`
* Clean up in `stop_processing()` (called when tracks are removed) and `close()`

## Publishing Transformed Video

Use `VideoProcessorPublisher` with `QueuedVideoTrack` to publish transformed video back to the call:

```python theme={null}
import aiortc
import av
import cv2
from typing import Optional
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.utils.video_forwarder import VideoForwarder

class GrayscaleProcessor(VideoProcessorPublisher):
    name = "grayscale"

    def __init__(self, fps: int = 30):
        self.fps = fps
        self._forwarder: Optional[VideoForwarder] = None
        self._video_track = QueuedVideoTrack()

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        # Remove existing handler if switching tracks
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)

        self._forwarder = shared_forwarder
        self._forwarder.add_frame_handler(
            self._process_frame,
            fps=float(self.fps),
            name="grayscale",
        )

    async def _process_frame(self, frame: av.VideoFrame):
        img = frame.to_ndarray(format="rgb24")
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

        new_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24")
        await self._video_track.add_frame(new_frame)

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        return self._video_track

    async def stop_processing(self) -> None:
        if self._forwarder:
            await self._forwarder.remove_frame_handler(self._process_frame)
            self._forwarder = None

    async def close(self) -> None:
        await self.stop_processing()
        self._video_track.stop()
```

**Key utilities:**

* `QueuedVideoTrack` — Writable video track. Call `add_frame()` to queue frames for publishing.
* `VideoForwarder` — Distributes incoming frames to handlers at independent FPS rates.

## Emitting Custom Events

Use `attach_agent()` to register custom events that other parts of your application can subscribe to:

```python theme={null}
from dataclasses import dataclass
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.events import Event

@dataclass
class ObjectDetectedEvent(Event):
    objects: list[str]
    frame_number: int

class DetectionProcessor(VideoProcessorPublisher):
    name = "detection"

    def attach_agent(self, agent):
        self._events = agent.events
        self._events.register(ObjectDetectedEvent)

    async def _process_frame(self, frame):
        objects = self._detect(frame)
        await self._events.emit(ObjectDetectedEvent(
            objects=objects,
            frame_number=self.frame_count,
        ))
```

## YOLO Pose Detection

The Ultralytics plugin provides `YOLOPoseProcessor` for real-time pose detection with skeleton overlays:

```python theme={null}
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini, ultralytics

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Golf Coach", id="agent"),
    instructions="Analyze the user's golf swing.",
    llm=gemini.Realtime(fps=10),
    processors=[
        ultralytics.YOLOPoseProcessor(
            model_path="yolo11n-pose.pt",
            fps=30,
            conf_threshold=0.5,
        )
    ],
)
```

**Use cases:** Golf coaching, fitness form checking, dance instruction, physical therapy.

See [Ultralytics](/integrations/vision/ultralytics) for parameters and model options.

## HeyGen Avatars

The HeyGen plugin provides `AvatarPublisher` for lip-syncing AI avatars that speak agent responses.

**Use cases:** Virtual presenters, customer service avatars, interactive tutors.

See [HeyGen](/integrations/avatars/heygen) for setup and examples.

## Next Steps

<CardGroup cols={2}>
  <Card title="Processors Class" icon="layer-group" href="/core/processors-core">
    Base class API reference
  </Card>

  <Card title="Ultralytics YOLO" icon="person-running" href="/integrations/vision/ultralytics">
    Pose detection reference
  </Card>

  <Card title="HeyGen Avatars" icon="user" href="/integrations/avatars/heygen">
    Avatar publisher reference
  </Card>

  <Card title="Event System" icon="bell" href="/guides/event-system">
    Custom events reference
  </Card>
</CardGroup>
