Sambanova Voice AI Productivity System
LangGraph + Twilio + MCP + Team Management + Call Transfer + Sentry Monitoring + Redis + Composio + Audio Stream Player + WebRTC Voice Integration
Technical Architecture
System Architecture Overview
[Figure: Sambanova Team Collaboration Architecture, showing the Frontend, Core Processing, and External Services layers]
Architecture Flow: Enterprise voice AI system with PIN authentication, team collaboration, intelligent call transfer to FusionPBX call center, WebRTC voice integration with Deepgram STT, and comprehensive Sentry error monitoring with automatic thread reset recovery (8s/10s/12s timeout optimization).
Overview
The Sambanova Voice AI Productivity System is an enterprise-grade platform that combines
LangGraph AI agents, team collaboration, voice interaction, and call center integration.
Built for hackathon demonstrations, it showcases PIN authentication, role-based access control,
call transfer to FusionPBX, and error monitoring with Sentry.
The system lets teams manage todos, assign tasks to members, and collaborate in real time
through web dashboards, voice commands, and transfer to human agents. It also provides tuned
timeouts (8s for tools, 10s for the agent, 12s for the webhook), automatic thread reset recovery,
and error tracking. Data is synchronized with Google Calendar using OAuth2 authentication,
with caching intended to keep voice responses under one second.
Core Technologies
- • LangGraph for agent orchestration with tools_condition
- • Model Context Protocol (MCP) - 38 tools (36 DB + 2 transfer)
- • LangChain for LLM integration & tool binding
- • Flask + Flask-SocketIO for web API & WebSocket
- • SQLAlchemy ORM with PostgreSQL (multi-tenant)
- • PIN authentication (4-6 digit voice PIN)
- • OpenAI APIs (GPT-4 Turbo, Whisper STT)
- • Twilio Programmable Voice with Media Streams
- • Google Calendar OAuth2 integration (optimized)
- • FusionPBX SIP integration for call transfer
- • Deepgram Speech-to-Text API for WebRTC
- • WebRTC browser-based voice recording
- • Sentry.io error monitoring & performance tracking
- • JsSIP WebRTC for browser-based softphone
- • Render.com auto-deployment with gunicorn+eventlet
Key Features
- • Multi-tenant team collaboration with role hierarchy
- • PIN-based voice authentication (4-6 digits)
- • Intelligent call transfer: AI → FusionPBX Extension 2001
- • WebRTC voice with Deepgram STT transcription
- • Redis audio buffer management for WebRTC
- • Sentry error monitoring & performance tracking
- • Optimized timeouts (8s/10s/12s) for Twilio compatibility
- • Automatic thread reset on timeout/error
- • 38 MCP tools (todos, calendar, teams, transfer)
- • WebRTC call center with JsSIP softphone
- • Real-time voice AI with barge-in capability
- • Google Calendar OAuth2 sync (background)
- • FusionPBX integration on Google Cloud VM
- • Production deployment on Render with auto-scaling
Recent Updates & Improvements (October 2025)
Call Transfer to FusionPBX
- ✓ Seamless AI → Human agent transfer
- ✓ FusionPBX extension 2001 integration
- ✓ SIP/WSS connectivity (Google Cloud VM)
- ✓ Transfer detection via phrases or tool
- ✓ Department routing (support, sales, etc.)
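Phrase-based transfer detection with department routing could look roughly like the sketch below. It is illustrative only: the phrase list, the `detect_transfer` function, and the default department are assumptions, since the project's actual trigger phrases are not shown here.

```python
from typing import Optional

# Hypothetical trigger phrases; the real list is not documented in this spec.
TRANSFER_PHRASES = ("speak to a human", "talk to an agent", "transfer me", "real person")
# Departments named in the feature list above.
DEPARTMENTS = ("sales", "support")
DEFAULT_DEPARTMENT = "support"  # assumed fallback when no department is mentioned


def detect_transfer(utterance: str) -> Optional[str]:
    """Return the target department if the caller asked for a human, else None."""
    text = utterance.lower()
    if not any(phrase in text for phrase in TRANSFER_PHRASES):
        return None
    for department in DEPARTMENTS:
        if department in text:
            return department
    return DEFAULT_DEPARTMENT
```

Checking for a transfer phrase before looking at department keywords keeps ordinary requests that merely mention "sales" or "support" from triggering a transfer.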
Deepgram WebRTC STT
- ✓ Real-time speech-to-text transcription
- ✓ WebRTC audio stream processing
- ✓ WebM format detection & handling
- ✓ High accuracy transcription (95%+)
- ✓ Low latency (200-500ms)
Composio Integration
- ✓ Slack workspace integration
- ✓ GitHub repository management
- ✓ Gmail email automation
- ✓ Notion workspace sync
- ✓ External platform connectivity
Sentry Integration
- ✓ Real-time error tracking & alerts
- ✓ Performance monitoring (agent processing time)
- ✓ User context & session tracking
- ✓ Timeout & thread reset tracking
- ✓ Production-grade observability
Timeout Optimization
- ✓ Tool timeout: 8s (from 20s)
- ✓ Agent timeout: 10s (from 25s)
- ✓ Webhook timeout: 12s (from 30s)
- ✓ Stays under Twilio's 15s HTTP limit
- ✓ Thread reset on timeout prevents errors
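The way these three limits nest can be sketched with `asyncio.wait_for`. This is a minimal illustration under stated assumptions, not the project's code: `simulated_tool`, `handle_webhook`, and the fallback message are hypothetical names introduced for the example.

```python
import asyncio

# Budgets from the list above, in seconds. Each outer layer gets slightly more
# time than the layer it wraps, and all stay under Twilio's 15s HTTP limit.
TOOL_TIMEOUT = 8.0
AGENT_TIMEOUT = 10.0
WEBHOOK_TIMEOUT = 12.0
FALLBACK_MESSAGE = "Sorry, that took too long. Please try a simpler request."


async def simulated_tool(work_seconds: float) -> str:
    """Hypothetical tool call that sleeps to simulate I/O."""
    await asyncio.sleep(work_seconds)
    return "tool result"


async def handle_webhook(
    work_seconds: float,
    *,
    tool_timeout: float = TOOL_TIMEOUT,
    agent_timeout: float = AGENT_TIMEOUT,
    webhook_timeout: float = WEBHOOK_TIMEOUT,
) -> str:
    """Run one agent turn and always return something speakable in time."""

    async def agent_turn() -> str:
        # Innermost budget: a single tool call.
        return await asyncio.wait_for(simulated_tool(work_seconds), timeout=tool_timeout)

    try:
        # Middle budget wraps the agent turn, outer budget wraps the whole request.
        return await asyncio.wait_for(
            asyncio.wait_for(agent_turn(), timeout=agent_timeout),
            timeout=webhook_timeout,
        )
    except asyncio.TimeoutError:
        return FALLBACK_MESSAGE
```

Because the innermost limit fires first, a slow tool is cancelled while the webhook still has a few seconds left to build and return a response.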
WebRTC Call Center
- ✓ JsSIP v3.10.1 browser softphone
- ✓ WebSocket Secure (WSS) on port 7443
- ✓ Agent dashboard with SIP registration
- ✓ Call control (answer, hold, transfer, hangup)
- ✓ Google Cloud firewall configured
Automatic Error Recovery
- ✓ Thread reset with timestamped IDs
- ✓ BrokenResourceError handling
- ✓ tool_call_id incomplete error recovery
- ✓ In-memory reset tracking (_reset_threads)
- ✓ No cascading failures
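A thread reset with timestamped IDs might be tracked as in the sketch below. Only the `_reset_threads` name comes from the list above; its layout as a dictionary, the ID format, and both function names are assumptions made for illustration.

```python
import time
from typing import Dict, Optional

# In-memory map: base thread ID -> replacement thread ID after a reset.
# The name mirrors `_reset_threads` from the list above; the layout is assumed.
_reset_threads: Dict[str, str] = {}


def reset_thread(base_thread_id: str, now: Optional[float] = None) -> str:
    """Record a fresh, timestamped thread ID so the next turn starts clean."""
    timestamp_ms = int((time.time() if now is None else now) * 1000)
    new_id = f"{base_thread_id}-reset-{timestamp_ms}"
    _reset_threads[base_thread_id] = new_id
    return new_id


def current_thread_id(base_thread_id: str) -> str:
    """Return the active thread ID, following a reset if one happened."""
    return _reset_threads.get(base_thread_id, base_thread_id)
```

Switching to a new thread ID after a timeout means the checkpointer no longer replays a history that ends in an unanswered tool call, which is one plausible source of the incomplete `tool_call_id` errors mentioned above.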
Performance Optimization
- ✓ Removed Google Calendar sync delay
- ✓ Simplified JSON responses (no MCP breaks)
- ✓ Agent processing time measurement
- ✓ Transaction tracking per voice call
- ✓ Custom Sentry metrics & measurements
WebRTC Voice Integration Architecture
WebRTC Voice Assistant Architecture
WebRTC Speech-to-Deepgram Processing Flow
Flow: Your speech → WebRTC capture → Socket.IO streaming → Redis buffer → Deepgram transcription → LangGraph processing → Tool execution → Response
WebRTC Architecture: Browser-based voice assistant with Redis session management, real-time audio streaming via Socket.IO, Deepgram STT transcription, and Composio tool integration for external platform connectivity.
WebRTC Voice Interface
- ✓ Browser-based voice recording
- ✓ Real-time audio streaming
- ✓ WebSocket communication
- ✓ Audio format detection (WebM)
- ✓ Base64 audio encoding
Redis Session Management
- ✓ Session storage & caching
- ✓ Audio buffer management
- ✓ Real-time notifications
- ✓ User activity tracking
- ✓ Session expiration handling
Composio Integration
- ✓ Slack workspace integration
- ✓ GitHub repository management
- ✓ Gmail email automation
- ✓ Notion workspace sync
- ✓ Jira project management
Latest Features (October 2025)
Redis Session Management
- ✓ Session storage & caching
- ✓ Audio buffer management
- ✓ Real-time notifications (Pub/Sub)
- ✓ Rate limiting & analytics
- ✓ User activity tracking
Composio Tool Integration
- ✓ Slack workspace integration
- ✓ GitHub repository management
- ✓ Gmail email automation
- ✓ Notion workspace sync
- ✓ Jira project management
Audio Stream Player
- ✓ Redis audio buffer playback
- ✓ WebM format support
- ✓ Real-time audio streaming
- ✓ Session audio download
- ✓ Audio format detection
Module Structure
Sambanova Project Structure
Project Root/
├── app.py  # Main Flask application
├── start_simple.py  # Production server startup
├── build.sh  # Render.com build script
├── deploy_setup.py  # Deployment migrations
├── requirements.txt  # Python dependencies
├── recordings/  # Call recordings (git-ignored)
├── templates/  # Flask templates
│   ├── team_dashboard.html  # Team management UI
│   ├── register.html  # User registration
│   └── sambanova_tech_spec.html  # Technical documentation
└── sambanova/  # Sambanova module
    ├── __init__.py  # Package initialization
    ├── routes.py  # Flask routes & Twilio webhooks (Sentry integrated)
    ├── assistant_graph_todo.py  # LangGraph agent (optimized timeouts)
    ├── state.py  # Agent state management
    ├── models/  # Database models
    │   ├── base.py  # Shared SQLAlchemy Base
    │   └── user_models.py  # User, Team, TeamMembership models
    ├── security/  # Authentication & authorization
    │   └── auth.py  # JWT authentication system
    ├── api_routes/  # RESTful API endpoints
    │   ├── auth_routes.py  # User registration & login
    │   ├── team_routes.py  # Team management API
    │   └── team_todo_routes.py  # Team todo management API
    ├── migrations/  # Database migrations
    │   └── add_team_collaboration.py
    ├── mcps/  # Model Context Protocol servers
    │   ├── mcp_config.json  # MCP server configuration
    │   └── local_servers/
    │       ├── db_todo.py  # 36 database tools (optimized, no Google sync)
    │       ├── call_transfer.py  # 2 call transfer tools (FusionPBX integration)
    │       ├── deepgram_service.py  # Deepgram STT service
    │       ├── deepgram_webrtc_integration.py  # WebRTC Deepgram integration
    │       └── google_calendar.py  # Calendar operations
    └── Documentation/  # Comprehensive guides
        ├── CALL_TRANSFER_GUIDE.md  # Call transfer setup
        ├── SENTRY_INTEGRATION.md  # Error monitoring setup
        ├── TIMEOUT_ERROR_FIX.md  # Timeout optimization guide
        ├── GOOGLE_CLOUD_FREEPBX_SETUP.md  # FusionPBX configuration
        └── TWILIO_FREEPBX_SETUP.md  # Twilio-FusionPBX integration
Database Schema
users_sambanova
- • id: UUID (PK)
- • email: String (unique)
- • username: String (unique)
- • password_hash: String
- • first_name: String
- • last_name: String
- • is_active: Boolean
- • is_verified: Boolean
- • created_at: DateTime
- • last_login_at: DateTime
teams_sambanova
- • id: UUID (PK)
- • name: String
- • description: Text
- • is_active: Boolean
- • created_at: DateTime
- • updated_at: DateTime
team_memberships_sambanova
- • id: UUID (PK)
- • team_id: UUID (FK)
- • user_id: UUID (FK)
- • role: Enum (owner/admin/member/viewer)
- • joined_at: DateTime
- • updated_at: DateTime
todos_sambanova
- • id: UUID (PK)
- • title: String
- • description: String
- • completed: Boolean
- • priority: Enum
- • due_date: DateTime
- • creator_id: UUID (FK)
- • assignee_id: UUID (FK)
- • team_id: UUID (FK)
- • is_private: Boolean
- • google_calendar_event_id
reminders_sambanova
- • id: UUID (PK)
- • reminder_text: String
- • importance: Enum
- • reminder_date: DateTime
- • google_calendar_event_id
- • created_at: DateTime
- • updated_at: DateTime
calendar_events_sambanova
- • id: UUID (PK)
- • title: String
- • description: String
- • event_from: DateTime
- • event_to: DateTime
- • google_calendar_event_id
- • created_at: DateTime
- • updated_at: DateTime
call_recordings_sambanova
- • id: UUID (PK)
- • call_sid: String (unique)
- • from_number: String
- • to_number: String
- • recording_path: String
- • transcription: Text
- • status: String
- • created_at: DateTime
Team Collaboration Architecture
Team Relationship Diagram
┌─────────────────┐
│      User       │
├─────────────────┤
│ id (UUID) PK    │◄─────┐
│ email (unique)  │      │
│ username        │      │
│ password_hash   │      │
│ first_name      │      │
│ last_name       │      │
│ is_active       │      │
└─────────────────┘      │
         │               │
         │ user_id (FK)  │
         ▼               │
┌─────────────────┐      │
│ TeamMembership  │      │
├─────────────────┤      │
│ id (UUID) PK    │      │
│ team_id (FK)────┼──┐   │
│ user_id (FK)────┼──┘   │
│ role (ENUM)     │      │
│ joined_at       │      │
└─────────────────┘      │
         │               │
         │ team_id (FK)  │
         ▼               │
┌─────────────────┐      │
│      Team       │      │
├─────────────────┤      │
│ id (UUID) PK    │◄─────┤
│ name            │      │
│ description     │      │
│ is_active       │      │
└─────────────────┘      │
         │               │
         │ team_id (FK)  │
         ▼               │
┌─────────────────┐      │
│      Todo       │      │
├─────────────────┤      │
│ id (UUID) PK    │      │
│ title           │      │
│ priority        │      │
│ creator_id (FK)─┼──────┘
│ assignee_id(FK)─┼──────┐
│ team_id (FK)────┼──┐   │
│ is_private      │  │   │
│ google_cal_id   │  │   │
└─────────────────┘  │   │
                     │   │
                     ▼   ▼
          Team & User References
Relationships: Users belong to Teams via TeamMembership. Todos can be assigned to Teams and Users with role-based permissions.
Team Roles
- • OWNER: Full control, can delete team
- • ADMIN: Manage members and todos
- • MEMBER: Create and edit own todos
- • VIEWER: Read-only access
Access Control
- • JWT token-based authentication
- • 30-minute access token expiry
- • 7-day refresh token validity
- • Role-based authorization checks
- • Team membership validation
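The role hierarchy and the checks it implies can be sketched as an ordered enum. This is an illustration, not the project's model: `RoleLevel`, `has_at_least`, `can_edit_todo`, and `can_delete_team` are hypothetical names, and the numeric ordering is an assumption drawn from the role descriptions above.

```python
from enum import IntEnum


class RoleLevel(IntEnum):
    """Hypothetical numeric ranking of the four team roles."""
    VIEWER = 1
    MEMBER = 2
    ADMIN = 3
    OWNER = 4


def has_at_least(actual: RoleLevel, required: RoleLevel) -> bool:
    """True when the user's role ranks at or above the required role."""
    return actual >= required


def can_edit_todo(role: RoleLevel, user_id: str, todo_creator_id: str) -> bool:
    """Admins and owners edit any todo; members edit only their own."""
    if has_at_least(role, RoleLevel.ADMIN):
        return True
    if role == RoleLevel.MEMBER:
        return user_id == todo_creator_id
    return False  # viewers are read-only


def can_delete_team(role: RoleLevel) -> bool:
    """Only the owner may delete a team."""
    return role == RoleLevel.OWNER
```

Ranking roles numerically keeps each permission check to a single comparison, at the cost of assuming the roles form a strict hierarchy.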
LangGraph Agent Architecture
LangGraph Workflow Diagram
LangGraph Workflow: The agent can either continue to use tools or end the conversation based on user input and context.
Agent Components
- • TodoAgent Class: Main agent orchestrator with lazy initialization
- • StateGraph: Manages conversation flow and state
- • Assistant Node: GPT-4 reasoning and response generation
- • Tool Node: Executes 38 MCP tools
- • Conditional Edges: Routes between nodes based on tool calls
- • InMemorySaver: Checkpointer for state persistence
State Management
- • AgentState: Conversation state with message history
- • Message History: Maintains context across turns
- • Customer ID: User identification for multi-tenant
- • Thread ID: Conversation thread tracking
- • Lazy Loading: Prevents circular imports
- • ExceptionGroup Handling: Robust error recovery
Model Context Protocol (MCP) Integration
MCP provides a standardized way for AI agents to interact with external tools and services. The Sambanova system uses MCP to expose 38 tools for database operations, team management, call transfer to FusionPBX, Google Calendar integration, and WebRTC voice processing with Deepgram STT.
MCP Server Configuration
{
  "mcpServers": {
    "db": {
      "command": "python",
      "args": ["./sambanova/mcps/local_servers/db_todo.py"],
      "transport": "stdio",
      "env": {
        "DB_URI": "${DB_URI}",
        "GOOGLE_OAUTH2_TOKEN_B64": "${GOOGLE_OAUTH2_TOKEN_B64}",
        "GOOGLE_CLIENT_ID": "${GOOGLE_CLIENT_ID}",
        "GOOGLE_CLIENT_SECRET": "${GOOGLE_CLIENT_SECRET}"
      }
    }
  }
}
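The `${VAR}` placeholders in the `env` block have to be resolved from the process environment before the server is launched. How the project does this is not shown; the sketch below is one possible approach, and `expand_placeholders` and `load_server_env` are hypothetical helper names.

```python
import json
import os
import re
from typing import Dict, Mapping, Optional

# Matches ${NAME} placeholders such as ${DB_URI}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_placeholders(value: str, env: Mapping[str, str]) -> str:
    """Replace each ${NAME} with env[NAME], failing loudly if it is missing."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in env:
            raise KeyError(f"Missing environment variable: {name}")
        return env[name]

    return _PLACEHOLDER.sub(substitute, value)


def load_server_env(
    config_text: str, server: str, env: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Return the resolved env block for one server in an mcpServers config."""
    source = os.environ if env is None else env
    config = json.loads(config_text)
    raw_env = config["mcpServers"][server].get("env", {})
    return {key: expand_placeholders(value, source) for key, value in raw_env.items()}
```

Raising on a missing variable surfaces a misconfigured deployment at startup rather than as a failed database call in the middle of a voice conversation.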
Available MCP Tools (38)
Todo Management (5)
- • create_todo
- • get_todos
- • complete_todo
- • update_todo
- • delete_todo
Team Tools (8)
- • create_team
- • get_teams
- • get_team_members
- • create_team_todo
- • add_team_member
- • remove_team_member
- • change_member_role
- • search_users
Reminders (4)
- • create_reminder
- • get_reminders
- • update_reminder
- • delete_reminder
Calendar (6)
- • create_calendar_event
- • get_calendar_events
- • update_calendar_event
- • delete_calendar_event
- • sync_google_calendar_events
- • test_google_calendar
Call Transfer (2)
- • transfer_to_agent
- • get_available_departments
Enhanced LangGraph Tool Calls
The LangGraph implementation provides intelligent tool calling capabilities with dynamic tool selection and error handling. The agent automatically chooses appropriate tools based on user intent and maintains conversation context for seamless interactions.
Tool Calls Flow Diagram
Tool Calls Flow: LangGraph implementation showing dynamic tool selection and intelligent orchestration of MCP tools.
Tool Calls Features
MCP Integration
- • Database operations via MCP servers
- • Google Calendar synchronization
- • Team collaboration tools
- • Real-time tool discovery (38 tools)
- • Secure tool communication via stdio
- • Lazy loading for performance
Error Handling
- • Graceful tool failure recovery
- • ExceptionGroup unwrapping
- • 8s timeout per tool (reduced from 20s)
- • 10s overall agent timeout (reduced from 25s)
- • Fallback strategies
- • User-friendly error messages
Tool Features: Intelligent tool calling system with error recovery, timeout management, and seamless MCP integration.
Core Tool Calling Capabilities
- • Dynamic Tool Selection: LLM intelligently chooses appropriate tools based on user intent
- • Error Recovery: Graceful handling of tool failures with fallback strategies
- • Context Awareness: Tools access conversation history and maintain state
- • Streaming Responses: Real-time tool execution updates for better user experience
- • Async Execution: Non-blocking tool calls with proper timeout management
- • ExceptionGroup Handling: Unwraps and logs complex async exceptions
JWT Authentication System
Authentication Flow
Security Features
- • Password Hashing: Bcrypt with automatic salt
- • JWT Tokens: HS256 algorithm with secret key
- • Token Expiry: 30 min access, 7 day refresh
- • Authorization: @require_auth decorator
- • Role Validation: @require_role decorator
- • Team Membership: @require_team_member decorator
- • Auto Logout: Frontend handles expired tokens
JWT Token Structure
{
  "user_id": "uuid",
  "email": "user@example.com",
  "roles": ["user"],
  "team_id": "uuid",
  "type": "access",
  "exp": 1728589200,  // 30 minutes from issue
  "iat": 1728587400   // issued at timestamp
}
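The relationship between `iat` and `exp` is plain arithmetic on Unix timestamps, as the sketch below shows for the 30-minute access and 7-day refresh lifetimes. `build_claims` and `is_expired` are hypothetical helpers, and the `"refresh"` type value is an assumption; signing is left to the JWT library.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

ACCESS_TOKEN_LIFETIME = timedelta(minutes=30)
REFRESH_TOKEN_LIFETIME = timedelta(days=7)


def build_claims(user_id: str, email: str, token_type: str, issued_at: datetime) -> Dict[str, Any]:
    """Build the unsigned claim set for an access or refresh token."""
    if token_type not in ("access", "refresh"):
        raise ValueError(f"Unknown token type: {token_type}")
    lifetime = ACCESS_TOKEN_LIFETIME if token_type == "access" else REFRESH_TOKEN_LIFETIME
    return {
        "user_id": user_id,
        "email": email,
        "type": token_type,
        "iat": int(issued_at.timestamp()),
        "exp": int((issued_at + lifetime).timestamp()),
    }


def is_expired(claims: Dict[str, Any], now: datetime) -> bool:
    """True once the current time has reached the exp claim."""
    return int(now.timestamp()) >= claims["exp"]
```

With the `iat` value from the structure above (1728587400), an access token built this way expires at 1728589200, exactly 1800 seconds later.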
New Features Technical Details
Redis Session Management & Caching
Session Management
- • Session creation, retrieval, updates, deletion
- • Audio buffer storage (base64 encoded)
- • User authentication state management
- • Session expiration handling
Real-time Features
- • Pub/Sub notifications for team updates
- • Rate limiting (requests per minute)
- • User activity tracking & analytics
- • Cache invalidation strategies
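import os

# Redis connection with environment config
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379')

# Session data structure (timestamp and ttl are placeholders set by the caller)
session_data = {
    'user_id': 'user-123',
    'audio_buffer': 'base64...',  # base64-encoded audio
    'created_at': timestamp,      # creation time
    'expires_at': ttl,            # expiry for the session
}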
Composio External Tool Integration
Supported Platforms
- • Slack: Message sending, channel management
- • GitHub: Repository operations, issue tracking
- • Gmail: Email composition, sending, management
- • Notion: Page creation, database operations
- • Jira: Ticket creation, project management
Integration Features
- • OAuth2 authentication for each platform
- • Robust method discovery for API compatibility
- • Error handling for missing methods
- • Tool orchestration with LangGraph
# Composio tool loading with fallback methods
# (toolset is the Composio toolset object created elsewhere)
if hasattr(toolset, 'get_tools'):
    tools = toolset.get_tools(apps=["slack"])
elif hasattr(toolset, 'get_actions'):
    tools = toolset.get_actions(apps=["slack"])
Audio Stream Player & WebM Support
Audio Processing
- • WebM format detection & handling
- • Base64 audio buffer decoding
- • Real-time audio streaming from Redis
- • Audio format conversion (WebM → WAV fallback)
Player Features
- • Session-based audio playback
- • Audio file download (WebM/WAV)
- • Audio buffer analysis & debugging
- • Flask-SocketIO real-time updates
# Audio buffer from Redis (stored base64-encoded)
audio_buffer = session_data.get('audio_buffer', '')
audio_data = base64.b64decode(audio_buffer)

# WebM format detection via the EBML magic bytes (runs inside a Flask view)
if audio_data.startswith(b'\x1a\x45\xdf\xa3'):
    return Response(audio_data, mimetype='audio/webm')
API Endpoints
Authentication
POST /api/auth/register
POST /api/auth/login
POST /api/auth/refresh
GET /api/auth/profile
PUT /api/auth/profile
Team Management
POST /api/teams/
GET /api/teams/
GET /api/teams/{id}
POST /api/teams/{id}/members
DELETE /api/teams/{id}/members/{user_id}
PUT /api/teams/{id}/members/{user_id}/role
Twilio Voice
POST /sambanova_todo/twilio/call
POST /sambanova_todo/twilio/process_audio
POST /sambanova_todo/twilio/transfer
Webhook URL:
https://hjlees.com/sambanova_todo/twilio/call
WebRTC Voice
GET /sambanova_todo/webrtc/voice-assistant
WebSocket /voice
GET /audio-player/
WebRTC Voice Interface:
https://hjlees.com/sambanova_todo/webrtc/voice-assistant
Features: Deepgram STT transcription, Redis audio buffer storage, Socket.IO real-time communication
WebRTC Voice Interface
The WebRTC Voice Interface provides browser-based voice interaction with real-time audio streaming, Redis session management, and seamless integration with the LangGraph AI agent. Users can interact with the assistant directly through their web browser without requiring phone calls.
WebRTC Voice Assistant Interface
WebRTC Interface: Browser-based voice assistant with real-time audio recording, session management, and AI-powered responses.
WebRTC Audio Processing Pipeline
WebRTC Pipeline: Complete browser-based voice processing from audio capture through Deepgram STT transcription to AI-generated responses with Redis session management.
WebRTC Components
- • MediaRecorder API: Browser audio recording
- • WebSocket: Real-time communication via Socket.IO
- • Redis Session: Audio buffer storage
- • WebM Format: Native browser audio format
- • Base64 Encoding: Audio data transmission
- • Deepgram STT: Real-time speech-to-text
- • Session Management: User state tracking
- • Audio Buffer: Redis storage for playback
Processing Flow
- 1. User clicks record button in browser
- 2. WebRTC MediaRecorder captures audio
- 3. Audio chunks sent via Socket.IO WebSocket
- 4. Server stores audio in Redis session buffer
- 5. User stops recording
- 6. Complete WebM blob sent to server
- 7. Deepgram STT transcribes audio
- 8. LangGraph agent processes request
- 9. OpenAI TTS generates response
- 10. Audio response played in browser
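Steps 3 to 6 of the flow above (chunk streaming, buffering, and final assembly) can be sketched as follows. A plain dictionary stands in for the Redis session buffer, and `append_chunk`, `finish_recording`, and `is_webm` are hypothetical names, so treat this as an outline rather than the project's implementation.

```python
import base64
from typing import Dict, List

WEBM_MAGIC = b"\x1a\x45\xdf\xa3"  # EBML header bytes at the start of a WebM file

# Stand-in for the Redis session buffer: session ID -> list of base64 chunks.
_buffers: Dict[str, List[str]] = {}


def append_chunk(session_id: str, chunk_b64: str) -> None:
    """Store one base64-encoded audio chunk received over the socket."""
    _buffers.setdefault(session_id, []).append(chunk_b64)


def finish_recording(session_id: str) -> bytes:
    """Decode every buffered chunk, join the bytes, and clear the buffer."""
    chunks = _buffers.pop(session_id, [])
    return b"".join(base64.b64decode(chunk) for chunk in chunks)


def is_webm(audio: bytes) -> bool:
    """Check for the WebM/EBML magic bytes before sending audio to STT."""
    return audio[:4] == WEBM_MAGIC
```

Each chunk is decoded on its own before joining, because concatenating padded base64 strings and decoding them in one pass can yield corrupt audio.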
Twilio Voice Interface
The Sambanova assistant provides complete voice interaction through Twilio, supporting natural language commands for team collaboration and task management. Responses are spoken with the Amazon Polly.Amy voice, and barge-in lets callers interrupt the assistant while it is speaking.
Voice Processing Pipeline
Voice Pipeline: Complete end-to-end voice processing from speech input to AI-generated voice response with real-time streaming.
Voice Command Examples
Personal Productivity
- 🗣️ "Create a high priority todo to review the quarterly report"
- 🗣️ "Add a reminder to call the dentist tomorrow at 2 PM"
- 🗣️ "Schedule a meeting for next Friday from 2 to 3 PM"
- 🗣️ "Show me all my pending todos"
- 🗣️ "Mark the grocery shopping todo as completed"
Team Collaboration
- 🗣️ "Create a hackathon team"
- 🗣️ "What teams are available?"
- 🗣️ "Who are the members of the development team?"
- 🗣️ "Create a high priority todo for the dev team"
- 🗣️ "Add admin@sambanova.com to the hackathon team as owner"
- 🗣️ "Assign a code review task to John in the dev team"
- 🗣️ "Change john@example.com to admin role in the dev team"
Voice Components
- • Speech-to-Text: Twilio speech recognition engine
- • Text-to-Speech: Amazon Polly.Amy voice
- • Barge-in: Interrupt AI while speaking
- • Speech Timeout: Auto detection (10s)
- • Agent Timeout: 10s processing time
- • Tool Timeout: 8s per MCP tool
- • Continuation: Multi-turn conversations
- • Exit Detection: Natural conversation ending
Processing Flow
- 1. User calls Twilio number
- 2. Twilio webhook triggers /sambanova_todo/twilio/call
- 3. TwiML with <Gather> returned (10s timeout)
- 4. User speaks, Twilio recognizes speech
- 5. SpeechResult posted to /process_audio
- 6. LangGraph agent processes (10s timeout)
- 7. MCP tools execute (8s per tool)
- 8. Database & Calendar operations
- 9. Response converted to TwiML
- 10. Polly.Amy speaks response with barge-in
- 11. Conversation continues or ends
Twilio Phone Integration
The Sambanova assistant can be accessed via phone calls through Twilio integration, allowing users to interact with the AI assistant through voice calls from any phone number. The system uses TwiML and Gather for speech recognition with barge-in capability.
Twilio Call Flow Architecture
Call Flow: Phone → Twilio → Flask Webhook → LangGraph Agent → MCP Tools → Database/Calendar → TwiML Response
Twilio Components
- • Twilio Voice API: Handles incoming/outgoing calls
- • TwiML: XML-based call flow instructions
- • Gather Element: Collects speech input with barge-in
- • Say Element: Text-to-speech via Polly.Amy
- • Redirect Element: Call continuation logic
- • Webhook Endpoints: Flask routes for call handling
Call Flow Process
- 1. User calls Twilio phone number
- 2. Twilio webhook triggers /twilio/call
- 3. TwiML with <Gather> returned
- 4. Greeting: "Hello! I'm your Sambanova productivity assistant"
- 5. User speaks command
- 6. Speech posted to /process_audio
- 7. LangGraph agent processes request
- 8. MCP tools execute database operations
- 9. TwiML response with confirmation
- 10. Call continues or ends based on user
Twilio Configuration
# Environment Variables
TWILIO_ACCOUNT_SID=your_account_sid
TWILIO_AUTH_TOKEN=your_auth_token
TWILIO_PHONE_NUMBER=+1234567890

# Webhook Endpoints
POST https://hjlees.com/sambanova_todo/twilio/call
POST https://hjlees.com/sambanova_todo/twilio/process_audio

# TwiML Response Example
<Response>
  <Gather action="/sambanova_todo/twilio/process_audio"
          method="POST"
          input="speech"
          speechTimeout="auto"
          timeout="10"
          bargeIn="true">
    <Say voice="Polly.Amy">Hello! I'm your Sambanova productivity assistant...</Say>
  </Gather>
  <Say voice="Polly.Amy">I didn't hear anything. Please try again.</Say>
  <Redirect>/sambanova_todo/twilio/call?is_continuation=true</Redirect>
</Response>
Twilio Setup Instructions
Step 1: Configure Voice Settings
In Twilio Console → Phone Numbers → Active Numbers → Select your number
Step 2: Set Webhook URL
A CALL COMES IN: https://hjlees.com/sambanova_todo/twilio/call (HTTP POST)
Step 3: Test Integration
Call your Twilio number and speak a command
Code Examples
LangGraph Agent (assistant_graph_todo.py)
from typing import List, Optional

from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode, tools_condition

from sambanova.state import AgentState  # assumed import path (state.py)


class TodoAgent:
    def __init__(self, tools: Optional[List[BaseTool]] = None):
        # Avoid a shared mutable default argument
        self.tools = tools or []
        self.llm = ChatOpenAI(
            model="gpt-4.1-mini-2025-04-14"
        ).bind_tools(tools=self.tools)
        self.graph = self.build_graph()

    def build_graph(self) -> CompiledStateGraph:
        builder = StateGraph(AgentState)

        def assistant(state: AgentState):
            # Call the LLM with the conversation so far and record its reply
            response = self.llm.invoke(state.messages)
            state.messages.append(response)
            return state

        builder.add_node("assistant", assistant)
        builder.add_node("tools", ToolNode(self.tools))
        builder.set_entry_point("assistant")
        # Route to "tools" when the reply contains tool calls, otherwise end
        builder.add_conditional_edges("assistant", tools_condition)
        builder.add_edge("tools", "assistant")
        return builder.compile(checkpointer=InMemorySaver())


# Lazy initialization to avoid circular imports
_agent_instance = None


def get_agent():
    global _agent_instance
    if _agent_instance is None:
        _agent_instance = TodoAgent()
    return _agent_instance
Team Management API (team_routes.py)
from flask import Blueprint, request, jsonify

from sambanova.security.auth import require_auth
from sambanova.models.user_models import User, Team, TeamMembership, TeamRole
from sambanova.models.base import SessionLocal  # assumed location of the session factory

team_bp = Blueprint('teams', __name__, url_prefix='/api/teams')


@team_bp.route('/', methods=['POST'])
@require_auth
def create_team():
    """Create a new team"""
    data = request.get_json(silent=True) or {}
    if not data.get('name'):
        return jsonify({'error': 'Team name is required'}), 400
    user_id = request.current_user['user_id']

    with SessionLocal() as session:
        # Create team
        team = Team(
            name=data['name'],
            description=data.get('description', ''),
            is_active=True
        )
        session.add(team)
        session.flush()  # assigns team.id without committing yet

        # Add creator as team owner
        membership = TeamMembership(
            team_id=team.id,
            user_id=user_id,
            role=TeamRole.OWNER
        )
        session.add(membership)
        session.commit()  # team and membership are saved together

        # Read attributes while the session is still open
        team_payload = {'id': str(team.id), 'name': team.name}

    return jsonify({
        'message': 'Team created successfully',
        'team': team_payload
    }), 201


@team_bp.route('/', methods=['GET'])
@require_auth
def get_user_teams():
    """Get all teams for the current user"""
    user_id = request.current_user['user_id']

    with SessionLocal() as session:
        results = session.query(TeamMembership, Team).join(
            Team, TeamMembership.team_id == Team.id
        ).filter(TeamMembership.user_id == user_id).all()

        teams = [{
            'id': str(team.id),
            'name': team.name,
            'role': membership.role.value
        } for membership, team in results]

    return jsonify({'teams': teams}), 200
MCP Team Tools (db_todo.py)
from datetime import datetime, timedelta, timezone
from typing import Optional

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("db_todo")

# SessionLocal, DBTodo, TodoPriority, check_database_available, and
# get_calendar_service are defined elsewhere in db_todo.py (not shown here).

# Lazy import to avoid circular dependencies
User = Team = TeamMembership = TeamRole = None


def _lazy_import_team_models():
    global User, Team, TeamMembership, TeamRole
    if User is None:
        from sambanova.models.user_models import User, Team, TeamMembership, TeamRole


@mcp.tool()
async def create_team_todo(
    title: str,
    team_id: str,
    description: Optional[str] = None,
    priority: TodoPriority = TodoPriority.MEDIUM,
    assignee_id: Optional[str] = None,
    due_date: Optional[datetime] = None,
) -> str:
    """Create a todo item for a specific team."""
    _lazy_import_team_models()
    check_database_available()
    # Default to "now" so the calendar event below always has a start time
    effective_due = due_date or datetime.now(timezone.utc)

    with SessionLocal() as session:
        # Verify team exists
        team = session.query(Team).filter(Team.id == team_id).first()
        if not team:
            return f"Team with ID {team_id} not found."

        # Verify assignee is a team member
        if assignee_id:
            membership = session.query(TeamMembership).filter(
                TeamMembership.team_id == team_id,
                TeamMembership.user_id == assignee_id
            ).first()
            if not membership:
                return f"User is not a member of team '{team.name}'."

        # Create team todo
        new_todo = DBTodo(
            title=title,
            description=description,
            priority=priority.value,
            due_date=effective_due,
            team_id=team_id,
            assignee_id=assignee_id,
        )
        session.add(new_todo)
        session.commit()

        # Create Google Calendar event
        if get_calendar_service:
            calendar_service = get_calendar_service()
            event = calendar_service.events().insert(
                calendarId='primary',
                body={
                    'summary': f"[Team] {title}",
                    'description': f"Team: {team.name}\n{description or ''}",
                    'start': {'dateTime': effective_due.isoformat(), 'timeZone': 'UTC'},
                    'end': {'dateTime': (effective_due + timedelta(hours=1)).isoformat(), 'timeZone': 'UTC'}
                }
            ).execute()
            new_todo.google_calendar_event_id = event.get('id')
            session.commit()

        return f"✅ Team todo created for {team.name}"


@mcp.tool()
async def get_teams() -> str:
    """Get all available teams."""
    _lazy_import_team_models()
    with SessionLocal() as session:
        teams = session.query(Team).filter(Team.is_active.is_(True)).all()
        result = "Available teams:\n"
        for team in teams:
            result += f"• {team.name} (ID: {team.id})\n"
        return result


@mcp.tool()
async def get_team_members(team_id: str) -> str:
    """Get all members of a specific team."""
    _lazy_import_team_models()
    with SessionLocal() as session:
        team = session.query(Team).filter(Team.id == team_id).first()
        if not team:
            return "Team not found."

        members = session.query(TeamMembership, User).join(
            User, TeamMembership.user_id == User.id
        ).filter(TeamMembership.team_id == team_id).all()

        result = f"Members of '{team.name}':\n"
        for membership, user in members:
            result += f"• {user.full_name} - {membership.role.value}\n"
        return result
JWT Authentication (auth.py)
import os
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Dict, Optional

import bcrypt
import jwt
from flask import request, jsonify


class JWTAuth:
    def __init__(self):
        self.secret_key = os.getenv('JWT_SECRET_KEY')
        self.algorithm = 'HS256'
        self.access_token_expire_minutes = 30

    def hash_password(self, password: str) -> str:
        """Hash password using bcrypt"""
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')

    def verify_password(self, password: str, hashed: str) -> bool:
        """Verify password against hash"""
        return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

    def create_access_token(self, user_id: str, email: str, roles: Optional[list] = None) -> str:
        """Create JWT access token"""
        now = datetime.now(timezone.utc)
        payload = {
            'user_id': user_id,
            'email': email,
            'roles': roles or ['user'],
            'type': 'access',
            'exp': now + timedelta(minutes=self.access_token_expire_minutes),
            'iat': now
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Optional[Dict]:
        """Verify and decode JWT token"""
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
            return None

    def get_token_from_header(self) -> Optional[str]:
        """Extract a Bearer token (helper not shown in the original excerpt; assumed minimal version)"""
        header = request.headers.get('Authorization', '')
        if header.startswith('Bearer '):
            return header[len('Bearer '):]
        return None

    def require_auth(self, f):
        """Decorator to require authentication"""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            token = self.get_token_from_header()
            if not token:
                return jsonify({'error': 'No token provided'}), 401

            payload = self.verify_token(token)
            if not payload:
                return jsonify({'error': 'Invalid or expired token'}), 401

            # Expose the caller's identity to the wrapped view
            request.current_user = {
                'user_id': payload['user_id'],
                'email': payload['email'],
                'roles': payload.get('roles', [])
            }
            return f(*args, **kwargs)
        return decorated_function


jwt_auth = JWTAuth()
require_auth = jwt_auth.require_auth
Twilio Webhook Handler (routes.py)
from flask import Blueprint, request, Response
from twilio.twiml.voice_response import VoiceResponse, Gather
import asyncio

sambanova_todo_bp = Blueprint('sambanova_todo', __name__, url_prefix='/sambanova_todo')


@sambanova_todo_bp.route('/twilio/call', methods=['POST'])
def twilio_call_webhook():
    """Handle incoming calls with barge-in capability"""
    is_continuation = request.args.get('is_continuation', 'false').lower() == 'true'
    response = VoiceResponse()
    gather = response.gather(
        input='speech',
        action='/sambanova_todo/twilio/process_audio',
        method='POST',
        speech_timeout='auto',
        timeout=10,
        barge_in=True
    )
    if not is_continuation:
        gather.say(
            "Hello! I'm your Sambanova productivity assistant. How can I help you today?",
            voice='Polly.Amy'
        )
    response.say("I didn't hear anything. Please try again.", voice='Polly.Amy')
    response.redirect('/sambanova_todo/twilio/call?is_continuation=true')
    return Response(str(response), mimetype='text/xml')


@sambanova_todo_bp.route('/twilio/process_audio', methods=['POST'])
def process_audio_webhook():
    """Process speech input from Twilio"""
    transcribed_text = request.form.get('SpeechResult', '')
    if not transcribed_text:
        response = VoiceResponse()
        gather = Gather(input='speech', action='/sambanova_todo/twilio/process_audio')
        gather.say("I didn't catch that. Could you please repeat?", voice='Polly.Amy')
        response.append(gather)
        return Response(str(response), mimetype='text/xml')

    # Check for exit phrases
    exit_phrases = ['exit', 'goodbye', 'bye', 'done']
    if any(phrase in transcribed_text.lower() for phrase in exit_phrases):
        response = VoiceResponse()
        response.say("Thank you! Have a great day!", voice='Polly.Amy')
        response.hangup()
        return Response(str(response), mimetype='text/xml')

    # Process with agent (30s timeout)
    try:
        agent_response = asyncio.run(
            asyncio.wait_for(_run_agent_async(transcribed_text), timeout=30.0)
        )
    except asyncio.TimeoutError:
        agent_response = "I'm taking too long. Please try a simpler request."

    # Return TwiML response
    response = VoiceResponse()
    gather = Gather(input='speech', action='/sambanova_todo/twilio/process_audio', barge_in=True)
    gather.say(agent_response, voice='Polly.Amy')
    response.append(gather)
    response.redirect('/sambanova_todo/twilio/call?is_continuation=true')
    return Response(str(response), mimetype='text/xml')
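The exit check in process_audio_webhook uses substring matching, so a command such as "mark the grocery shopping todo as done" contains "done" and would end the call. One possible alternative is to match whole words, sketched below under the assumption that only short utterances made up entirely of exit and filler words should hang up. The names is_exit_request, EXIT_WORDS, and FILLER_WORDS are illustrative and not part of the original routes.

```python
import re

# Words that signal the caller wants to end the call (same list as the handler).
EXIT_WORDS = {'exit', 'goodbye', 'bye', 'done'}
# Filler words tolerated around an exit word; an assumption for this sketch.
FILLER_WORDS = {'ok', 'okay', 'thanks', 'thank', 'you', "i'm", 'im', 'all', 'that', 'is'}


def is_exit_request(transcript: str) -> bool:
    """Return True only when the utterance is an exit phrase, not a command."""
    words = re.findall(r"[a-z']+", transcript.lower())
    if not words:
        return False
    has_exit_word = any(word in EXIT_WORDS for word in words)
    only_exit_or_filler = all(
        word in EXIT_WORDS or word in FILLER_WORDS for word in words
    )
    return has_exit_word and only_exit_or_filler
```

With this check, "Okay, I'm done." ends the call while "Mark the grocery shopping todo as done" is passed on to the agent.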
Deepgram WebRTC Integration (deepgram_service.py)
from deepgram import DeepgramClient
from typing import Optional
import requests
import os
import tempfile


class DeepgramService:
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv('DEEPGRAM_API_KEY')
        self.client = DeepgramClient(api_key=self.api_key)

    def transcribe_audio_buffer(self, audio_buffer: bytes, language: str = "en") -> Optional[str]:
        """Transcribe audio buffer using Deepgram's HTTP API"""
        # Detect WebM/EBML header
        is_webm = len(audio_buffer) >= 4 and audio_buffer[:4] == b"\x1a\x45\xdf\xa3"
        if is_webm:
            # Send WebM directly to Deepgram
            with tempfile.NamedTemporaryFile(suffix='.webm', delete=False) as temp_file:
                temp_file.write(audio_buffer)
                temp_file_path = temp_file.name
            try:
                result = self._transcribe_file(temp_file_path, language)
                return result
            finally:
                # Always remove the temporary file, even if transcription fails
                os.unlink(temp_file_path)
        else:
            # Create WAV from PCM
            wav_file = self._create_wav_from_pcm(audio_buffer)
            try:
                result = self._transcribe_file(wav_file, language)
                return result
            finally:
                os.unlink(wav_file)

    def _transcribe_file(self, file_path: str, language: str) -> Optional[str]:
        """Transcribe file using Deepgram HTTP API"""
        url = "https://api.deepgram.com/v1/listen"
        params = {
            "model": "nova-2",
            "language": language,
            "smart_format": "true",
            "punctuate": "true"
        }
        headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "audio/webm" if file_path.endswith('.webm') else "audio/wav"
        }
        with open(file_path, 'rb') as audio_file:
            response = requests.post(url, params=params, headers=headers,
                                     data=audio_file.read(), timeout=30)
        if response.status_code == 200:
            result = response.json()
            if result.get("results") and result["results"].get("channels"):
                channel = result["results"]["channels"][0]
                if channel.get("alternatives"):
                    return channel["alternatives"][0].get("transcript", "").strip()
        # Non-200 status or a response without a transcript
        return None
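transcribe_audio_buffer relies on a _create_wav_from_pcm helper that is not shown. A minimal sketch using the standard wave module follows, assuming 16-bit mono PCM at 16 kHz. The sample rate, channel count, and sample width are assumptions that depend on the client, and the standalone function names are illustrative.

```python
import os
import tempfile
import wave

EBML_MAGIC = b"\x1a\x45\xdf\xa3"  # first four bytes of a WebM/Matroska file


def is_webm(audio_buffer: bytes) -> bool:
    """Detect a WebM container by its EBML header."""
    return audio_buffer[:4] == EBML_MAGIC


def create_wav_from_pcm(pcm: bytes, sample_rate: int = 16000,
                        channels: int = 1, sample_width: int = 2) -> str:
    """Wrap raw PCM samples in a WAV container and return the temp file path.

    The caller is responsible for deleting the file (for example with os.unlink).
    """
    fd, path = tempfile.mkstemp(suffix='.wav')
    os.close(fd)  # wave.open reopens the path itself
    with wave.open(path, 'wb') as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(sample_width)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm)
    return path
```

Returning a path rather than bytes matches how the service passes the result to _transcribe_file and then unlinks it.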
WebRTC Voice Server (webrtc_voice_server.py)
import asyncio
import base64

from flask import request
from flask_socketio import SocketIO, emit
from deepgram_webrtc_integration import transcribe_audio_with_deepgram_webrtc

# socketio, openai_client, update_session, and process_with_agent are
# defined elsewhere in the application and are not shown in this excerpt.


@socketio.on('stop_recording', namespace='/voice')
def handle_stop_recording(data=None):
    """Handle WebRTC recording stop and process audio"""
    session_id = request.sid
    if data and 'audio' in data:
        # Decode complete WebM blob from client
        audio_buffer = base64.b64decode(data['audio'])
        # Store in Redis for audio player
        update_session(session_id, {'audio_buffer': data['audio']})
        # Transcribe with Deepgram
        transcribed_text = transcribe_audio_with_deepgram_webrtc(audio_buffer)
        if transcribed_text:
            # Process with LangGraph agent
            # (user_id and user_name are not defined in this excerpt; they
            # must be resolved for the session before this call)
            agent_response = asyncio.run(process_with_agent(
                transcribed_text, user_id, user_name
            ))
            # Generate TTS response
            speech_response = openai_client.audio.speech.create(
                model="tts-1",
                voice="nova",
                input=agent_response,
                response_format="mp3"
            )
            audio_base64 = base64.b64encode(speech_response.content).decode('utf-8')
            # Send to client
            emit('agent_response', {
                'success': True,
                'text': agent_response,
                'audio': audio_base64
            }, namespace='/voice', room=session_id)
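The handler decodes data['audio'] directly. Browser code that reads a recording as a data URL produces a string with a "data:audio/webm;base64," prefix, and malformed input makes the decoder raise. A defensive decoding step could look like the sketch below. decode_audio_payload is a hypothetical helper, and whether this client ever sends a data URL prefix is an assumption.

```python
import base64
import binascii
from typing import Optional


def decode_audio_payload(payload: str) -> Optional[bytes]:
    """Decode a base64 audio string, tolerating an optional data URL prefix.

    Returns None instead of raising when the payload is empty or malformed.
    """
    if not payload:
        return None
    # Strip a prefix such as "data:audio/webm;base64," if one is present.
    if payload.startswith('data:') and ',' in payload:
        payload = payload.split(',', 1)[1]
    try:
        return base64.b64decode(payload, validate=True)
    except (binascii.Error, ValueError):
        return None
```

A None result can then be reported to the client as a failed recording rather than surfacing as an unhandled exception in the socket handler.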
Usage Guide
Setup and Deployment
1. Environment Setup
# Install dependencies
pip install -r requirements.txt
# Set up environment variables
export OPENAI_API_KEY="your_openai_key"
export DB_URI="postgresql://user:pass@host/db"
export JWT_SECRET_KEY="your-super-secret-jwt-key"
export TWILIO_ACCOUNT_SID="your_twilio_sid"
export TWILIO_AUTH_TOKEN="your_twilio_token"
export TWILIO_PHONE_NUMBER="+1234567890"
export GOOGLE_OAUTH2_TOKEN_B64="base64_encoded_token"
export GOOGLE_CLIENT_ID="your_client_id"
export GOOGLE_CLIENT_SECRET="your_client_secret"
export DEEPGRAM_API_KEY="your_deepgram_api_key"
export FREEPBX_DOMAIN="34.26.59.14"
export REDIS_URL="redis://localhost:6379"
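A missing variable from the list above tends to surface later as a confusing runtime error. A small startup check, sketched here, reports all missing names at once. It assumes every variable in the list is required, which may not hold for deployments that skip a feature, and missing_env_vars is an illustrative name.

```python
import os

# Names taken from the export list above.
REQUIRED_VARS = [
    'OPENAI_API_KEY', 'DB_URI', 'JWT_SECRET_KEY',
    'TWILIO_ACCOUNT_SID', 'TWILIO_AUTH_TOKEN', 'TWILIO_PHONE_NUMBER',
    'GOOGLE_OAUTH2_TOKEN_B64', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET',
    'DEEPGRAM_API_KEY', 'FREEPBX_DOMAIN', 'REDIS_URL',
]


def missing_env_vars(env=None) -> list:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_env_vars() before the app starts and exiting with the returned names in the message keeps configuration problems visible at deploy time.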
2. Database Migration
# Run team collaboration migration
python run_migration.py
# This creates:
# - users_sambanova table
# - teams_sambanova table
# - team_memberships_sambanova table
# - Adds team columns to todos_sambanova
# - Creates demo admin user (admin@sambanova.com / admin123)
3. Deepgram Configuration
# Get Deepgram API key from: https://console.deepgram.com
export DEEPGRAM_API_KEY="your_deepgram_api_key"
# Audio formats this integration sends to Deepgram:
# - WebM (sent as-is from browser recordings)
# - WAV (built from raw PCM as a fallback)
4. Twilio Configuration
# Configure Twilio phone number webhook:
Webhook URL: https://hjlees.com/sambanova_todo/twilio/call
HTTP Method: POST
# Test webhook:
curl -X POST https://hjlees.com/sambanova_todo/twilio/call \
  --data-urlencode "CallSid=test123" \
  --data-urlencode "From=+1234567890"
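In form-encoded data a literal plus sign decodes as a space, so the From number in a test request needs to be percent-encoded to arrive intact. The short check below uses only the standard library to show the difference; the form values are the same test values used in the webhook test.

```python
from urllib.parse import urlencode, parse_qs

form = {'CallSid': 'test123', 'From': '+1234567890'}
encoded = urlencode(form)
print(encoded)  # CallSid=test123&From=%2B1234567890

# Unencoded, the plus sign is read back as a space.
print(parse_qs('From=+1234567890'))  # {'From': [' 1234567890']}
print(parse_qs(encoded)['From'])     # ['+1234567890']
```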
5. Start Production Service
# Local development:
python app.py
# Production (Render.com):
gunicorn --worker-class gthread -w 1 --threads 4 --bind 0.0.0.0:$PORT start_simple:app
# Verify deployment:
curl https://hjlees.com/sambanova_todo/
curl https://hjlees.com/team-dashboard
Production Usage
Team Dashboard
Access the team dashboard at /team-dashboard to manage teams, members, and todos through a web interface.
Demo Credentials: admin@sambanova.com / admin123
Twilio Voice Commands
- • "Create a high priority todo to review the quarterly report"
- • "Add a reminder to call mom tomorrow at 2 PM"
- • "Schedule a meeting for Friday from 2 to 3 PM"
- • "Show me all my pending todos"
- • "Mark the grocery shopping todo as completed"
WebRTC Voice Commands
- • "Create a todo task to buy groceries"
- • "Add a reminder for team meeting"
- • "Transfer me to an agent"
- • "Create a team called Engineering"
- • "Show my todos"
Access: /sambanova_todo/webrtc/voice-assistant
Team Commands
- • "Create a high priority todo for the development team"
- • "What teams are available?"
- • "Who are the members of the development team?"
- • "Assign a code review task to John in the dev team"
- • "Create a demo team meeting on October 10th at 5 PM"
Technical Highlights
Circular Import Resolution
A lazy-loading pattern breaks the circular dependencies between the MCP servers and the Flask routes.
- ✅ Lazy import of team models
- ✅ Lazy agent initialization
- ✅ Shared Base class pattern
- ✅ extend_existing table args
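The lazy-loading idea can be illustrated with a generic sketch: imports and expensive objects are resolved on first use inside a function, not at module load time, so two modules that reference each other can both finish importing. The helper names lazy_module and Lazy are hypothetical and not taken from the project.

```python
import importlib
from functools import lru_cache


@lru_cache(maxsize=None)
def lazy_module(name: str):
    """Import a module the first time it is requested and cache the result."""
    return importlib.import_module(name)


class Lazy:
    """Defer construction of an object (for example an agent) until first use."""

    def __init__(self, factory):
        self._factory = factory
        self._value = None
        self._ready = False

    def get(self):
        # Build the object once; later calls reuse the same instance.
        if not self._ready:
            self._value = self._factory()
            self._ready = True
        return self._value
```

A route handler would call lazy_module with the name of the models module inside the function body, and wrap agent construction in a module-level Lazy instance, so neither runs while the modules are still importing each other.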
Async Context Management
MCP client and database connections are managed with async context managers, and agent and tool calls are bounded by timeouts.
- ✅ async with MultiServerMCPClient
- ✅ asyncio.wait_for with timeouts
- ✅ ExceptionGroup handling
- ✅ 30s timeout for tool execution
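The timeout pattern listed above can be sketched with asyncio.wait_for and a fallback message. The coroutines slow_tool and fast_tool are stand-ins for real tool calls, and run_with_timeout is an illustrative name, not a function from the project.

```python
import asyncio


async def run_with_timeout(coro, timeout: float, fallback: str) -> str:
    """Await `coro`, returning `fallback` if it exceeds `timeout` seconds."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the underlying task before raising.
        return fallback


async def slow_tool() -> str:
    """Stand-in for a tool call that takes too long."""
    await asyncio.sleep(1.0)
    return 'tool result'


async def fast_tool() -> str:
    """Stand-in for a tool call that returns promptly."""
    await asyncio.sleep(0)
    return 'tool result'
```

From synchronous code such as a Flask view, asyncio.run(run_with_timeout(fast_tool(), 30.0, "Please try a simpler request.")) runs the call with the bound applied.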
Enterprise Security
Security is built on JWT authentication, bcrypt password hashing, and role-based access control.
- ✅ JWT HS256 tokens
- ✅ Bcrypt password hashing
- ✅ Role-based authorization
- ✅ Auto token expiry handling
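To make the HS256 and expiry items concrete, the sketch below signs and verifies a token with only the standard library. It is an illustration of the mechanism, not the project's code; the application itself uses the jwt module, and a maintained library is the better choice in practice. The function names sign_hs256 and verify_hs256 are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode('utf-8'), signing_input.encode('ascii'),
                    hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build a compact JWT signed with HMAC-SHA256."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = '.'.join(
        _b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, payload)
    )
    return signing_input + '.' + _b64url(_signature(signing_input, secret))


def verify_hs256(token: str, secret: str,
                 now: Optional[float] = None) -> Optional[dict]:
    """Return the payload if the signature is valid and `exp` has not passed."""
    try:
        header_b64, payload_b64, signature_b64 = token.split('.')
        expected = _signature(f'{header_b64}.{payload_b64}', secret)
        # Constant-time comparison avoids leaking signature bytes via timing.
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        header = json.loads(_b64url_decode(header_b64))
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None
    if not isinstance(header, dict) or header.get('alg') != 'HS256':
        return None
    if not isinstance(payload, dict):
        return None
    now = time.time() if now is None else now
    if 'exp' in payload and now >= payload['exp']:
        return None
    return payload
```

Returning None for both a bad signature and an expired token mirrors how the authentication decorator treats the two cases as a single "Invalid or expired token" response.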
✅ Production-Ready Features
Infrastructure
- ✅ Render.com cloud deployment
- ✅ PostgreSQL database
- ✅ Environment variable management
- ✅ Build scripts & migrations
Team Collaboration
- ✅ Multi-tenant architecture
- ✅ Team member management
- ✅ Task assignment
- ✅ Role-based permissions
AI Integration
- ✅ Voice-enabled AI agent
- ✅ 38 MCP tools
- ✅ Google Calendar sync
- ✅ Natural language processing
Voice Processing
- ✅ Twilio voice integration
- ✅ WebRTC browser voice
- ✅ Deepgram STT (95%+ accuracy)
- ✅ Redis audio buffer
API Reference
Authentication API
Register a new user account
Request: {
  "email": "user@example.com",
  "username": "johndoe",
  "password": "secure123",
  "first_name": "John",
  "last_name": "Doe"
}
Response: {
  "message": "User registered successfully",
  "user": {...},
  "tokens": {
    "access_token": "eyJ...",
    "refresh_token": "eyJ..."
  }
}
Login and receive JWT tokens
Request: {
  "email": "user@example.com",
  "password": "secure123"
}
Response: {
  "message": "Login successful",
  "user": {
    "id": "uuid",
    "email": "user@example.com",
    "teams": [...]
  },
  "tokens": {
    "access_token": "eyJ...",
    "refresh_token": "eyJ..."
  }
}
Team Management API
Create a new team (requires auth)
Headers: {
  "Authorization": "Bearer eyJ..."
}
Request: {
  "name": "Development Team",
  "description": "Core development team"
}
Response: {
  "message": "Team created successfully",
  "team": {
    "id": "uuid",
    "name": "Development Team",
    "members_count": 1
  }
}
Get all teams for current user (requires auth)
Headers: {
  "Authorization": "Bearer eyJ..."
}
Response: {
  "teams": [
    {
      "id": "uuid",
      "name": "Development Team",
      "role": "owner",
      "members_count": 5
    }
  ]
}
Add member to team (requires auth)
Request: {
  "user_id": "uuid",
  "role": "member"
}
Response: {
  "message": "Member added successfully"
}
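The authenticated endpoints above take a JSON body and a Bearer token in the Authorization header. A client-side request could be prepared as in the sketch below, using only the standard library. The endpoint paths are not given in this section, so the caller supplies the full URL, and build_json_request is a hypothetical helper name.

```python
import json
import urllib.request
from typing import Optional


def build_json_request(url: str, payload: dict,
                       access_token: Optional[str] = None) -> urllib.request.Request:
    """Prepare (but do not send) a JSON POST request with an optional Bearer token."""
    headers = {'Content-Type': 'application/json'}
    if access_token:
        headers['Authorization'] = f'Bearer {access_token}'
    body = json.dumps(payload).encode('utf-8')
    return urllib.request.Request(url, data=body, headers=headers, method='POST')
```

The returned object can be passed to urllib.request.urlopen to send it; the access_token value is the one returned in the "tokens" object of the login response.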