FastAPI WebSocket Real-time Patterns
Build real-time features with FastAPI WebSockets and connection manager patterns.
Overview
FastAPI's WebSocket support enables real-time bidirectional communication, which live dashboards, collaborative features, and notification systems depend on. Unlike HTTP request-response, a WebSocket connection stays open, so the server can push updates to clients as soon as they are available.
WebSocket handlers use async/await for non-blocking connection management. Each connected client has its own connection object, and the server can broadcast a message to many clients. A connection manager tracks active connections and handles the connect/disconnect lifecycle.
Authentication requires care because there is no per-message request-response cycle: the client is authenticated once, when the connection is opened. Tokens are typically passed during the handshake via headers or query parameters (the browser WebSocket API cannot set custom headers, so browser clients usually rely on query parameters or cookies). If authentication fails, the server can reject the connection immediately, which keeps unauthorized clients from consuming resources.
Broadcasting patterns vary by use case. For simple broadcasting to all clients, a manager class holds every connection and iterates through them. For more complex scenarios such as chat rooms or collaborative editing, connections should be grouped by room or document ID. Grouping reduces message distribution overhead and enables room-specific events.
Backpressure handling keeps the server from being overwhelmed by fast producers. When clients can't keep up, the server should either queue messages in a queue of limited size or drop old messages. Connection cleanup must be robust: using try/finally ensures connections are removed from the manager even if an exception occurs during message handling.
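The backpressure strategy mentioned above (a size-limited queue that drops old messages) can be sketched as follows. This is a minimal illustration, not part of FastAPI: the names BoundedOutbox and drain are hypothetical, and the send argument stands in for something like a connection's send_json method.

```python
import asyncio
from typing import Any, Awaitable, Callable


class BoundedOutbox:
    """Hypothetical per-client buffer that drops the oldest message when full."""

    def __init__(self, max_size: int = 100) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.dropped = 0  # how many messages were discarded for this client

    def put(self, message: Any) -> None:
        # Never block the producer: make room by discarding the oldest entry.
        if self._queue.full():
            self._queue.get_nowait()
            self.dropped += 1
        self._queue.put_nowait(message)

    async def get(self) -> Any:
        # Wait until a message is available, then return it.
        return await self._queue.get()


async def drain(outbox: BoundedOutbox, send: Callable[[Any], Awaitable[None]]) -> None:
    """Forward queued messages to one client at whatever pace it can sustain."""
    while True:
        await send(await outbox.get())
```

Under these assumptions, each connection would get its own outbox and its own drain task, and a broadcast would call put on every outbox instead of awaiting each send directly, so one slow client cannot stall delivery to the others.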
Code Example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, Optional, Set
import json
from datetime import datetime, timezone
import jwt

# Placeholder signing key kept from the example; load it from configuration in practice.
SECRET_KEY = "secret"

app = FastAPI()
# Note: CORSMiddleware applies to HTTP requests only, not to WebSocket handshakes.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def utc_now() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


class ConnectionManager:
    def __init__(self):
        # Maps room_id -> set of sockets currently connected to that room.
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room_id: str, user_id: str):
        await websocket.accept()
        self.active_connections.setdefault(room_id, set()).add(websocket)
        await self.broadcast(room_id, {
            "type": "user_joined",
            "user_id": user_id,
            "timestamp": utc_now(),
            "active_users": len(self.active_connections[room_id]),
        })

    def disconnect(self, websocket: WebSocket, room_id: str, user_id: str):
        if room_id in self.active_connections:
            self.active_connections[room_id].discard(websocket)
            # Remove empty rooms so the dict does not grow without bound.
            if not self.active_connections[room_id]:
                del self.active_connections[room_id]

    async def broadcast(self, room_id: str, message: dict):
        disconnected = set()
        message_text = json.dumps(message)
        # Iterate over a snapshot: the set can change while a send is awaited.
        for connection in list(self.active_connections.get(room_id, ())):
            try:
                await connection.send_text(message_text)
            except Exception:
                disconnected.add(connection)
        # The room may have been deleted in the meantime, so look it up again.
        room = self.active_connections.get(room_id)
        if room is not None:
            room.difference_update(disconnected)
            if not room:
                del self.active_connections[room_id]

    async def send_personal(self, websocket: WebSocket, message: dict):
        try:
            await websocket.send_json(message)
        except Exception:
            pass  # The socket is already closed; nothing more to do.


manager = ConnectionManager()


@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, token: Optional[str] = None):
    # Authenticate before accepting. Messages cannot be sent on an unaccepted
    # socket, so a failed check closes it, which rejects the handshake.
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    user_id = payload.get("sub")
    if not user_id:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket, room_id, user_id)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast(room_id, {
                "type": "message",
                "user_id": user_id,
                "content": data.get("content", ""),
                "timestamp": utc_now(),
            })
    except WebSocketDisconnect:
        pass  # Normal client disconnect; cleanup happens in finally.
    except Exception as e:
        await manager.send_personal(websocket, {"error": str(e)})
    finally:
        # Always remove the connection, whatever ended the loop.
        manager.disconnect(websocket, room_id, user_id)
        await manager.broadcast(room_id, {
            "type": "user_left",
            "user_id": user_id,
            "timestamp": utc_now(),
        })