Voice Server Service

Comprehensive documentation for the VoiceERA Voice Server service.

Overview

The Voice Server is the real-time voice processing engine for VoiceERA, built with Pipecat and Python 3.10+.

Key Responsibilities:

  • Establish WebSocket connections with clients
  • Process real-time audio streams
  • Orchestrate STT, LLM, and TTS services
  • Manage concurrent voice sessions
  • Handle audio input/output
  • Store call recordings and transcripts

Getting Started

Prerequisites

  • Python 3.10+
  • pip
  • Vobiz telephony credentials (for production)
  • API keys for AI services (OpenAI, Deepgram, Cartesia, etc.)

Installation

cd voice_2_voice_server

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

Configuration

# Copy example config
cp .env.example .env

# Edit with your settings
nano .env

Running Locally

# Development mode
python main.py

# With logging
python main.py --log-level DEBUG

# Via Docker
docker build -t voicera-voice-server .
docker run -p 7860:7860 \
  -e OPENAI_API_KEY=sk-... \
  -e DEEPGRAM_API_KEY=... \
  voicera-voice-server

Project Structure

voice_2_voice_server/
├── api/
│   ├── __init__.py
│   ├── server.py              # FastAPI server & WebSocket
│   ├── bot.py                 # Voice bot pipeline
│   └── services.py            # Service factories
├── config/
│   ├── __init__.py
│   ├── llm_mappings.py        # LLM provider configs
│   ├── stt_mappings.py        # STT language mappings
│   ├── tts_mappings.py        # TTS language mappings
│   ├── config.yaml            # Main config (gitignored)
│   └── config.example.yaml    # Example config
├── services/
│   ├── ai4bharat/
│   │   ├── __init__.py
│   │   ├── stt.py             # IndicConformer
│   │   └── tts.py             # IndicParler
│   ├── audio/
│   │   ├── greeting_interruption_filter.py
│   │   └── __init__.py
│   ├── bhashini/
│   │   ├── __init__.py
│   │   ├── stt.py
│   │   └── tts.py
│   ├── kenpath_llm/
│   │   ├── __init__.py
│   │   └── llm.py
│   └── __init__.py
├── serializer/
│   ├── __init__.py
│   └── vobiz_serializer.py    # Protocol serializer
├── storage/
│   ├── __init__.py
│   └── minio_client.py        # MinIO integration
├── agent_configs/             # Agent config files
│   ├── default_agent.json
│   ├── sales_agent.json
│   └── indic_english.json
├── main.py                    # Application entry
├── requirements.txt
├── env.example
├── Dockerfile
└── README.md

Core Components

Voice Bot Pipeline

The heart of the system: the VoiceBot pipeline orchestrates STT, LLM, and TTS.

class VoiceBot:
    """
    Real-time voice processing pipeline

    Flow: Audio Input → STT → LLM → TTS → Audio Output
    """

    def __init__(self, agent_config):
        self.agent_config = agent_config  # kept for system_prompt below
        self.stt_service = create_stt_service(agent_config)
        self.llm_service = create_llm_service(agent_config)
        self.tts_service = create_tts_service(agent_config)

    async def process_audio(self, audio_chunk):
        # Step 1: Convert audio to text
        transcript = await self.stt_service.transcribe(audio_chunk)

        # Step 2: Get LLM response
        response = await self.llm_service.generate(
            transcript,
            system_prompt=self.agent_config.system_prompt
        )

        # Step 3: Convert response to speech
        audio_response = await self.tts_service.synthesize(response)

        return audio_response

WebSocket Protocol

Message Format:

{
  "type": "audio|control|status",
  "session_id": "uuid",
  "timestamp": 1674003600000,
  "payload": {...}
}

Audio Message:

{
  "type": "audio",
  "session_id": "session-uuid",
  "sequence": 1,
  "format": "pcm_16k",
  "data": "base64-encoded-audio"
}

Control Message:

{
  "type": "control",
  "action": "pause|resume|end",
  "session_id": "session-uuid"
}
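
The envelope above can be routed by its `type` field on the server side. A minimal dispatcher sketch (the `handlers` mapping and its callables are illustrative assumptions, not part of the VoiceERA codebase):

```python
import json

def dispatch(raw_message, handlers):
    """Route an incoming WebSocket message by its "type" field.

    `handlers` maps message types ("audio", "control", "status") to
    callables that receive the parsed message dict. Unknown types
    raise ValueError so protocol drift is caught early.
    """
    message = json.loads(raw_message)
    msg_type = message.get("type")
    if msg_type not in handlers:
        raise ValueError(f"unknown message type: {msg_type}")
    return handlers[msg_type](message)

# Example: route a control message to a handler
result = dispatch(
    '{"type": "control", "action": "end", "session_id": "s1"}',
    {"control": lambda m: m["action"]},
)
```

Keeping routing in one place makes it easy to add new message types without touching the receive loop.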

Session Management

class SessionManager:
    """Manage active voice sessions"""

    async def create_session(self, auth_token, agent_id):
        session = Session(
            session_id=uuid4(),
            agent_id=agent_id,
            user_id=extract_user_id(auth_token),
            created_at=datetime.now(),
            status="active"
        )
        self.active_sessions[session.session_id] = session
        return session

    async def end_session(self, session_id):
        session = self.active_sessions.get(session_id)
        if session:
            # Save recording
            await self.save_recording(session)
            # Save transcript
            await self.save_transcript(session)
            # Cleanup
            del self.active_sessions[session_id]

Service Providers

LLM Services

OpenAI

class OpenAIService:
    def __init__(self, api_key, model="gpt-4"):
        # AsyncOpenAI is required for awaitable calls; the sync
        # OpenAI client cannot be awaited
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def generate(self, prompt, system_prompt=""):
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=200
        )
        return response.choices[0].message.content

Local LLM

class LocalLLMService:
    def __init__(self, api_base, model="mistral-7b"):
        self.api_base = api_base
        self.model = model

    async def generate(self, prompt, system_prompt=""):
        # Call a local Ollama-compatible server; aiohttp has no
        # module-level post(), so requests go through a ClientSession
        async with aiohttp.ClientSession() as http:
            async with http.post(
                f"{self.api_base}/api/generate",
                json={"model": self.model, "prompt": prompt,
                      "system": system_prompt}
            ) as response:
                return await response.json()

STT Services

Deepgram

class DeepgramSTT:
    def __init__(self, api_key):
        self.client = DeepgramClient(api_key=api_key)

    async def transcribe(self, audio_data):
        # Simplified sketch; the real Deepgram SDK uses a live
        # streaming connection with event callbacks
        response = await self.client.transcribe_streaming(
            audio_data,
            model="nova-2",
            language="en"
        )
        return response.transcript

AI4Bharat (Indic Languages)

class AI4BharatSTT:
    def __init__(self, service_url):
        self.service_url = service_url

    async def transcribe(self, audio_data, language="hi"):
        # HTTP POST to the local AI4Bharat server
        async with aiohttp.ClientSession() as http:
            async with http.post(
                f"{self.service_url}/transcribe",
                json={"audio": base64.b64encode(audio_data).decode(),
                      "language": language}
            ) as response:
                data = await response.json()
        return data["transcript"]

TTS Services

Cartesia

class CartesiaTTS:
    def __init__(self, api_key):
        self.client = CartesiaClient(api_key=api_key)

    async def synthesize(self, text, voice="english_male"):
        audio = await self.client.synthesize(
            text=text,
            voice_id=voice,
            sample_rate=16000
        )
        return audio

AI4Bharat

class AI4BharatTTS:
    def __init__(self, service_url):
        self.service_url = service_url

    async def synthesize(self, text, language="hi", voice="female"):
        async with aiohttp.ClientSession() as http:
            async with http.post(
                f"{self.service_url}/synthesize",
                json={
                    "text": text,
                    "language": language,
                    "voice": voice
                }
            ) as response:
                data = await response.json()
        return data["audio"]

WebSocket API

Connection Flow

// Client-side example
const socket = new WebSocket('ws://localhost:7860/voice');

socket.onopen = () => {
  // Send authentication
  socket.send(JSON.stringify({
    type: 'auth',
    token: jwtToken
  }));
};

socket.onmessage = (event) => {
  const message = JSON.parse(event.data);

  if (message.type === 'ready') {
    // Server is ready, start sending audio
    const audioContext = new AudioContext();
    // ... capture and send audio frames
  } else if (message.type === 'audio') {
    // Play response audio
    playAudio(message.data);
  }
};

Server-side Handlers

# FastAPI WebSocket endpoint
@app.websocket("/voice")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    session = None

    try:
        # Authenticate
        auth_msg = await websocket.receive_json()
        user_id = validate_token(auth_msg['token'])

        # Create session
        session = await session_manager.create_session(
            auth_msg['token'],
            agent_id=auth_msg.get('agent_id')
        )

        # Signal ready
        await websocket.send_json({
            'type': 'ready',
            'session_id': str(session.session_id)
        })

        # Process messages
        while True:
            message = await websocket.receive_json()

            if message['type'] == 'audio':
                audio_chunk = base64.b64decode(message['data'])
                response_audio = await voice_bot.process_audio(
                    audio_chunk
                )
                await websocket.send_json({
                    'type': 'audio',
                    'data': base64.b64encode(response_audio).decode()
                })

            elif message['type'] == 'control':
                if message['action'] == 'end':
                    break

    finally:
        # Only clean up if the session was actually created;
        # otherwise a failed auth would raise NameError here
        if session is not None:
            await session_manager.end_session(session.session_id)

Configuration

config.yaml Structure

stt:
  provider: deepgram          # or ai4bharat, google, azure
  deepgram:
    api_key: ${DEEPGRAM_API_KEY}
    model: nova-2
    language: en
  ai4bharat:
    service_url: http://ai4bharat_stt_server:8001
    language: hi

llm:
  provider: openai            # or anthropic, local
  openai:
    api_key: ${OPENAI_API_KEY}
    model: gpt-4
    temperature: 0.7
    max_tokens: 200
  local:
    api_base: http://localhost:8000
    model: mistral-7b

tts:
  provider: cartesia          # or google, ai4bharat
  cartesia:
    api_key: ${CARTESIA_API_KEY}
    voice: english_male
    sample_rate: 16000
  ai4bharat:
    service_url: http://ai4bharat_tts_server:8002
    language: hi
    voice: female

server:
  host: 0.0.0.0
  port: 7860
  max_connections: 100
  session_timeout: 1800       # 30 minutes

backend:
  api_url: http://backend:8000
  upload_recordings: true
  upload_transcripts: true
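
The `${VAR}` placeholders above need to be resolved against environment variables before the YAML is parsed. A minimal sketch using only the standard library (`os.path.expandvars` understands the `${VAR}` syntax; the actual loader in the codebase may differ):

```python
import os

def expand_config(raw_text):
    """Substitute ${VAR} placeholders with environment variable values.

    Unset variables are left untouched by os.path.expandvars, which
    makes missing keys easy to spot in the expanded output.
    """
    return os.path.expandvars(raw_text)

# Example with a throwaway value
os.environ["DEEPGRAM_API_KEY"] = "dg-test-key"
snippet = "api_key: ${DEEPGRAM_API_KEY}"
print(expand_config(snippet))  # api_key: dg-test-key
```

The expanded text can then be handed to a YAML parser such as `yaml.safe_load`.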

Monitoring & Health

Health Check Endpoint

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "services": {
            "stt": await stt_service.health_check(),
            "llm": await llm_service.health_check(),
            "tts": await tts_service.health_check(),
            "backend": await backend.health_check()
        },
        "active_sessions": len(session_manager.active_sessions),
        "timestamp": datetime.utcnow().isoformat()
    }

Metrics

Track these key metrics:

  • Session count - Active concurrent sessions
  • Audio latency - Time from audio received to response audio sent
  • Error rate - STT/LLM/TTS failures
  • Uptime - Service availability
  • Resource usage - CPU, memory, connections
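
Audio latency in particular is easy to instrument around `process_audio`. A minimal sketch (this `LatencyTracker` helper is an illustrative assumption, not part of the VoiceERA codebase):

```python
import time

class LatencyTracker:
    """Collect audio round-trip latencies in milliseconds.

    Call start() when audio is received and stop() when the response
    audio is sent; average_ms() reports the running mean.
    """

    def __init__(self):
        self.samples = []

    def start(self):
        # Monotonic clock avoids jumps from wall-clock adjustments
        return time.perf_counter()

    def stop(self, started):
        elapsed_ms = (time.perf_counter() - started) * 1000
        self.samples.append(elapsed_ms)
        return elapsed_ms

    def average_ms(self):
        return sum(self.samples) / len(self.samples) if self.samples else 0.0
```

The samples list can be swapped for a bounded deque or exported to Prometheus if long-running sessions are a concern.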


Error Handling

Retry Logic

@retry_with_backoff(max_retries=3, base_delay=1)
async def call_external_service(service, *args):
    try:
        return await service.call(*args)
    except ServiceError as e:
        if e.is_retryable():
            raise  # Will trigger retry
        else:
            # Don't retry for non-transient errors
            raise NoRetryError(str(e))
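
The `retry_with_backoff` decorator used above is not shown in the source; a minimal sketch of how it could be implemented (the exponential doubling and catch-all exception handling are assumptions):

```python
import asyncio
import functools

def retry_with_backoff(max_retries=3, base_delay=1):
    """Retry an async function with exponential backoff.

    Retries up to max_retries attempts, doubling the delay after each
    failure, and re-raises the last error if all attempts fail.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Last attempt: propagate the error to the caller
                    if attempt == max_retries - 1:
                        raise
                    await asyncio.sleep(delay)
                    delay *= 2
        return wrapper
    return decorator
```

In the snippet above, `NoRetryError` would need to be excluded from the retried exception types so non-transient failures surface immediately.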

Graceful Degradation

async def process_audio(audio_chunk):
    try:
        transcript = await stt_service.transcribe(audio_chunk)
    except STTError:
        # Fallback: ask user to repeat
        return await tts_service.synthesize(
            "Sorry, I didn't catch that. Could you repeat?"
        )

    try:
        response = await llm_service.generate(transcript)
    except LLMError:
        # Fallback response
        return await tts_service.synthesize(
            "I'm having trouble processing your request. Please try again."
        )

    try:
        audio = await tts_service.synthesize(response)
    except TTSError:
        # Log error and disconnect
        logger.error("TTS failed, ending session")
        raise SessionEndError()

    return audio

Performance Optimization

Audio Buffering

class AudioBuffer:
    def __init__(self, sample_rate=16000, duration_ms=100):
        self.sample_rate = sample_rate
        self.buffer_size = (sample_rate * duration_ms) // 1000
        self.buffer = []

    def add(self, chunk):
        self.buffer.extend(chunk)
        if len(self.buffer) >= self.buffer_size:
            return self.get_and_clear()
        return None

    def get_and_clear(self):
        # Drain the accumulated samples and reset the buffer
        samples, self.buffer = self.buffer, []
        return samples

Connection Pooling

# Reuse HTTP connections
session = aiohttp.ClientSession(
    connector=aiohttp.TCPConnector(
        limit=100,
        limit_per_host=30,
        ttl_dns_cache=300
    )
)

Next Steps