
WebSocketService - Real-Time Communication

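WebSocketService provides real-time, bidirectional communication between the backend and frontend clients. It tracks client connections and their topic subscriptions, broadcasts messages to all clients or to the subscribers of a topic, and uses a ping/pong heartbeat to detect and drop dead connections.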

Overview​

graph TB
subgraph Backend
WSS[WebSocket Server]
SERVICE[WebSocketService]
AGGREGATOR[DataAggregator]
end

subgraph Clients
C1[Client 1]
C2[Client 2]
C3[Client 3]
end

subgraph Topics
CAMERAS[cameras]
WEATHER[weather]
ACCIDENTS[accidents]
AIR[airQuality]
end

AGGREGATOR --> SERVICE
SERVICE --> WSS
WSS --> C1
WSS --> C2
WSS --> C3

CAMERAS -.-> C1
WEATHER -.-> C2
ACCIDENTS -.-> C1
ACCIDENTS -.-> C3

Features

| Feature | Description |
| --- | --- |
| Client Management | Connection tracking with subscription state |
| Topic Pub/Sub | Clients subscribe to specific data topics |
| Heartbeat | Ping every 10 s; connections with no pong for 30 s are terminated |
| Initial Snapshot | Full data snapshot on new connection |
| Broadcast | Send to all clients or to the subscribers of a specific topic |
| Message Types | UPDATE, ALERT, SNAPSHOT, PONG |

Architecture​

sequenceDiagram
participant Client
participant WSS as WebSocketServer
participant Service as WebSocketService
participant Aggregator as DataAggregator

Client->>WSS: Connect
WSS->>Service: on('connection')
Service->>Service: Register client
Service->>Client: Connection ACK
Service->>Aggregator: getSnapshot()
Aggregator-->>Service: Current state
Service->>Client: SNAPSHOT message

loop Heartbeat (10s)
Service->>Client: ping
Client-->>Service: pong
Service->>Service: Update lastPing
end

loop Data Updates
Aggregator->>Service: broadcast(data)
Service->>Client: UPDATE message
end

Class Definition

import { WebSocketServer, WebSocket } from 'ws';

interface ClientSubscription {
  ws: WebSocket;
  topics: string[];
  lastPing: number;
}

export class WebSocketService {
  private wss: WebSocketServer;
  private clients: Map<WebSocket, ClientSubscription>;
  private heartbeatInterval: NodeJS.Timeout | null;
  private readonly HEARTBEAT_INTERVAL = 10000; // 10 seconds
  private readonly HEARTBEAT_TIMEOUT = 30000; // 30 seconds
  private snapshotProvider: (() => any) | null;

  constructor(wss: WebSocketServer);

  // Snapshot
  setSnapshotProvider(provider: () => any): void;

  // Broadcasting
  broadcast(message: WebSocketMessage): void;
  broadcastToTopic(topic: string, message: WebSocketMessage): void;

  // Client Management
  getClientCount(): number;
  getTopicSubscribers(topic: string): number;

  // Lifecycle
  private initialize(): void;
  private startHeartbeat(): void;
  private handleClientMessage(ws: WebSocket, data: any): void;
  private sendInitialSnapshot(ws: WebSocket, snapshot: any): void;
}

interface WebSocketMessage {
  type: 'UPDATE' | 'ALERT' | 'SNAPSHOT' | 'PONG';
  topic?: string;
  data: any;
  timestamp: string;
}

Configuration​

// Service Constants
const HEARTBEAT_INTERVAL = 10000; // 10 seconds between pings
const HEARTBEAT_TIMEOUT = 30000; // 30 seconds before disconnect

// Environment Variables
const WS_PORT = Number(process.env.WS_PORT) || 5000; // Same port as the HTTP server; env values are strings, so convert

Usage Examples​

Server Setup​

import express from 'express';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { WebSocketService } from './services/websocketService';
import { DataAggregator } from './services/dataAggregator';

const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });

// Initialize WebSocket service
const wsService = new WebSocketService(wss);

// Initialize data aggregator (registers snapshot provider)
const aggregator = new DataAggregator(wsService);
aggregator.start();

server.listen(5000);

Broadcasting Updates

// Broadcast camera update to all subscribed clients
wsService.broadcast({
  type: 'UPDATE',
  topic: 'cameras',
  data: {
    id: 'urn:ngsi-ld:Camera:001',
    name: 'Camera Nguyen Hue',
    imageUrl: 'http://camera.hcm/001/live.jpg',
    lastUpdated: new Date().toISOString()
  },
  timestamp: new Date().toISOString()
});

Broadcasting Alerts

// Broadcast accident alert
wsService.broadcast({
  type: 'ALERT',
  topic: 'accidents',
  data: {
    id: 'urn:ngsi-ld:RoadAccident:new-001',
    severity: 'high',
    location: { latitude: 10.77, longitude: 106.70 },
    description: 'Multi-vehicle collision on Nguyen Hue'
  },
  timestamp: new Date().toISOString()
});
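
The topic-scoped variant of the calls above can be sketched as a standalone function. This is a minimal sketch, not the service's actual `broadcastToTopic`: `SocketLike`, `Subscriber`, and `sendToTopicSubscribers` are hypothetical names, and treating an `all` subscription as matching every topic is an assumption based on the Supported Topics list.

```typescript
// Minimal stand-in for the parts of a ws WebSocket this sketch touches.
interface SocketLike {
  readyState: number;
  send(payload: string): void;
}

const OPEN = 1; // numeric value of WebSocket.OPEN

// Hypothetical record pairing a socket with its subscribed topics.
interface Subscriber {
  socket: SocketLike;
  topics: string[];
}

// Sends one message to every open socket subscribed to `topic` (or to 'all').
// Returns how many sockets the message was written to.
function sendToTopicSubscribers(subscribers: Subscriber[], topic: string, message: object): number {
  const payload = JSON.stringify(message); // serialize once, reuse for every client
  let sent = 0;
  for (const { socket, topics } of subscribers) {
    const subscribed = topics.includes(topic) || topics.includes('all');
    if (socket.readyState === OPEN && subscribed) {
      socket.send(payload);
      sent++;
    }
  }
  return sent;
}

// Usage with fake sockets that record what they were sent.
const received: string[] = [];
const makeSocket = (label: string, readyState: number): SocketLike => ({
  readyState,
  send: (payload: string) => {
    received.push(`${label}:${payload}`);
  },
});

const subscribers: Subscriber[] = [
  { socket: makeSocket('a', OPEN), topics: ['accidents'] },
  { socket: makeSocket('b', OPEN), topics: ['weather'] },
  { socket: makeSocket('c', 3), topics: ['accidents'] }, // 3 = CLOSED, skipped
  { socket: makeSocket('d', OPEN), topics: ['all'] },
];

const sentCount = sendToTopicSubscribers(subscribers, 'accidents', { type: 'ALERT', topic: 'accidents' });
```

Serializing the message once before the loop avoids repeating `JSON.stringify` for every subscriber.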

Message Types​

Connection Message

{
  "type": "connection",
  "message": "Connected to HCMC Traffic Monitoring System",
  "timestamp": "2025-11-29T10:30:00.000Z",
  "heartbeatInterval": 10000
}

Snapshot Message

{
  "type": "SNAPSHOT",
  "data": {
    "cameras": [...],
    "weather": [...],
    "airQuality": [...],
    "accidents": [...],
    "patterns": [...]
  },
  "timestamp": "2025-11-29T10:30:00.000Z"
}

Update Message

{
  "type": "UPDATE",
  "topic": "cameras",
  "data": [
    {
      "id": "urn:ngsi-ld:Camera:001",
      "name": "Camera Nguyen Hue",
      "imageUrl": "http://...",
      "status": "active"
    }
  ],
  "timestamp": "2025-11-29T10:30:30.000Z"
}

Alert Message

{
  "type": "ALERT",
  "topic": "accidents",
  "data": {
    "id": "urn:ngsi-ld:RoadAccident:001",
    "severity": "high",
    "location": { "latitude": 10.77, "longitude": 106.70 }
  },
  "timestamp": "2025-11-29T10:31:00.000Z"
}

Client Message Handling​

Subscribe to Topics

// Client -> Server
{
  "action": "subscribe",
  "topics": ["cameras", "accidents"]
}

Unsubscribe

// Client -> Server
{
  "action": "unsubscribe",
  "topics": ["weather"]
}
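
How the two actions above change a client's topic list can be sketched as a pure function. This is an illustration under assumptions, not the service's `handleClientMessage`: the `ClientAction` type and `applyClientAction` helper are hypothetical names, and de-duplicating topics with a `Set` is a design choice of this sketch.

```typescript
// Hypothetical shape of the client -> server control messages shown above.
interface ClientAction {
  action: 'subscribe' | 'unsubscribe';
  topics: string[];
}

// Returns the client's new topic list after applying one action.
// Pure function: the caller decides where the result is stored.
function applyClientAction(current: string[], msg: ClientAction): string[] {
  const topics = new Set(current);
  for (const topic of msg.topics) {
    if (msg.action === 'subscribe') {
      topics.add(topic); // adding an existing topic is a no-op
    } else {
      topics.delete(topic); // removing an absent topic is a no-op
    }
  }
  return Array.from(topics);
}

const afterSubscribe = applyClientAction(['weather'], {
  action: 'subscribe',
  topics: ['cameras', 'accidents'],
});
const afterUnsubscribe = applyClientAction(afterSubscribe, {
  action: 'unsubscribe',
  topics: ['weather'],
});
```

Because the function returns a new array instead of mutating its input, repeated or out-of-order requests from a client cannot leave duplicate entries in the subscription list.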

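Ping/Pong

// Server sends a ping frame on each heartbeat tick
ws.ping();

// The client replies with a pong automatically; the server records when it arrived
ws.on('pong', () => {
  const client = this.clients.get(ws);
  if (client) {
    client.lastPing = Date.now();
  }
});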

Supported Topics

| Topic | Description | Update Frequency |
| --- | --- | --- |
| cameras | Traffic camera images and status | 30 seconds |
| weather | Weather observations | 30 seconds |
| airQuality | AQI sensor readings | 30 seconds |
| accidents | Real-time accident alerts | 10 seconds |
| patterns | Traffic congestion patterns | 60 seconds |
| congestion | Congestion zone updates | 30 seconds |
| all | Subscribe to all topics | - |
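
The topic list above can be encoded as a typed constant so that subscription requests are validated against it. The sketch below is an assumption about how that might look; `TOPIC_INTERVALS_MS`, `DataTopic`, and `expandTopics` are hypothetical names, and dropping unknown topic names silently is a choice made for illustration only.

```typescript
// Update intervals in milliseconds, copied from the Supported Topics list.
const TOPIC_INTERVALS_MS = {
  cameras: 30000,
  weather: 30000,
  airQuality: 30000,
  accidents: 10000,
  patterns: 60000,
  congestion: 30000,
} as const;

type DataTopic = keyof typeof TOPIC_INTERVALS_MS;

// Hypothetical helper: turns a requested topic list into concrete data topics.
// 'all' expands to every topic; names that are not in the list are dropped.
function expandTopics(requested: string[]): DataTopic[] {
  const known = Object.keys(TOPIC_INTERVALS_MS) as DataTopic[];
  if (requested.includes('all')) return known;
  return known.filter((topic) => requested.includes(topic));
}

const everything = expandTopics(['all']);
const filtered = expandTopics(['weather', 'bogus', 'cameras']);
```

Here `everything` holds all six data topics, and `filtered` holds only `cameras` and `weather`, in the order they appear in the constant.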

Heartbeat Mechanism​

stateDiagram-v2
[*] --> Connected: Client connects
Connected --> Alive: Ping sent
Alive --> Alive: Pong received (< 30s)
Alive --> Stale: No pong (> 30s)
Stale --> Terminated: Connection closed
Terminated --> [*]

Implementation

private startHeartbeat(): void {
  this.heartbeatInterval = setInterval(() => {
    const now = Date.now();
    const staleClients: WebSocket[] = [];

    this.clients.forEach((client, ws) => {
      // Check for stale connections
      if (now - client.lastPing > this.HEARTBEAT_TIMEOUT) {
        staleClients.push(ws);
        ws.terminate();
        return;
      }

      // Send ping to active clients
      if (ws.readyState === WebSocket.OPEN) {
        ws.ping();
      }
    });

    // Remove stale clients
    staleClients.forEach(ws => this.clients.delete(ws));
  }, this.HEARTBEAT_INTERVAL);
}
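
The staleness check inside the heartbeat loop can be isolated into a pure function, which makes the boundary behavior easy to test without real sockets or timers. This is a minimal sketch under assumptions: `findStaleClients` is a hypothetical helper, and clients are keyed by string ids here rather than by `WebSocket` instances.

```typescript
const HEARTBEAT_TIMEOUT = 30000; // 30 seconds, as in the service constants

// Returns the ids of clients whose last pong is older than the timeout.
// Uses the same strict comparison as the heartbeat loop: exactly 30 s is still alive.
function findStaleClients(
  lastPingById: Map<string, number>,
  now: number,
  timeoutMs: number = HEARTBEAT_TIMEOUT,
): string[] {
  const stale: string[] = [];
  lastPingById.forEach((lastPing, id) => {
    if (now - lastPing > timeoutMs) {
      stale.push(id);
    }
  });
  return stale;
}

const checkTime = 100000; // arbitrary "current time" in milliseconds
const lastPings = new Map<string, number>([
  ['fresh', checkTime - 5000],
  ['borderline', checkTime - 30000],
  ['stale', checkTime - 30001],
]);
const staleIds = findStaleClients(lastPings, checkTime);
```

Only `stale` is returned. Because the check runs once per 10-second tick, a dead connection is terminated between 30 and 40 seconds after its last recorded pong.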

Snapshot Provider​

The DataAggregator registers a snapshot provider that returns current state:

// In DataAggregator constructor
this.wsService.setSnapshotProvider(() => this.getCurrentSnapshot());

// Snapshot structure
getCurrentSnapshot(): Snapshot {
  return {
    cameras: Array.from(this.cameraCache.values()).map(e => e.data),
    weather: Array.from(this.weatherCache.values()).map(e => e.data),
    airQuality: Array.from(this.airQualityCache.values()).map(e => e.data),
    accidents: Array.from(this.accidentCache.values()).map(e => e.data),
    patterns: Array.from(this.patternCache.values()).map(e => e.data)
  };
}
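
On the service side, the registered provider is what turns a new connection into a SNAPSHOT message. The following sketch shows one way that step might be written; `buildSnapshotMessage` and `SnapshotProvider` are hypothetical names, and returning `null` when no provider has been registered yet is an assumption, not documented behavior.

```typescript
// Mirrors the SNAPSHOT message format shown under Message Types.
interface SnapshotMessage {
  type: 'SNAPSHOT';
  data: unknown;
  timestamp: string;
}

type SnapshotProvider = () => unknown;

// Builds the message sent to a newly connected client.
// Returns null when no provider is registered, so the caller can skip sending.
function buildSnapshotMessage(
  provider: SnapshotProvider | null,
  at: Date = new Date(),
): SnapshotMessage | null {
  if (!provider) return null;
  return { type: 'SNAPSHOT', data: provider(), timestamp: at.toISOString() };
}

const noProvider = buildSnapshotMessage(null);
const snapshotMessage = buildSnapshotMessage(
  () => ({ cameras: [], weather: [] }),
  new Date('2025-11-29T10:30:00.000Z'),
);
```

Calling the provider at connection time, rather than caching its result, means each new client receives the aggregator's state as of the moment it connected.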

Error Handling

// Handle client errors
ws.on('error', (error) => {
  logger.error('WebSocket error:', error);
  this.clients.delete(ws);
});

// Handle client disconnect
ws.on('close', () => {
  logger.info('Client disconnected');
  this.clients.delete(ws);
});

// Handle message parsing errors (RawData is the payload type exported by 'ws')
ws.on('message', (message: RawData) => {
  try {
    const data = JSON.parse(message.toString());
    this.handleClientMessage(ws, data);
  } catch (error) {
    logger.error('Invalid message format:', error);
  }
});
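
The message handler above catches malformed JSON but passes any successfully parsed value on to the service. A stricter variant could also validate the shape before dispatching. The sketch below assumes the subscribe/unsubscribe format from Client Message Handling; `parseClientMessage` and `ParsedAction` are hypothetical names.

```typescript
// Result type: either a validated action or a reason for rejection.
type ParsedAction =
  | { ok: true; action: 'subscribe' | 'unsubscribe'; topics: string[] }
  | { ok: false; reason: string };

// Parses a raw text frame and checks it against the expected message shape.
function parseClientMessage(raw: string): ParsedAction {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return { ok: false, reason: 'invalid JSON' };
  }
  if (typeof value !== 'object' || value === null) {
    return { ok: false, reason: 'not an object' };
  }
  const { action, topics } = value as { action?: unknown; topics?: unknown };
  if (!Array.isArray(topics) || !topics.every((t) => typeof t === 'string')) {
    return { ok: false, reason: 'topics must be an array of strings' };
  }
  if (action === 'subscribe' || action === 'unsubscribe') {
    return { ok: true, action, topics: topics as string[] };
  }
  return { ok: false, reason: 'unknown action' };
}

const valid = parseClientMessage('{"action":"subscribe","topics":["cameras"]}');
const broken = parseClientMessage('oops');
const unknownAction = parseClientMessage('{"action":"dance","topics":[]}');
```

Returning a tagged result instead of throwing lets the caller log the rejection reason and keep the connection open.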

Frontend Integration

// React hook example
import { useEffect, useRef, useState } from 'react';

const useWebSocket = () => {
  const [data, setData] = useState<Snapshot | null>(null);
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:5000');
    wsRef.current = ws;

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);

      switch (message.type) {
        case 'SNAPSHOT':
          // Replace the whole state with the initial snapshot
          setData(message.data);
          break;
        case 'UPDATE':
          // Replace only the slice for the updated topic
          setData(prev => ({
            ...prev,
            [message.topic]: message.data
          }));
          break;
        case 'ALERT':
          // Handle alert
          break;
      }
    };

    // Close the socket when the component unmounts
    return () => ws.close();
  }, []);

  return data;
};
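
The state transitions in the hook can be pulled out into a plain reducer, which makes them testable without React or a live socket. This is a sketch under assumptions: `reduceMessage`, `ServerMessage`, and the loose `Snapshot` alias are hypothetical, and the UPDATE branch mirrors the hook by replacing the whole slice for the message's topic.

```typescript
// Loose stand-in for the app's Snapshot type: one entry per topic.
type Snapshot = Record<string, unknown>;

interface ServerMessage {
  type: string;
  topic?: string;
  data?: unknown;
}

// Computes the next client state from the previous state and one server message.
function reduceMessage(state: Snapshot | null, message: ServerMessage): Snapshot | null {
  switch (message.type) {
    case 'SNAPSHOT':
      return message.data as Snapshot; // full replacement
    case 'UPDATE':
      if (!message.topic) return state; // nothing to key the update on
      return { ...(state ?? {}), [message.topic]: message.data };
    default:
      return state; // ALERT and unknown types leave the state untouched
  }
}

const incoming: ServerMessage[] = [
  { type: 'SNAPSHOT', data: { cameras: [{ id: 'c1' }], weather: [] } },
  { type: 'UPDATE', topic: 'cameras', data: [{ id: 'c2' }] },
  { type: 'ALERT', topic: 'accidents', data: { severity: 'high' } },
];
const finalState = incoming.reduce<Snapshot | null>(reduceMessage, null);
```

Inside the hook, the same function could be used as `setData(prev => reduceMessage(prev, message))`, keeping the socket wiring separate from the state logic.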

Performance Metrics

| Metric | Value |
| --- | --- |
| Max concurrent clients | 1000+ |
| Heartbeat interval | 10 seconds |
| Stale timeout | 30 seconds |
| Average latency | < 50 ms |
| Message throughput | 1000+ msg/s |
