Processors extend the agent’s capabilities by analyzing and transforming audio/video streams in real time. They can access incoming media, provide state to the LLM, and publish transformed streams back to the call. Common use cases:
  • State injection — Feed external data (game scores, stats, API responses) to the LLM
  • Video analysis — Pose detection, object recognition, scene understanding
  • Media transformation — Video effects, avatars, filters

Base Classes

All processors inherit from the abstract Processor base class and implement specific capabilities:
Class                    | Purpose
Processor                | Abstract base class with name, close(), and attach_agent()
AudioProcessor           | Implement process_audio() to receive audio streams
VideoProcessor           | Implement process_video() to receive video tracks
AudioPublisher           | Implement publish_audio_track() to output audio
VideoPublisher           | Implement publish_video_track() to output video
AudioProcessorPublisher  | Combines AudioProcessor + AudioPublisher
VideoProcessorPublisher  | Combines VideoProcessor + VideoPublisher

Abstract Methods

Base Processor

All processors must implement name and close(); attach_agent() is an optional hook:
from vision_agents.core.processors import Processor

class MyProcessor(Processor):
    @property
    def name(self) -> str:
        """Processor name identifier."""
        return "my_processor"

    async def close(self) -> None:
        """Clean up resources when the application exits."""
        pass

    def attach_agent(self, agent: "Agent") -> None:
        """Optional: Perform actions with the Agent (e.g., register custom events)."""
        pass
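
attach_agent() runs when the processor is attached to an agent, which makes it the natural place to keep a handle on the running Agent for later callbacks. A minimal sketch (ScoreProcessor is illustrative; what you do with the handle depends on your processor):

class ScoreProcessor(Processor):
    @property
    def name(self) -> str:
        return "score_processor"

    def attach_agent(self, agent: "Agent") -> None:
        # Keep the agent handle so later callbacks can reach it, e.g. to
        # register custom events (see the Agent API for available hooks).
        self._agent = agent

    async def close(self) -> None:
        pass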

Audio Processing

from getstream.video.rtc import PcmData
from vision_agents.core.processors import AudioProcessor

class MyAudioProcessor(AudioProcessor):
    @property
    def name(self) -> str:
        return "my_audio_processor"

    async def process_audio(self, audio_data: PcmData) -> None:
        """Process incoming audio. Participant info is in audio_data.participant."""
        pass

    async def close(self) -> None:
        pass
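
As a concrete illustration, the sketch below logs a per-chunk RMS level. LevelMeterProcessor is illustrative, and it assumes PcmData exposes the raw PCM samples as a NumPy-compatible array via a samples attribute; check the getstream SDK for the exact field names:

import numpy as np

from getstream.video.rtc import PcmData
from vision_agents.core.processors import AudioProcessor

class LevelMeterProcessor(AudioProcessor):
    @property
    def name(self) -> str:
        return "level_meter"

    async def process_audio(self, audio_data: PcmData) -> None:
        # Assumption: audio_data.samples is an int16 PCM array.
        samples = np.asarray(audio_data.samples, dtype=np.float32)
        if samples.size:
            rms = float(np.sqrt(np.mean(samples ** 2)))
            print(f"{self.name}: rms={rms:.1f}")

    async def close(self) -> None:
        pass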

Video Processing

import aiortc
from typing import Optional
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class MyVideoProcessor(VideoProcessor):
    @property
    def name(self) -> str:
        return "my_video_processor"

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Process incoming video track."""
        pass

    async def close(self) -> None:
        pass
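
process_video() receives the raw aiortc track, so a common pattern is to spawn a background task that pulls decoded frames with track.recv(), which yields av.VideoFrame objects. A minimal sketch that logs frame sizes (FrameLoggerProcessor is illustrative; when a shared_forwarder is supplied you would normally read frames from it instead, so several processors can share one decode):

import asyncio
from typing import Optional

import aiortc
from aiortc.mediastreams import MediaStreamError
from vision_agents.core.processors import VideoProcessor
from vision_agents.core.utils.video_forwarder import VideoForwarder

class FrameLoggerProcessor(VideoProcessor):
    @property
    def name(self) -> str:
        return "frame_logger"

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        # Consume frames without blocking the caller.
        self._task = asyncio.create_task(self._consume(track))

    async def _consume(self, track: aiortc.VideoStreamTrack) -> None:
        try:
            while True:
                frame = await track.recv()  # av.VideoFrame
                print(f"{self.name}: {frame.width}x{frame.height} pts={frame.pts}")
        except MediaStreamError:
            pass  # track ended

    async def close(self) -> None:
        if getattr(self, "_task", None) is not None:
            self._task.cancel()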

Publishing Tracks

import aiortc
from vision_agents.core.processors import VideoPublisher, AudioPublisher

class MyVideoPublisher(VideoPublisher):
    @property
    def name(self) -> str:
        return "my_video_publisher"

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return a video track to publish transformed video."""
        return aiortc.VideoStreamTrack()

    async def close(self) -> None:
        pass

class MyAudioPublisher(AudioPublisher):
    @property
    def name(self) -> str:
        return "my_audio_publisher"

    def publish_audio_track(self) -> aiortc.AudioStreamTrack:
        """Return an audio track to publish transformed audio."""
        return aiortc.AudioStreamTrack()

    async def close(self) -> None:
        pass
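
Both return values are ordinary aiortc media tracks, so a real publisher typically subclasses VideoStreamTrack and implements recv(). The sketch below (GrayFrameTrack is illustrative) emits solid gray frames, using aiortc's next_timestamp() helper for pacing; a real implementation would return processed frames instead:

import av
import numpy as np
import aiortc

class GrayFrameTrack(aiortc.VideoStreamTrack):
    async def recv(self) -> av.VideoFrame:
        pts, time_base = await self.next_timestamp()
        # 640x480 mid-gray frame; replace with your transformed output.
        frame = av.VideoFrame.from_ndarray(
            np.full((480, 640, 3), 128, dtype=np.uint8), format="bgr24"
        )
        frame.pts = pts
        frame.time_base = time_base
        return frame

publish_video_track() would then return GrayFrameTrack() rather than the bare placeholder track.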

Combined Processor + Publisher

For processors that both consume and produce media (e.g., video processors that annotate frames):
import aiortc
from vision_agents.core.processors import VideoProcessorPublisher

class MyAnnotationProcessor(VideoProcessorPublisher):
    def __init__(self) -> None:
        super().__init__()
        # Output track returned by publish_video_track(); replace with your own.
        self._video_track = aiortc.VideoStreamTrack()

    @property
    def name(self) -> str:
        return "annotation_processor"

    async def process_video(self, track, participant_id, shared_forwarder=None) -> None:
        """Receive and process incoming video."""
        pass

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return the annotated video track."""
        return self._video_track

    async def close(self) -> None:
        pass
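
A common shape for such a processor is a queue-backed output track: process_video() consumes and annotates incoming frames, then pushes the results into a queue that the published track drains in recv(). A sketch of the track half (QueuedVideoTrack is illustrative; the annotation logic is your own):

import asyncio

import av
import aiortc

class QueuedVideoTrack(aiortc.VideoStreamTrack):
    def __init__(self) -> None:
        super().__init__()
        self.queue: "asyncio.Queue[av.VideoFrame]" = asyncio.Queue(maxsize=4)

    async def recv(self) -> av.VideoFrame:
        frame = await self.queue.get()
        # Restamp so published frames carry monotonic timestamps.
        pts, time_base = await self.next_timestamp()
        frame.pts = pts
        frame.time_base = time_base
        return frame

process_video() would then push each annotated frame with self._video_track.queue.put_nowait(frame), guarding against asyncio.QueueFull if the consumer falls behind.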

Usage

from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, openai

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="AI Assistant", id="agent"),
    llm=openai.Realtime(),
    processors=[your_processor]
)
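
processors accepts a list, so several processors (for example, an audio analyzer alongside a video publisher) can run side by side on the same agent.
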
For implementation examples including YOLO pose detection and object detection, see the Video Processors Guide.