WebSocket Implementation
Overview
Build scalable WebSocket systems for real-time communication with proper connection management, message routing, error handling, and horizontal scaling support.
When to Use
- Building real-time chat and messaging
- Implementing live notifications
- Creating collaborative editing tools
- Broadcasting live data updates
- Building real-time dashboards
- Streaming events to clients
- Live multiplayer games
Instructions
1. Node.js WebSocket Server (Socket.IO)
javascript
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const redis = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');

const app = express();
const server = http.createServer(app);

// Comma-separated list of allowed browser origins, e.g. "https://app.example.com".
// SECURITY: the '*' fallback keeps the permissive default for local development;
// set CORS_ORIGIN explicitly before exposing this server publicly.
const corsOrigin = process.env.CORS_ORIGIN
  ? process.env.CORS_ORIGIN.split(',').map((origin) => origin.trim())
  : '*';

// The reconnection* settings are socket.io-client options and are ignored by the
// server, so they are configured on the client only.
const io = socketIo(server, {
  cors: { origin: corsOrigin },
  transports: ['websocket', 'polling']
});

// Redis adapter for horizontal scaling.
// node-redis v4 clients must be connected explicitly, and an 'error' listener
// is required so a dropped Redis connection does not crash the process.
const redisClient = redis.createClient({ url: process.env.REDIS_URL });
const redisSubClient = redisClient.duplicate();

redisClient.on('error', (err) => console.error('Redis pub client error:', err));
redisSubClient.on('error', (err) => console.error('Redis sub client error:', err));

Promise.all([redisClient.connect(), redisSubClient.connect()])
  .then(() => {
    io.adapter(createAdapter(redisClient, redisSubClient));
  })
  .catch((err) => {
    // Without the adapter the server still works, but events stay local to this node.
    console.error('Redis adapter not attached; running single-node:', err);
  });

// Connection management: socket id -> { userId, username, socketId, connectedAt }
const connectedUsers = new Map();

const { randomUUID } = require('crypto');

/**
 * Per-connection event wiring.
 *
 * Every handler except 'auth' requires the socket to have authenticated first;
 * unauthenticated or malformed requests get an 'error' event instead of
 * crashing the handler with a TypeError.
 */
io.on('connection', (socket) => {
  console.log(`User connected: ${socket.id}`);

  // Returns the authenticated user for this socket, or reports an error to the
  // client and returns undefined.
  const requireUser = () => {
    const user = connectedUsers.get(socket.id);
    if (!user) {
      socket.emit('error', { message: 'Not authenticated' });
    }
    return user;
  };

  // Validates a room id and returns the namespaced room name, or reports an
  // error to the client and returns null.
  const resolveRoom = (roomId) => {
    const isValid =
      (typeof roomId === 'string' && roomId.length > 0) || typeof roomId === 'number';
    if (!isValid) {
      socket.emit('error', { message: 'Invalid room id' });
      return null;
    }
    return `room:${roomId}`;
  };

  // Store user connection. `ack` is the optional acknowledgement callback the
  // client passes to emit(); it receives { success, message? }.
  socket.on('auth', (userData, ack) => {
    const reply = typeof ack === 'function' ? ack : () => {};

    if (!userData || userData.id == null || typeof userData.username !== 'string') {
      reply({ success: false, message: 'Invalid credentials' });
      return;
    }

    // NOTE(review): the identity is asserted by the client. Verify a signed
    // token here before trusting userData in production.
    connectedUsers.set(socket.id, {
      userId: userData.id,
      username: userData.username,
      socketId: socket.id,
      connectedAt: new Date()
    });

    // Join user-specific room
    socket.join(`user:${userData.id}`);
    socket.join('authenticated_users');

    // Notify others user is online
    io.to('authenticated_users').emit('user:online', {
      userId: userData.id,
      username: userData.username,
      timestamp: new Date()
    });

    console.log(`User authenticated: ${userData.username}`);
    reply({ success: true });
  });

  // Chat messaging
  socket.on('chat:message', async (message) => {
    const user = requireUser();
    if (!user) {
      return;
    }

    if (!message || typeof message.text !== 'string') {
      socket.emit('error', { message: 'Invalid message' });
      return;
    }

    const room = resolveRoom(message.roomId);
    if (room === null) {
      return;
    }

    // Only members of a room may post to it.
    if (!socket.rooms.has(room)) {
      socket.emit('error', { message: 'Join the room before sending messages' });
      return;
    }

    const chatMessage = {
      // Date.now() alone collides for messages created in the same millisecond.
      id: `msg_${randomUUID()}`,
      senderId: user.userId,
      senderName: user.username,
      text: message.text,
      roomId: message.roomId,
      timestamp: new Date(),
      status: 'delivered'
    };

    // Save to database before broadcasting so clients never see a message
    // that was not persisted.
    try {
      await Message.create(chatMessage);
    } catch (err) {
      console.error('Failed to persist chat message:', err);
      socket.emit('error', { message: 'Message could not be saved' });
      return;
    }

    // Broadcast to room
    io.to(room).emit('chat:message', chatMessage);

    // Acknowledge to the sender. The server only knows the message was
    // delivered to the room; a 'read' status needs a real read receipt.
    socket.emit('chat:message:ack', { messageId: chatMessage.id, status: 'delivered' });
  });

  // Room management
  socket.on('room:join', (roomId) => {
    const user = requireUser();
    if (!user) {
      return;
    }

    const room = resolveRoom(roomId);
    if (room === null) {
      return;
    }

    socket.join(room);
    io.to(room).emit('room:user:joined', {
      userId: user.userId,
      username: user.username,
      timestamp: new Date()
    });
  });

  socket.on('room:leave', (roomId) => {
    const user = requireUser();
    if (!user) {
      return;
    }

    const room = resolveRoom(roomId);
    if (room === null) {
      return;
    }

    socket.leave(room);
    io.to(room).emit('room:user:left', {
      userId: user.userId,
      timestamp: new Date()
    });
  });

  // Typing indicator
  socket.on('typing:start', (roomId) => {
    const user = requireUser();
    if (!user) {
      return;
    }

    const room = resolveRoom(roomId);
    if (room === null) {
      return;
    }

    io.to(room).emit('typing:indicator', {
      userId: user.userId,
      username: user.username,
      isTyping: true
    });
  });

  socket.on('typing:stop', (roomId) => {
    const user = requireUser();
    if (!user) {
      return;
    }

    const room = resolveRoom(roomId);
    if (room === null) {
      return;
    }

    io.to(room).emit('typing:indicator', {
      userId: user.userId,
      isTyping: false
    });
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    const user = connectedUsers.get(socket.id);
    if (!user) {
      return;
    }

    connectedUsers.delete(socket.id);
    // NOTE(review): a user with several open sockets is reported offline as
    // soon as any one of them disconnects.
    io.to('authenticated_users').emit('user:offline', {
      userId: user.userId,
      timestamp: new Date()
    });

    console.log(`User disconnected: ${user.username}`);
  });

  // Error handling
  socket.on('error', (error) => {
    console.error(`Socket error: ${error}`);
    socket.emit('error', { message: 'An error occurred' });
  });
});

// Server methods

/**
 * Emits an arbitrary event to every socket of one user (room `user:<id>`).
 * @param {string|number} userId - id used when the user authenticated
 * @param {string} event - event name
 * @param {*} data - event payload
 */
const sendDirectMessage = (userId, event, data) => {
  io.to(`user:${userId}`).emit(event, data);
};

/**
 * Pushes a 'user:update' event to every socket of one user.
 * @param {string|number} userId - id used when the user authenticated
 * @param {*} data - update payload
 */
const broadcastUserUpdate = (userId, data) => {
  sendDirectMessage(userId, 'user:update', data);
};

/**
 * Emits an event to every socket that joined `room:<roomId>`.
 * @param {string|number} roomId - room identifier without the `room:` prefix
 * @param {string} event - event name
 * @param {*} data - event payload
 */
const notifyRoom = (roomId, event, data) => {
  io.to(`room:${roomId}`).emit(event, data);
};

// PORT lets deployments override the default without editing the source.
const PORT = Number(process.env.PORT) || 3000;

server.listen(PORT, () => {
  console.log(`WebSocket server listening on port ${PORT}`);
});
2. Browser WebSocket Client (Socket.IO)
javascript
/**
 * Thin wrapper around a Socket.IO client socket.
 *
 * Expects the Socket.IO client factory `io` to be in scope (e.g. loaded from
 * the socket.io client script). Events emitted while disconnected are queued
 * and flushed on connect, and the last credentials are replayed after every
 * reconnect because the server tracks authentication per connection.
 */
class WebSocketClient {
  /**
   * @param {string} url - server URL passed to `io()`
   * @param {{maxReconnectAttempts?: number, reconnectDelay?: number}} [options]
   *   Reconnection tuning; defaults are 5 attempts and 1000 ms.
   */
  constructor(url, options = {}) {
    // Destructuring defaults apply only to undefined, so an explicit 0 is kept.
    const { maxReconnectAttempts = 5, reconnectDelay = 1000 } = options;

    this.url = url;
    this.socket = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = maxReconnectAttempts;
    this.reconnectDelay = reconnectDelay;
    this.listeners = new Map();
    this.messageQueue = [];
    this.isAuthenticated = false;
    this.credentials = null;

    this.connect();
  }

  /** Opens the socket and wires the connection lifecycle handlers. */
  connect() {
    this.socket = io(this.url, {
      reconnection: true,
      reconnectionDelay: this.reconnectDelay,
      reconnectionAttempts: this.maxReconnectAttempts
    });

    this.socket.on('connect', () => {
      console.log('Connected to server');
      this.reconnectAttempts = 0;
      // Authenticate first so queued events are processed as an authenticated user.
      if (this.credentials !== null) {
        this.sendAuth();
      }
      this.processMessageQueue();
    });

    this.socket.on('disconnect', () => {
      console.log('Disconnected from server');
      // The server forgets the user when the connection drops.
      this.isAuthenticated = false;
    });

    // Listeners added through on('error', ...) are attached to the socket
    // itself, so this handler only logs and sends nothing to the server.
    this.socket.on('error', (error) => {
      console.error('Socket error:', error);
    });

    this.socket.on('connect_error', (error) => {
      console.error('Connection error:', error);
    });
  }

  /**
   * Remembers the credentials and sends them now if connected, otherwise on
   * the next connect.
   * @param {object} userData - payload of the 'auth' event
   */
  authenticate(userData) {
    this.credentials = userData;
    if (this.socket.connected) {
      this.sendAuth();
    }
  }

  /** Internal: emits 'auth' and flips isAuthenticated when the server confirms. */
  sendAuth() {
    this.socket.emit('auth', this.credentials, (response) => {
      if (response && response.success) {
        this.isAuthenticated = true;
        this.dispatchLocal('authenticated');
      }
    });
  }

  /** Internal: invokes listeners registered via on() without touching the network. */
  dispatchLocal(event, ...args) {
    const callbacks = this.listeners.get(event) || [];
    for (const callback of [...callbacks]) {
      callback(...args);
    }
  }

  /**
   * Subscribes to a server event, or to the local 'authenticated' event.
   * @param {string} event
   * @param {Function} callback
   */
  on(event, callback) {
    this.socket.on(event, callback);

    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);
  }

  /**
   * Removes a listener previously added with on().
   * @param {string} event
   * @param {Function} callback
   */
  off(event, callback) {
    this.socket.off(event, callback);

    const callbacks = this.listeners.get(event);
    if (callbacks) {
      this.listeners.set(event, callbacks.filter((registered) => registered !== callback));
    }
  }

  /**
   * Sends an event to the server, queueing it while disconnected.
   * @param {string} event
   * @param {*} data
   * @param {Function} [callback] - optional acknowledgement callback
   */
  emit(event, data, callback) {
    if (!this.socket.connected) {
      this.messageQueue.push({ event, data, callback });
      return;
    }

    this.send(event, data, callback);
  }

  /** Internal: forwards to the socket, omitting a missing acknowledgement callback. */
  send(event, data, callback) {
    if (typeof callback === 'function') {
      this.socket.emit(event, data, callback);
    } else {
      // Passing `undefined` would reach the server as an extra argument.
      this.socket.emit(event, data);
    }
  }

  /** Flushes events queued while disconnected, in order. */
  processMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data, callback } = this.messageQueue.shift();
      this.send(event, data, callback);
    }
  }

  joinRoom(roomId) {
    this.emit('room:join', roomId);
  }

  leaveRoom(roomId) {
    this.emit('room:leave', roomId);
  }

  sendMessage(roomId, text) {
    this.emit('chat:message', { roomId, text });
  }

  setTypingIndicator(roomId, isTyping) {
    this.emit(isTyping ? 'typing:start' : 'typing:stop', roomId);
  }

  disconnect() {
    this.socket.disconnect();
  }
}

// Usage: register handlers first, then authenticate and join a room.
// displayMessage, updateTypingIndicator and updateUserStatus are UI helpers
// supplied by the host application.
const client = new WebSocketClient('http://localhost:3000');

client.on('chat:message', (message) => {
  console.log('Received message:', message);
  displayMessage(message);
});

client.on('typing:indicator', (data) => {
  updateTypingIndicator(data);
});

client.on('user:online', (user) => {
  updateUserStatus(user.userId, 'online');
});

// Events are delivered in order, so the join and the first message are
// handled by the server after the auth event.
client.authenticate({ id: 'user123', username: 'john' });
client.joinRoom('room1');
client.sendMessage('room1', 'Hello everyone!');
3. Python WebSocket Server (aiohttp)
python
import json
import uuid
from datetime import datetime
from typing import Set

import aiohttp
from aiohttp import web

class WebSocketServer:
    """In-memory WebSocket chat server built on aiohttp.

    State lives in this process only: ``rooms`` maps a room id to the set of
    user ids in it, and ``users`` maps a user id to that user's open
    WebSocket connection.
    """

    def __init__(self):
        self.app = web.Application()
        self.rooms = {}  # room id -> set of user ids
        self.users = {}  # user id -> WebSocketResponse
        self.setup_routes()

    def setup_routes(self):
        """Register the WebSocket endpoint and the HTTP broadcast endpoint."""
        self.app.router.add_get('/ws', self.websocket_handler)
        self.app.router.add_post('/api/message', self.send_message_api)

    async def websocket_handler(self, request):
        """Serve one WebSocket connection until the peer closes it.

        Text frames are JSON objects dispatched on their ``type`` field.
        Per-connection state is released in ``finally`` so it is cleaned up
        even when the connection ends with an exception.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        session = self._new_session()

        try:
            # Iterating the response yields frames until the connection closes.
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await self._handle_text(ws, session, msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f'WebSocket connection closed with exception: {ws.exception()}')
        finally:
            self._cleanup(ws, session)

        return ws

    @staticmethod
    def _new_session():
        """Return fresh per-connection state.

        ``room_id`` is the most recently joined room (the default target for
        messages); ``joined`` holds every room joined on this connection.
        """
        return {'user_id': None, 'room_id': None, 'joined': set()}

    async def _handle_text(self, ws, session, raw):
        """Decode one text frame and dispatch it, reporting failures to the client."""
        try:
            data = json.loads(raw)
            if not isinstance(data, dict):
                raise ValueError('Message must be a JSON object')
            await self._handle_event(ws, session, data)
        except Exception as error:
            await ws.send_json({
                'type': 'error',
                'message': str(error)
            })

    async def _handle_event(self, ws, session, data):
        """Apply one decoded client event; unknown event types are ignored."""
        event_type = data.get('type')

        if event_type == 'auth':
            user_id = data.get('userId')
            if user_id is None:
                raise ValueError('userId is required')
            if session['user_id'] not in (None, user_id):
                raise ValueError('Connection is already authenticated')

            # NOTE(review): the identity is asserted by the client; verify the
            # token from the auth payload before trusting it in production.
            session['user_id'] = user_id
            self.users[user_id] = ws
            await ws.send_json({
                'type': 'authenticated',
                'timestamp': datetime.now().isoformat()
            })
            return

        user_id = session['user_id']
        if user_id is None:
            raise PermissionError('Not authenticated')

        if event_type == 'join_room':
            room_id = data.get('roomId')
            if room_id is None:
                raise ValueError('roomId is required')

            self.rooms.setdefault(room_id, set()).add(user_id)
            session['joined'].add(room_id)
            session['room_id'] = room_id

            # Notify others
            await self.broadcast_to_room(room_id, {
                'type': 'user_joined',
                'userId': user_id,
                'timestamp': datetime.now().isoformat()
            }, exclude=user_id)

        elif event_type == 'message':
            # An explicit roomId wins; otherwise use the last joined room.
            room_id = data.get('roomId', session['room_id'])
            if room_id is None or room_id not in session['joined']:
                raise PermissionError('Join a room before sending messages')

            message = {
                'type': 'message',
                'id': f'msg_{uuid.uuid4().hex}',
                'userId': user_id,
                'text': data.get('text'),
                'roomId': room_id,
                'timestamp': datetime.now().isoformat()
            }

            # Save to database
            await self.save_message(message)

            # Broadcast to room
            await self.broadcast_to_room(room_id, message)

        elif event_type == 'leave_room':
            room_id = data.get('roomId', session['room_id'])
            self._leave_room(room_id, user_id)
            session['joined'].discard(room_id)
            if session['room_id'] == room_id:
                session['room_id'] = None

    def _leave_room(self, room_id, user_id):
        """Remove a user from a room and drop the room once it is empty."""
        members = self.rooms.get(room_id)
        if members is None:
            return
        members.discard(user_id)
        if not members:
            del self.rooms[room_id]

    def _cleanup(self, ws, session):
        """Release everything a closed connection registered."""
        user_id = session['user_id']
        if user_id is None:
            return
        # A newer connection may have re-registered the same user id; only
        # tear down state that still belongs to this connection.
        if self.users.get(user_id) is ws:
            del self.users[user_id]
            for room_id in session['joined']:
                self._leave_room(room_id, user_id)

    async def broadcast_to_room(self, room_id, message, exclude=None):
        """Send ``message`` to every connected member of a room.

        Args:
            room_id: Target room; unknown or empty rooms are a no-op.
            message: JSON-serialisable payload.
            exclude: Optional user id that should not receive the message.
        """
        members = self.rooms.get(room_id)
        if not members:
            return

        # Iterate a snapshot: other connections can join or leave while a send
        # is awaited, and mutating a set during iteration raises RuntimeError.
        for user_id in list(members):
            if user_id == exclude:
                continue
            ws = self.users.get(user_id)
            if ws is None:
                continue
            try:
                await ws.send_json(message)
            except Exception as error:
                print(f'Error sending message: {error}')

    async def save_message(self, message):
        """Persist a chat message (placeholder; storage is not implemented)."""
        # Save to database
        pass

    async def send_message_api(self, request):
        """HTTP endpoint that broadcasts a message into a room.

        NOTE(review): this endpoint performs no authentication.
        """
        try:
            data = await request.json()
        except json.JSONDecodeError:
            return web.json_response(
                {'sent': False, 'error': 'Invalid JSON body'}, status=400)

        if not isinstance(data, dict) or data.get('roomId') is None:
            return web.json_response(
                {'sent': False, 'error': 'roomId is required'}, status=400)

        await self.broadcast_to_room(data.get('roomId'), {
            'type': 'message',
            'text': data.get('text'),
            'timestamp': datetime.now().isoformat()
        })

        return web.json_response({'sent': True})

def create_app():
    """Build a WebSocketServer and return its configured aiohttp application."""
    return WebSocketServer().app

if __name__ == '__main__':
    import os

    app = create_app()
    # PORT lets deployments override the default without editing the source.
    web.run_app(app, port=int(os.environ.get('PORT', '3000')))
4. Message Types and Protocols
json
// Authentication
{
  "type": "auth",
  "userId": "user123",
  "token": "jwt_token_here"
}

// Chat Message
{
  "type": "message",
  "roomId": "room123",
  "text": "Hello everyone!",
  "timestamp": "2025-01-15T10:30:00Z"
}

// Typing Indicator
{
  "type": "typing",
  "roomId": "room123",
  "isTyping": true
}

// Presence
{
  "type": "presence",
  "status": "online|away|offline"
}

// Notification
{
  "type": "notification",
  "title": "New message",
  "body": "You have a new message",
  "data": {}
}
5. Scaling with Redis
javascript
const redis = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');

const { createClient } = redis;

// node-redis v4 takes a connection URL (or a `socket: { host, port }` object);
// top-level `host` / `port` options are not read.
const redisUrl = process.env.REDIS_URL || 'redis://redis:6379';

const pubClient = createClient({ url: redisUrl });
const subClient = pubClient.duplicate();
// A client in subscriber mode cannot issue other commands, so application-level
// subscriptions get their own connection instead of reusing the adapter's.
const notificationClient = pubClient.duplicate();

for (const client of [pubClient, subClient, notificationClient]) {
  client.on('error', (err) => console.error('Redis client error:', err));
}

/**
 * Forwards a notification published by another service to the target user.
 * @param {string} message - JSON string containing at least `userId`
 */
const handleNotification = (message) => {
  let notification;
  try {
    notification = JSON.parse(message);
  } catch (err) {
    console.error('Dropping malformed notification payload:', err);
    return;
  }

  if (!notification || notification.userId == null) {
    console.error('Dropping notification without userId');
    return;
  }

  io.to(`user:${notification.userId}`).emit('notification', notification);
};

Promise.all([pubClient.connect(), subClient.connect(), notificationClient.connect()])
  .then(() => {
    io.adapter(createAdapter(pubClient, subClient));

    // Publish to multiple servers: with the adapter attached, io.emit reaches
    // clients connected to every node.
    io.emit('user:action', { userId: 123, action: 'login' });

    // Subscribe to events from other servers
    return notificationClient.subscribe('notifications', handleNotification);
  })
  .catch((err) => {
    console.error('Redis scaling setup failed:', err);
  });
Best Practices
✅ DO
- Implement proper authentication
- Handle reconnection gracefully
- Manage rooms/channels effectively
- Persist messages appropriately
- Monitor active connections
- Implement presence features
- Use Redis for scaling
- Add message acknowledgment
- Implement rate limiting
- Handle errors properly
❌ DON'T
- Send unencrypted sensitive data
- Keep unlimited message history in memory
- Allow arbitrary room/channel creation
- Forget to clean up disconnected connections
- Send large messages frequently
- Ignore network failures
- Store passwords in messages
- Skip authentication/authorization
- Let the number of connections grow without bound
- Treat scalability as an afterthought instead of planning for it from day one
Monitoring
javascript
// Log low-level Engine.IO connection failures (bad handshake, transport errors).
io.engine.on('connection_error', (err) => {
  // Log only the request line; the full request object is large and carries headers.
  console.log(err.req && `${err.req.method} ${err.req.url}`);
  console.log(err.code); // the error code, e.g. 1
  console.log(err.message); // the error message
  console.log(err.context); // some additional error context
});

// Track active connections.
// NOTE(review): room names include per-user and per-socket ids; protect this
// endpoint before exposing it outside the internal network.
app.get('/metrics/websocket', (req, res) => {
  res.json({
    activeConnections: io.engine.clientsCount,
    connectedSockets: io.sockets.sockets.size,
    // adapter.rooms is a Map, so Object.keys() on it always returns [].
    rooms: Array.from(io.sockets.adapter.rooms.keys())
  });
});