Processors let you analyze, transform, or publish audio and video streams in real time. This guide shows how to build custom processors and use built-in ones like YOLO pose detection and HeyGen avatars.
For the base class API reference, see Processors Class.

Building a Custom Processor

Here’s a minimal video processor that logs frames:
import aiortc
from typing import Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class FrameLogger(VideoProcessor):
    """Logs video frames as they arrive."""

    @property
    def name(self) -> str:
        return "frame_logger"

    def __init__(self, fps: int = 30):
        self.fps = fps
        self.frame_count = 0
        self._video_forwarder = None

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Subscribe to video frames."""
        self._video_forwarder = shared_forwarder
        self._video_forwarder.add_frame_handler(
            self._log_frame,
            fps=float(self.fps),
            name="frame_logger"
        )

    async def _log_frame(self, frame):
        self.frame_count += 1
        print(f"📊 Frame {self.frame_count} ({frame.width}x{frame.height})")

    async def close(self) -> None:
        pass
Key patterns:
  • Inherit from VideoProcessor (or AudioProcessor for audio)
  • Implement the name property and close() method
  • Use VideoForwarder.add_frame_handler() to subscribe to frames
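To use the processor, pass an instance to the agent's processors list. Here's a minimal sketch, assuming Agent and User are exported from vision_agents.core (the parameters mirror the YOLO example further below):
from vision_agents.core import Agent, User  # assumed export path
from vision_agents.plugins import getstream, gemini

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Frame Logger", id="agent"),
    instructions="Describe what you see.",
    llm=gemini.Realtime(fps=10),
    # FrameLogger receives frames from the shared VideoForwarder at 5 fps
    processors=[FrameLogger(fps=5)],
)
Once video starts flowing, the agent is expected to call process_video() with the participant's track and a shared VideoForwarder, which then fans frames out to your handler.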

Publishing Transformed Video

To publish transformed video back to the call, use VideoProcessorPublisher with the built-in QueuedVideoTrack and VideoForwarder utilities:
import aiortc
import av
import cv2
from typing import Optional
from vision_agents.core.processors import VideoProcessorPublisher
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.utils.video_forwarder import VideoForwarder

class GrayscaleProcessor(VideoProcessorPublisher):
    """Converts video to grayscale and publishes it back to the call."""

    @property
    def name(self) -> str:
        return "grayscale"

    def __init__(self, fps: int = 30):
        self.fps = fps
        self._video_track = QueuedVideoTrack()
        self._video_forwarder = None

    async def process_video(
        self,
        incoming_track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Subscribe to video frames and process them."""
        self._video_forwarder = shared_forwarder
        self._video_forwarder.add_frame_handler(
            self._process_frame, 
            fps=float(self.fps), 
            name="grayscale"
        )

    async def _process_frame(self, frame: av.VideoFrame):
        """Convert frame to grayscale and publish."""
        # Convert to numpy, apply effect
        img = frame.to_ndarray(format="rgb24")
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
        
        # Publish transformed frame
        new_frame = av.VideoFrame.from_ndarray(rgb, format="rgb24")
        await self._video_track.add_frame(new_frame)

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return the track that publishes transformed video."""
        return self._video_track

    async def close(self) -> None:
        pass
Key utilities:
  • QueuedVideoTrack — Base track class that handles frame queuing. Call add_frame() to publish.
  • VideoForwarder — Distributes incoming frames to handlers. Use add_frame_handler() to subscribe.
This pattern is used by Moondream detection and YOLO pose processors.
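The transform itself is plain OpenCV/numpy, so you can sanity-check it in isolation before wiring it into a call. A quick sketch using only av, cv2, and numpy (no Vision Agents plumbing involved):
import av
import cv2
import numpy as np

# Synthetic 4x4 pure-red frame in RGB.
img = np.zeros((4, 4, 3), dtype=np.uint8)
img[..., 0] = 255
frame = av.VideoFrame.from_ndarray(img, format="rgb24")

# Same conversion as GrayscaleProcessor._process_frame.
rgb = frame.to_ndarray(format="rgb24")
gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
out = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

new_frame = av.VideoFrame.from_ndarray(out, format="rgb24")
print(new_frame.width, new_frame.height)  # 4 4
print(out[0, 0])                          # all channels equal, roughly [76 76 76]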

YOLO Pose Detection

The Ultralytics plugin provides YOLOPoseProcessor for real-time pose detection. It annotates video frames with skeleton overlays that the LLM can analyze for feedback.
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini, ultralytics

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Golf Coach", id="agent"),
    instructions="Analyze the user's golf swing.",
    llm=gemini.Realtime(fps=10),
    processors=[
        ultralytics.YOLOPoseProcessor(model_path="yolo11n-pose.pt")
    ],
)
Use cases: Golf coaching, fitness form checking, dance instruction, physical therapy. See Ultralytics YOLO for all parameters and model options.

HeyGen Avatar Publisher

The HeyGen plugin provides AvatarPublisher to display an AI avatar that speaks the agent's responses, automatically syncing its lip movements with the agent's voice output. Use cases: Virtual presenters, customer service avatars, interactive tutors. See HeyGen Avatars for setup, parameters, and examples.
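A minimal sketch of attaching the avatar, mirroring the YOLO example above. It assumes the plugin is importable as heygen from vision_agents.plugins like the other plugins, and omits constructor arguments (avatar selection, credentials, and so on); check HeyGen Avatars for the exact parameters:
from vision_agents.core import Agent, User  # assumed export path
from vision_agents.plugins import getstream, gemini, heygen

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Virtual Tutor", id="agent"),
    instructions="Tutor the user in conversational Spanish.",
    llm=gemini.Realtime(),
    # AvatarPublisher publishes the avatar video and lip-syncs it to the agent's speech;
    # constructor options are omitted here and may be required in practice.
    processors=[heygen.AvatarPublisher()],
)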