WebSocketService - Real-Time Communication
WebSocket Service for real-time bidirectional communication between backend and frontend clients. Manages client connections, subscriptions, and message broadcasting with heartbeat mechanism.
Overview
graph TB
subgraph Backend
WSS[WebSocket Server]
SERVICE[WebSocketService]
AGGREGATOR[DataAggregator]
end
subgraph Clients
C1[Client 1]
C2[Client 2]
C3[Client 3]
end
subgraph Topics
CAMERAS[cameras]
WEATHER[weather]
ACCIDENTS[accidents]
AIR[airQuality]
end
AGGREGATOR --> SERVICE
SERVICE --> WSS
WSS --> C1
WSS --> C2
WSS --> C3
CAMERAS -.-> C1
WEATHER -.-> C2
ACCIDENTS -.-> C1
ACCIDENTS -.-> C3
Features
| Feature | Description |
|---|---|
| Client Management | Connection tracking with subscription state |
| Topic Pub/Sub | Clients subscribe to specific data topics |
| Heartbeat | Ping/pong every 10s, 30s timeout for dead connections |
| Initial Snapshot | Full data snapshot on new connection |
| Broadcast | Send to all clients or specific topic subscribers |
| Message Types | UPDATE, ALERT, SNAPSHOT, PONG |
Architecture
sequenceDiagram
participant Client
participant WSS as WebSocketServer
participant Service as WebSocketService
participant Aggregator as DataAggregator
Client->>WSS: Connect
WSS->>Service: on('connection')
Service->>Service: Register client
Service->>Client: Connection ACK
Service->>Aggregator: getSnapshot()
Aggregator-->>Service: Current state
Service->>Client: SNAPSHOT message
loop Heartbeat (10s)
Service->>Client: ping
Client-->>Service: pong
Service->>Service: Update lastPing
end
loop Data Updates
Aggregator->>Service: broadcast(data)
Service->>Client: UPDATE message
end
Class Definition
import { WebSocketServer, WebSocket } from 'ws';
interface ClientSubscription {
ws: WebSocket;
topics: string[];
lastPing: number;
}
export class WebSocketService {
private wss: WebSocketServer;
private clients: Map<WebSocket, ClientSubscription>;
private heartbeatInterval: NodeJS.Timeout | null;
private readonly HEARTBEAT_INTERVAL = 10000; // 10 seconds
private readonly HEARTBEAT_TIMEOUT = 30000; // 30 seconds
private snapshotProvider: (() => any) | null;
constructor(wss: WebSocketServer);
// Snapshot
setSnapshotProvider(provider: () => any): void;
// Broadcasting
broadcast(message: WebSocketMessage): void;
broadcastToTopic(topic: string, message: WebSocketMessage): void;
// Client Management
getClientCount(): number;
getTopicSubscribers(topic: string): number;
// Lifecycle
private initialize(): void;
private startHeartbeat(): void;
private handleClientMessage(ws: WebSocket, data: any): void;
private sendInitialSnapshot(ws: WebSocket, snapshot: any): void;
}
interface WebSocketMessage {
type: 'UPDATE' | 'ALERT' | 'SNAPSHOT' | 'PONG';
topic?: string;
data: any;
timestamp: string;
}