Back to Rules
FastAPI86% popularity

FastAPI WebSocket Real-time Patterns

Build real-time features with FastAPI WebSockets and connection manager patterns.

Sebastián RamírezUpdated Mar 15, 2024

Overview

FastAPI's WebSocket support enables real-time bidirectional communication, essential for live dashboards, collaborative features, and notification systems. Unlike HTTP request-response, WebSockets maintain persistent connections, allowing servers to push updates to clients instantly. The WebSocket handling pattern uses async/await for non-blocking connection management. Each connected client maintains its own connection object, and the server can broadcast messages to multiple clients simultaneously. Connection managers track active connections and handle graceful connect/disconnect lifecycle events. Authentication in WebSockets requires careful handling since there's no request-response pattern. Tokens are typically passed during the connection handshake via headers or query parameters. The connection can be rejected immediately if authentication fails, preventing unauthorized connections from consuming resources. Broadcasting patterns vary by use case. For simple broadcasting to all clients, a manager class holds all connections and iterates through them. For more complex scenarios like chat rooms or collaborative editing, connections should be grouped by room or document ID. This reduces message distribution overhead and enables room-specific events. Backpressure handling ensures servers aren't overwhelmed by fast producers. When clients can't keep up, the server should either queue messages (limited queue size) or drop old messages. Connection cleanup must be robust—using try/finally ensures connections are removed from managers even if exceptions occur during message handling.

Code Example

.fastapirules
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, Set
import asyncio
import json
from datetime import datetime
import jwt

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room_id: str, user_id: str):
        await websocket.accept()

        if room_id not in self.active_connections:
            self.active_connections[room_id] = set()

        self.active_connections[room_id].add(websocket)
        await self.broadcast(room_id, {
            "type": "user_joined",
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat(),
            "active_users": len(self.active_connections[room_id])
        })

    def disconnect(self, websocket: WebSocket, room_id: str, user_id: str):
        if room_id in self.active_connections:
            self.active_connections[room_id].discard(websocket)
            if not self.active_connections[room_id]:
                del self.active_connections[room_id]

    async def broadcast(self, room_id: str, message: dict):
        if room_id not in self.active_connections:
            return

        disconnected = set()
        message_text = json.dumps(message)

        for connection in self.active_connections[room_id]:
            try:
                await connection.send_text(message_text)
            except Exception:
                disconnected.add(connection)

        for conn in disconnected:
            self.active_connections[room_id].discard(conn)

    async def send_personal(self, websocket: WebSocket, message: dict):
        try:
            await websocket.send_json(message)
        except Exception:
            pass

manager = ConnectionManager()

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, token: str = None):
    try:
        # Authenticate
        if not token:
            await manager.send_personal(websocket, {"error": "Authentication required"})
            return

        payload = jwt.decode(token, "secret", algorithms=["HS256"])
        user_id = payload.get("sub")

        if not user_id:
            await manager.send_personal(websocket, {"error": "Invalid token"})
            return

        await manager.connect(websocket, room_id, user_id)

        while True:
            data = await websocket.receive_json()
            await manager.broadcast(room_id, {
                "type": "message",
                "user_id": user_id,
                "content": data.get("content", ""),
                "timestamp": datetime.utcnow().isoformat()
            })

    except WebSocketDisconnect:
        manager.disconnect(websocket, room_id, user_id)
        await manager.broadcast(room_id, {
            "type": "user_left",
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat()
        })
    except Exception as e:
        await manager.send_personal(websocket, {"error": str(e)})

More FastAPI Rules

FASTAPI
95%

FastAPI Dependency Injection Pattern

Use FastAPI's dependency injection system for authentication, database sessions, and shared business logic.

dependency-injectionauthenticationarchitecture
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing im...
Feb 20, 2024by Sebastián Ramírez
View Rule
FASTAPI
88%

FastAPI Async Database Patterns

Implement async database operations with SQLAlchemy 2.0 for high-concurrency FastAPI applications.

fastapiasyncdatabase
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy i...
Mar 12, 2024by TiSever
View Rule
FASTAPI
84%

FastAPI Background Tasks

Handle long-running operations with BackgroundTasks and Celery for distributed task queues.

fastapibackground-taskscelery
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from ...
Mar 20, 2024by Sebastián Ramírez
View Rule