System Design¶
Detailed technical design and component documentation.
Component Deep Dives¶
1. Frontend Service¶
Stack: Next.js 16+, React 18+, TailwindCSS 4+
Port: 3000
Key Features:
- Server-side rendering (SSR) for performance
- Client-side state management
- Real-time WebSocket connections
- Responsive design for mobile/desktop
Key Modules:
app/
├── (auth)          # Authentication pages
│   ├── login.tsx
│   └── signup.tsx
├── (dashboard)     # Main application
│   ├── agents/
│   ├── campaigns/
│   ├── call-logs/
│   └── analytics/
├── api/            # API routes
└── components/     # Shared components
Dependencies:
- axios or fetch - HTTP client
- socket.io-client - WebSocket client
- zustand or redux - State management
- next-auth - Authentication
2. Backend API Service¶
Stack: FastAPI, Python 3.10+, SQLAlchemy/Motor
Port: 8000
Key Endpoints:
- /auth/* - Authentication
- /agents/* - Agent CRUD operations
- /campaigns/* - Campaign management
- /call-logs/* - Call history
- /transcripts/* - Transcription data
- /health - Service health
Database Schema:
Users
├── id (UUID)
├── email
├── password (hashed)
├── role (admin/user)
└── created_at
Agents
├── id (UUID)
├── name
├── llm_provider
├── stt_provider
├── tts_provider
├── system_prompt
├── user_id (FK)
└── created_at
Campaigns
├── id (UUID)
├── name
├── agent_id (FK)
├── phone_numbers
├── status
└── created_at
CallLogs
├── id (UUID)
├── campaign_id (FK)
├── phone_number
├── duration
├── transcript
├── status
└── created_at
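As an illustration, two of the records above could be mirrored in application code roughly as follows. This is a minimal sketch using standard-library dataclasses, not the project's actual models: the field types, the `utc_now` helper, and the `"draft"` default status are assumptions, since the schema lists only field names.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4


def utc_now() -> datetime:
    """Timezone-aware creation timestamp (assumed convention)."""
    return datetime.now(timezone.utc)


@dataclass
class Agent:
    name: str
    llm_provider: str
    stt_provider: str
    tts_provider: str
    system_prompt: str
    user_id: UUID                                   # FK -> Users.id
    id: UUID = field(default_factory=uuid4)
    created_at: datetime = field(default_factory=utc_now)


@dataclass
class Campaign:
    name: str
    agent_id: UUID                                  # FK -> Agents.id
    phone_numbers: list[str] = field(default_factory=list)
    status: str = "draft"                           # assumed initial status
    id: UUID = field(default_factory=uuid4)
    created_at: datetime = field(default_factory=utc_now)
```

Keeping `id` and `created_at` as defaulted fields means callers only supply the business fields, and each record gets a fresh UUID on construction.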
Key Services:
# app/services/
├── agent_service.py # Agent logic
├── campaign_service.py # Campaign logic
├── call_recording_service.py
├── auth_service.py # JWT tokens
└── analytics_service.py # Call analytics
3. Voice Server¶
Stack: Pipecat, Python 3.10+
Port: 7860
Responsibilities:
- Accept WebSocket connections from frontend
- Receive raw audio frames
- Process audio through STT pipeline
- Generate LLM responses
- Convert responses to speech via TTS
- Stream audio back to user
Pipeline Architecture:
Audio Input
│
▼
[Resampler]
│
▼
[STT Service]
│
▼ (Transcript)
[LLM Service]
│
▼ (Response)
[TTS Service]
│
▼
[Audio Output]
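The diagram describes a linear chain in which each stage consumes the previous stage's output. The sketch below shows that composition with plain Python functions. It is not the Pipecat API: `build_pipeline` and the four stub stages are hypothetical stand-ins, and real STT/LLM/TTS providers would replace them.

```python
from typing import Callable, Iterable

# Each stage is modeled as a plain function from one frame to the next.
Stage = Callable[[object], object]


def build_pipeline(stages: Iterable[Stage]) -> Stage:
    """Compose stages so the output of one feeds the next, in order."""
    stage_list = list(stages)

    def run(frame: object) -> object:
        for stage in stage_list:
            frame = stage(frame)
        return frame

    return run


# Hypothetical stand-ins for the processing stages in the diagram.
def resample(audio: bytes) -> bytes:
    return audio                          # no-op placeholder


def speech_to_text(audio: bytes) -> str:
    return audio.decode("utf-8")          # pretend the audio is its transcript


def generate_reply(transcript: str) -> str:
    return f"You said: {transcript}"


def text_to_speech(reply: str) -> bytes:
    return reply.encode("utf-8")          # pretend the reply is its audio


voice_pipeline = build_pipeline(
    [resample, speech_to_text, generate_reply, text_to_speech]
)
```

Calling `voice_pipeline(b"hello")` walks the input through all four stubs in order. A production pipeline would stream frames asynchronously rather than process one complete utterance per call.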
Configuration:
# config/config.example.yaml
stt:
  provider: deepgram # or ai4bharat, google, etc.
  language: en
llm:
  provider: openai
  model: gpt-4
  temperature: 0.7
tts:
  provider: cartesia
  voice: english_male
  speed: 1.0
4. MongoDB¶
Purpose: Primary data store
Collections:
- users - User accounts
- agents - Agent configurations
- campaigns - Campaign definitions
- call_logs - Call history and metadata
- transcripts - Call transcriptions
- analytics - Aggregated metrics
Indices:
// Performance-critical indices
db.users.createIndex({ email: 1 }, { unique: true })
db.agents.createIndex({ user_id: 1 })
db.campaigns.createIndex({ agent_id: 1 })
db.call_logs.createIndex({ campaign_id: 1, created_at: -1 })
db.call_logs.createIndex({ phone_number: 1 })
5. MinIO (Object Storage)¶
Purpose: Store file objects such as call recordings, transcript files, and agent configs.
Bucket Structure:
minio/
├── recordings/
│   └── {campaign_id}/{call_id}.wav
├── transcripts/
│   └── {campaign_id}/{call_id}.json
└── agent-configs/
    └── {agent_id}/config.json
Access: S3-compatible API (AWS SDK compatible)
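Object keys following the layout above can be built by small helper functions, which keeps the naming scheme in one place. The sketch below is an assumption-laden illustration: the helper names and the identifier check are hypothetical, and it treats `recordings/`, `transcripts/`, and `agent-configs/` as key prefixes, although they could equally be separate buckets.

```python
import re

# Allow only characters that cannot alter the key's path structure
# (UUIDs and similar identifiers pass this check).
_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$")


def _checked(identifier: str) -> str:
    """Reject identifiers containing slashes, dots, or other unsafe characters."""
    if not _ID_PATTERN.fullmatch(identifier):
        raise ValueError(f"unsafe identifier: {identifier!r}")
    return identifier


def recording_key(campaign_id: str, call_id: str) -> str:
    return f"recordings/{_checked(campaign_id)}/{_checked(call_id)}.wav"


def transcript_key(campaign_id: str, call_id: str) -> str:
    return f"transcripts/{_checked(campaign_id)}/{_checked(call_id)}.json"


def agent_config_key(agent_id: str) -> str:
    return f"agent-configs/{_checked(agent_id)}/config.json"
```

Validating identifiers before interpolation prevents a crafted ID such as `../other` from escaping its prefix.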
6. External AI Services¶
LLM Services¶
- OpenAI: GPT-4, GPT-3.5-turbo
- Anthropic: Claude
- Local: LLaMA, Mistral (self-hosted)
STT Services¶
- Deepgram: High-accuracy transcription
- Google Cloud: Speech-to-Text
- AI4Bharat: Indic language support
TTS Services¶
- Cartesia: High-quality voice synthesis
- Google Cloud: Text-to-Speech
- AI4Bharat: Indic language synthesis
Request-Response Flows¶
Authentication Flow¶
1. Frontend sends login credentials
POST /auth/login { email, password }
2. Backend validates credentials
- Look up the user by email
- Verify the submitted password against the stored hash
3. Backend generates JWT token
token = sign({ user_id, exp, permissions })
4. Frontend stores token
localStorage.setItem('token', jwt)
5. Subsequent requests include token
headers.Authorization = "Bearer " + jwt
Voice Call Flow¶
1. User initiates call in frontend
websocket.connect('ws://voice-server:7860')
2. Frontend sends auth token
message: { type: 'auth', token: jwt }
3. Voice Server validates token
- Decodes JWT
- Checks permissions
4. Voice Server sends ready signal
message: { type: 'ready', session_id: uuid }
5. Frontend streams audio chunks
message: { type: 'audio', data: ArrayBuffer }
6. Voice Server processes
- STT: Audio → Text
- LLM: Text → Response
- TTS: Response → Audio
7. Voice Server sends response audio
message: { type: 'audio', data: ArrayBuffer }
8. Frontend plays audio
Play the chunk via the Web Audio API, e.g. an AudioBufferSourceNode (AudioContext has no playback() method)
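On the server side, the message exchange in steps 2 to 7 amounts to a small state machine: audio is only accepted once authentication has succeeded. The following sketch models that logic without any WebSocket library. The `VoiceSession` class, its callbacks, and the `error` message type are assumptions added for illustration; the source defines only the `auth`, `ready`, and `audio` messages.

```python
import uuid
from typing import Callable


class VoiceSession:
    """Tracks one connection through the auth -> ready -> audio exchange."""

    def __init__(
        self,
        validate_token: Callable[[str], bool],
        process_audio: Callable[[bytes], bytes],
    ):
        self._validate_token = validate_token   # e.g. JWT check (step 3)
        self._process_audio = process_audio     # STT -> LLM -> TTS (step 6)
        self.session_id: str | None = None

    def handle(self, message: dict) -> dict:
        """Return the reply for one incoming message."""
        kind = message.get("type")
        if kind == "auth":
            if not self._validate_token(message.get("token", "")):
                return {"type": "error", "reason": "unauthorized"}
            self.session_id = str(uuid.uuid4())
            return {"type": "ready", "session_id": self.session_id}
        if kind == "audio":
            if self.session_id is None:
                # Audio sent before a successful auth is rejected.
                return {"type": "error", "reason": "not authenticated"}
            return {"type": "audio", "data": self._process_audio(message["data"])}
        return {"type": "error", "reason": "unknown message type"}
```

Injecting `validate_token` and `process_audio` as callables keeps the protocol logic testable without a live token service or AI providers.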
Recording & Storage¶
1. During call, Voice Server buffers audio
2. After call ends
- Save raw audio to MinIO
- Save transcript to MongoDB & MinIO
- Log call metadata to MongoDB
3. Frontend fetches call data
GET /call-logs/{call_id}
- Returns metadata from MongoDB
- Returns pre-signed URL to audio in MinIO
Error Handling & Resilience¶
Circuit Breaker Pattern¶
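import time

class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.threshold = failure_threshold
        self.timeout = timeout  # seconds to stay open before allowing a trial call
        self.opened_at = None

    def is_open(self):
        return self.opened_at is not None and time.monotonic() - self.opened_at < self.timeout

    def reset(self):
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.threshold:
            self.opened_at = time.monotonic()  # trip (or re-trip) the circuit

    def call(self, func, *args):
        if self.is_open():
            raise CircuitBreakerOpen()
        try:
            result = func(*args)
        except Exception:
            self.record_failure()
            raise
        self.reset()
        return result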
Retry Logic¶
# `retry` and `exponential` stand in for a retry helper; they are not defined here
@retry(max_attempts=3, backoff=exponential)
def call_external_api(endpoint):
    # Automatically retry on failure
    # with exponential backoff
    pass
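The source does not define the `retry` decorator, so here is one way it could be written with the standard library alone. This is a sketch under assumptions: the parameter names `base_delay`, `factor`, and `sleep` are hypothetical, and it retries on every exception, whereas a real service would likely retry only on transient errors.

```python
import functools
import time


def retry(max_attempts=3, base_delay=0.5, factor=2.0, sleep=time.sleep):
    """Retry the wrapped function, multiplying the delay by `factor` each time."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise          # out of attempts: surface the last error
                    sleep(delay)       # wait before the next attempt
                    delay *= factor
        return wrapper
    return decorator
```

With the defaults, a call that keeps failing is attempted three times, waiting 0.5 s and then 1.0 s between attempts. The `sleep` parameter exists so tests can substitute a fake instead of actually waiting.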
Health Checks¶
Backend:
GET /health
Response: { status: "ok", timestamp: "..." }
Voice Server:
- STT service connectivity
- LLM service connectivity
- TTS service connectivity
- Memory usage
- Active connections
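A health endpoint that covers several dependencies can run each check independently and fold the results into one report. The sketch below assumes each check is a zero-argument callable returning a boolean. The `run_health_checks` name, the `"degraded"` status, and the `checks` field are illustrative additions that go beyond the `{ status, timestamp }` response shown above.

```python
import time
from typing import Callable


def run_health_checks(checks: dict[str, Callable[[], bool]]) -> dict:
    """Run each named check and summarize the results in one report."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = "ok" if check() else "fail"
        except Exception:
            results[name] = "fail"      # a crashing check counts as a failure
    overall = "ok" if all(v == "ok" for v in results.values()) else "degraded"
    return {
        "status": overall,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "checks": results,
    }
```

For the Voice Server, the `checks` mapping would hold one probe per item in the list above, for example STT, LLM, and TTS connectivity.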
Performance Considerations¶
Caching Strategy¶
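from bson import ObjectId  # shipped with pymongo

# `cache` is a placeholder TTL-cache decorator; `db` is the MongoDB database handle

# Cache agent configurations (10 min TTL)
@cache(ttl=600)
def get_agent(agent_id):
    return db.agents.find_one({"_id": ObjectId(agent_id)})

# Cache call logs (1 hour TTL)
@cache(ttl=3600)
def get_call_logs(campaign_id, limit=100):
    # Materialize the cursor so the cached value is a reusable list
    return list(db.call_logs.find({...}).limit(limit))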
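The `cache(ttl=...)` decorator is not defined in the source. A minimal in-process version might look like the sketch below, which assumes hashable positional arguments and a single process. The `clock` parameter is a hypothetical addition for testing, and a multi-instance deployment would need a shared cache instead.

```python
import functools
import time


def cache(ttl, clock=time.monotonic):
    """Memoize results per positional-argument tuple for `ttl` seconds."""
    def decorator(func):
        store = {}   # args -> (expires_at, value); grows without bound

        @functools.wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]             # still fresh: skip the lookup
            value = func(*args)
            store[args] = (now + ttl, value)
            return value
        return wrapper
    return decorator
```

Entries are refreshed lazily on the first call after they expire. Note that writes to an agent's configuration would remain invisible for up to `ttl` seconds unless the entry is invalidated explicitly.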
Database Optimization¶
- Connection pooling
- Query indexing
- Pagination for list endpoints
- Aggregation pipelines for analytics
Audio Processing¶
- Streaming rather than buffering entire files
- Resampling to match provider requirements
- Compression for storage
Monitoring & Observability¶
Logging¶
All services log to stdout in JSON format:
{
"timestamp": "2024-01-29T10:30:45Z",
"level": "INFO",
"service": "backend",
"message": "Call initiated",
"call_id": "abc123",
"agent_id": "xyz789"
}
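Log lines in this shape can be produced with the standard `logging` module and a custom formatter. The sketch below is one possible approach rather than the services' actual setup: `JsonFormatter`, `get_logger`, and the convention of passing extra fields under a `context` key are all assumptions.

```python
import json
import logging
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def __init__(self, service: str):
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        stamp = datetime.fromtimestamp(record.created, timezone.utc)
        entry = {
            "timestamp": stamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "service": self.service,
            "message": record.getMessage(),
        }
        # Fields passed via extra={"context": {...}} are merged in.
        entry.update(getattr(record, "context", {}))
        return json.dumps(entry)


def get_logger(service: str) -> logging.Logger:
    """Return a logger that writes JSON lines to stdout."""
    logger = logging.getLogger(service)
    if not logger.handlers:               # avoid adding duplicate handlers
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JsonFormatter(service))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False
    return logger
```

A call such as `get_logger("backend").info("Call initiated", extra={"context": {"call_id": "abc123", "agent_id": "xyz789"}})` would then emit a line with the same fields as the example above.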
Metrics¶
Key metrics to monitor:
- API Response Time: Mean, P95, P99
- Call Duration: Average, distribution
- STT Accuracy: Word error rate
- LLM Response Time: Latency
- System Resources: CPU, Memory, Disk
- Error Rate: By service, by endpoint
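Two of these metrics have simple definitions that are easy to compute directly. The sketch below shows a nearest-rank percentile (one of several common percentile definitions) for latency figures such as P95, and word error rate as word-level edit distance divided by the number of reference words. The function names are hypothetical, and a monitoring stack would normally compute these for you.

```python
import math


def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def word_error_rate(reference: str, hypothesis: str) -> float:
    """WER = word-level edit distance / number of reference words."""
    ref, hyp = reference.split(), hypothesis.split()
    if not ref:
        raise ValueError("reference transcript is empty")
    # Dynamic-programming edit distance, keeping one row at a time.
    previous = list(range(len(hyp) + 1))
    for i, ref_word in enumerate(ref, start=1):
        current = [i]
        for j, hyp_word in enumerate(hyp, start=1):
            cost = 0 if ref_word == hyp_word else 1
            current.append(min(
                previous[j] + 1,          # deletion of a reference word
                current[j - 1] + 1,       # insertion of a hypothesis word
                previous[j - 1] + cost,   # substitution or exact match
            ))
        previous = current
    return previous[-1] / len(ref)
```

For example, a hypothesis that drops one word and adds another against a five-word reference has an edit distance of 2, giving a WER of 0.4.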
Tracing¶
Optional: Implement distributed tracing with Jaeger or Zipkin to track request flows across services.
Security Considerations¶
Data Validation¶
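from pydantic import BaseModel, Field, field_validator

class CreateAgentRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    llm_provider: str = Field(..., pattern="^[a-z0-9_]+$")

    # Pydantic v2 style, matching the `pattern` argument used above
    @field_validator('name')
    @classmethod
    def name_must_be_valid(cls, v):
        # Reject whitespace-only names and trim surrounding whitespace
        if not v.strip():
            raise ValueError('Name cannot be empty')
        return v.strip()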
SQL Injection Prevention¶
- Use parameterized queries (SQLAlchemy ORM)
- Never concatenate user input into queries
CORS Configuration¶
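from fastapi.middleware.cors import CORSMiddleware

# `app` is the FastAPI application instance
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],  # replace with the frontend origin
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)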
Rate Limiting¶
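from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter  # slowapi looks up the limiter on app.state
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/auth/login")
@limiter.limit("5/minute")
async def login(request: Request, ...):
    # Max 5 login attempts per minute per client IP
    pass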
Next Steps¶
- Data Flow - Trace how data moves through the system
- Docker Deployment - Containerized deployment
- API Documentation - Detailed API endpoints