rust-web-server
Rust Web Server - High-Performance Async Web Server + WebSocket Support + JWT Authentication + File Upload + Memory Safety + Educational Design
src/websocket.rs
/*
 * WebSocket Module - Rust Web Server
 * 
 * Created by RSK World (https://rskworld.in)
 * Founder: Molla Samser
 * Designer & Tester: Rima Khatun
 * 
 * Contact:
 * - Email: hello@rskworld.in, support@rskworld.in
 * - Phone: +91 93305 39277
 * - Address: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
 * 
 * © 2026 RSK World. All rights reserved.
 * Content used for educational purposes only.
 */

use futures_util::{SinkExt, StreamExt};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use crate::error::{ServerError, ServerResult};

/// WebSocket message types
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WebSocketMessage {
    /// Chat message
    Chat {
        id: String,
        user: String,
        message: String,
        timestamp: String,
    },
    /// System notification
    System {
        message: String,
        level: String, // info, warning, error
        timestamp: String,
    },
    /// User joined/left
    UserEvent {
        user: String,
        action: String, // joined, left
        timestamp: String,
    },
    /// Real-time statistics
    Stats {
        active_connections: u64,
        total_messages: u64,
        uptime: u64,
        timestamp: String,
    },
    /// Ping/Pong for connection health
    Ping { timestamp: String },
    Pong { timestamp: String },
    /// Error message
    Error {
        code: String,
        message: String,
        timestamp: String,
    },
}

/// WebSocket connection manager
pub struct WebSocketManager {
    /// Connected clients
    clients: Arc<RwLock<HashMap<String, broadcast::Sender<WebSocketMessage>>>>,
    /// Message broadcaster
    broadcaster: broadcast::Sender<WebSocketMessage>,
    /// Statistics
    stats: Arc<RwLock<WebSocketStats>>,
    /// Server start time for uptime tracking
    start_time: std::time::Instant,
}

/// WebSocket statistics
#[derive(Debug, Clone, Default)]
pub struct WebSocketStats {
    pub active_connections: u64,
    pub total_messages: u64,
    pub total_connections: u64,
    pub messages_per_second: f64,
}

impl WebSocketManager {
    /// Create a new WebSocket manager
    pub fn new() -> Self {
        let (broadcaster, _) = broadcast::channel(1000);
        
        Self {
            clients: Arc::new(RwLock::new(HashMap::new())),
            broadcaster,
            stats: Arc::new(RwLock::new(WebSocketStats::default())),
            start_time: std::time::Instant::now(),
        }
    }

    /// Handle a new WebSocket connection
    pub async fn handle_connection(
        &self,
        ws_stream: WebSocketStream<hyper::upgrade::Upgraded>,
        client_id: String,
    ) -> ServerResult<()> {
        let (mut ws_sender, mut ws_receiver) = ws_stream.split();
        
        // Create a new receiver for this client
        let mut receiver = self.broadcaster.subscribe();
        
        // Add client to the manager
        {
            let mut clients = self.clients.write().await;
            clients.insert(client_id.clone(), self.broadcaster.clone());
            
            // Update stats
            let mut stats = self.stats.write().await;
            stats.active_connections += 1;
            stats.total_connections += 1;
        }
        
        info!("WebSocket client connected: {}", client_id);
        
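        // Send the welcome message directly to this client; sending it through
        // the shared broadcaster would deliver it to every connected client.
        let welcome_msg = WebSocketMessage::System {
            message: format!("Welcome to the WebSocket server! Your ID: {}", client_id),
            level: "info".to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        };

        match serde_json::to_string(&welcome_msg) {
            Ok(json) => {
                if let Err(e) = ws_sender.send(Message::Text(json)).await {
                    warn!("Failed to send welcome message: {}", e);
                }
            }
            Err(e) => warn!("Failed to serialize welcome message: {}", e),
        }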
        
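        // Spawn tasks for sending and receiving messages
        let client_id_send = client_id.clone();
        let send_task = tokio::spawn(async move {
            loop {
                let message = match receiver.recv().await {
                    Ok(message) => message,
                    // A slow client fell behind and missed messages; keep the connection open.
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        warn!("Client {} lagged; skipped {} messages", client_id_send, skipped);
                        continue;
                    }
                    // All senders were dropped, so no further messages can arrive.
                    Err(broadcast::error::RecvError::Closed) => break,
                };
                // Skip a message that fails to serialize rather than sending an empty frame.
                let json = match serde_json::to_string(&message) {
                    Ok(json) => json,
                    Err(e) => {
                        error!("Failed to serialize message for client {}: {}", client_id_send, e);
                        continue;
                    }
                };
                match ws_sender.send(Message::Text(json)).await {
                    Ok(_) => debug!("Sent message to client {}", client_id_send),
                    Err(e) => {
                        error!("Failed to send message to client {}: {}", client_id_send, e);
                        break;
                    }
                }
            }
        });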
        
        let client_id_recv = client_id.clone();
        let broadcaster_recv = self.broadcaster.clone();
        let stats_recv = self.stats.clone();
        let recv_task = tokio::spawn(async move {
            while let Some(msg) = ws_receiver.next().await {
                match msg {
                    Ok(Message::Text(text)) => {
                        if let Ok(ws_message) = serde_json::from_str::<WebSocketMessage>(&text) {
                            match ws_message {
                                WebSocketMessage::Chat { user, message, .. } => {
                                    let chat_msg = WebSocketMessage::Chat {
                                        id: Uuid::new_v4().to_string(),
                                        user,
                                        message,
                                        timestamp: chrono::Utc::now().to_rfc3339(),
                                    };
                                    
                                    if let Err(e) = broadcaster_recv.send(chat_msg) {
                                        warn!("Failed to broadcast chat message: {}", e);
                                    }
                                    
                                    // Update message stats
                                    let mut stats = stats_recv.write().await;
                                    stats.total_messages += 1;
                                }
                                WebSocketMessage::Ping { timestamp } => {
                                    let pong_msg = WebSocketMessage::Pong { timestamp };
                                    if let Err(e) = broadcaster_recv.send(pong_msg) {
                                        warn!("Failed to send pong: {}", e);
                                    }
                                }
                                _ => {
                                    debug!("Received other message type from client {}", client_id_recv);
                                }
                            }
                        } else {
                            warn!("Invalid JSON message from client {}", client_id_recv);
                        }
                    }
                    Ok(Message::Close(_)) => {
                        info!("WebSocket client {} sent close message", client_id_recv);
                        break;
                    }
                    Ok(Message::Ping(_)) => {
                        // Respond with pong automatically handled by tungstenite
                        debug!("Received ping from client {}", client_id_recv);
                    }
                    Ok(Message::Pong(_)) => {
                        debug!("Received pong from client {}", client_id_recv);
                    }
                    Err(e) => {
                        error!("WebSocket error for client {}: {}", client_id_recv, e);
                        break;
                    }
                    _ => {}
                }
            }
        });
        
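        // Wait for either task to complete, then abort the other one so it
        // does not keep running after the connection is gone.
        let (mut send_task, mut recv_task) = (send_task, recv_task);
        tokio::select! {
            _ = (&mut send_task) => {
                debug!("Send task completed for client {}", client_id);
                recv_task.abort();
            }
            _ = (&mut recv_task) => {
                debug!("Receive task completed for client {}", client_id);
                send_task.abort();
            }
        }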
        
        // Clean up client connection
        {
            let mut clients = self.clients.write().await;
            clients.remove(&client_id);
            
            // Update stats
            let mut stats = self.stats.write().await;
            stats.active_connections = stats.active_connections.saturating_sub(1);
        }
        
        info!("WebSocket client disconnected: {}", client_id);
        
        // Send user left notification
        let leave_msg = WebSocketMessage::UserEvent {
            user: client_id,
            action: "left".to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        };
        
        if let Err(e) = self.broadcaster.send(leave_msg) {
            warn!("Failed to send user left message: {}", e);
        }
        
        Ok(())
    }

    /// Broadcast a message to all connected clients
    pub async fn broadcast(&self, message: WebSocketMessage) -> ServerResult<()> {
        if let Err(e) = self.broadcaster.send(message) {
            return Err(ServerError::Internal(format!("Failed to broadcast message: {}", e)));
        }
        Ok(())
    }

    /// Get current statistics
    pub async fn get_stats(&self) -> WebSocketStats {
        self.stats.read().await.clone()
    }

    /// Send periodic statistics updates
    pub async fn start_stats_updates(&self) {
        let manager = self.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
            
            loop {
                interval.tick().await;
                
                let stats = manager.get_stats().await;
                let uptime = manager.get_uptime().await;
                
                let stats_msg = WebSocketMessage::Stats {
                    active_connections: stats.active_connections,
                    total_messages: stats.total_messages,
                    uptime,
                    timestamp: chrono::Utc::now().to_rfc3339(),
                };
                
                if let Err(e) = manager.broadcaster.send(stats_msg) {
                    warn!("Failed to send stats update: {}", e);
                }
            }
        });
    }

    /// Get server uptime in seconds
    async fn get_uptime(&self) -> u64 {
        self.start_time.elapsed().as_secs()
    }
}

impl Clone for WebSocketManager {
    fn clone(&self) -> Self {
        Self {
            clients: self.clients.clone(),
            broadcaster: self.broadcaster.clone(),
            stats: self.stats.clone(),
            start_time: self.start_time,
        }
    }
}

/// WebSocket upgrade handler
pub async fn websocket_upgrade_handler(
    req: Request<Body>,
    manager: Arc<WebSocketManager>,
) -> ServerResult<Response<Body>> {
    use hyper::header::{CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, UPGRADE};
    
    // Check if this is a WebSocket upgrade request
    let headers = req.headers();
    
    // Verify required headers
    let upgrade_header = headers
        .get(UPGRADE)
        .and_then(|h| h.to_str().ok())
        .ok_or_else(|| ServerError::BadRequest("Missing Upgrade header".to_string()))?;
    
    let connection_header = headers
        .get(CONNECTION)
        .and_then(|h| h.to_str().ok())
        .ok_or_else(|| ServerError::BadRequest("Missing Connection header".to_string()))?;
    
    let ws_key = headers
        .get(SEC_WEBSOCKET_KEY)
        .and_then(|h| h.to_str().ok())
        .ok_or_else(|| ServerError::BadRequest("Missing Sec-WebSocket-Key header".to_string()))?;
    
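    // Verify upgrade request (header values are compared case-insensitively)
    if !upgrade_header.trim().eq_ignore_ascii_case("websocket") {
        return Err(ServerError::BadRequest("Invalid Upgrade header".to_string()));
    }
    
    // Connection may carry several comma-separated tokens, e.g. "keep-alive, Upgrade"
    if !connection_header
        .split(',')
        .any(|token| token.trim().eq_ignore_ascii_case("upgrade"))
    {
        return Err(ServerError::BadRequest("Invalid Connection header".to_string()));
    }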
    
    // Generate WebSocket accept key
    let ws_accept = generate_websocket_accept_key(ws_key);
    
    // Generate client ID
    let client_id = Uuid::new_v4().to_string();
    
    info!("WebSocket upgrade requested for client: {}", client_id);
    
    // Create upgrade response
    let response = Response::builder()
        .status(StatusCode::SWITCHING_PROTOCOLS)
        .header(UPGRADE, "websocket")
        .header(CONNECTION, "upgrade")
        .header(SEC_WEBSOCKET_ACCEPT, ws_accept)
        .body(Body::empty())
        .map_err(|e| ServerError::Internal(format!("Failed to create upgrade response: {}", e)))?;
    
    // Spawn WebSocket connection handling
    let manager_clone = manager.clone();
    let client_id_clone = client_id.clone();
    
    tokio::spawn(async move {
        match hyper::upgrade::on(req).await {
            Ok(upgraded) => {
                let ws_stream = WebSocketStream::from_raw_socket(
                    upgraded,
                    tokio_tungstenite::tungstenite::protocol::Role::Server,
                    None,
                ).await;
                
                if let Err(e) = manager_clone.handle_connection(ws_stream, client_id_clone).await {
                    error!("WebSocket connection error: {}", e);
                }
            }
            Err(e) => {
                error!("Failed to upgrade connection: {}", e);
            }
        }
    });
    
    Ok(response)
}

/// Generate WebSocket accept key from client key
fn generate_websocket_accept_key(client_key: &str) -> String {
    use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
    use sha1::{Digest, Sha1};
    
    let ws_magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
    let mut hasher = Sha1::new();
    hasher.update(client_key.as_bytes());
    hasher.update(ws_magic_string.as_bytes());
    let result = hasher.finalize();
    
    BASE64.encode(result)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_websocket_message_serialization() {
        let message = WebSocketMessage::Chat {
            id: "123".to_string(),
            user: "test_user".to_string(),
            message: "Hello, World!".to_string(),
            timestamp: "2023-01-01T00:00:00Z".to_string(),
        };

        let json = serde_json::to_string(&message).unwrap();
        let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();

        match deserialized {
            WebSocketMessage::Chat { user, message, .. } => {
                assert_eq!(user, "test_user");
                assert_eq!(message, "Hello, World!");
            }
            _ => panic!("Expected Chat message"),
        }
    }

    #[tokio::test]
    async fn test_websocket_manager_creation() {
        let manager = WebSocketManager::new();
        let stats = manager.get_stats().await;
        
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.total_messages, 0);
        assert_eq!(stats.total_connections, 0);
    }
}
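
The accept-key step in `generate_websocket_accept_key` can be checked by hand against the sample handshake in RFC 6455. The sketch below is not the module's implementation (which uses the `sha1` and `base64` crates); it is a standard-library-only restatement of the same derivation, SHA-1 over the client key concatenated with the fixed GUID, then standard Base64. The names `sha1_digest`, `base64_encode`, and `accept_key` are hypothetical helpers introduced only for this illustration.

```rust
/// Fixed GUID that RFC 6455 appends to the client's Sec-WebSocket-Key.
const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// Plain SHA-1 (illustrative only; real code should use a vetted crate).
fn sha1_digest(data: &[u8]) -> [u8; 20] {
    let mut h: [u32; 5] = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0];

    // Pad: 0x80, zeros up to 56 mod 64, then the bit length as big-endian u64.
    let mut msg = data.to_vec();
    let bit_len = (data.len() as u64) * 8;
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&bit_len.to_be_bytes());

    for chunk in msg.chunks(64) {
        // Expand the 16 input words into the 80-word message schedule.
        let mut w = [0u32; 80];
        for i in 0..16 {
            let j = 4 * i;
            w[i] = u32::from_be_bytes([chunk[j], chunk[j + 1], chunk[j + 2], chunk[j + 3]]);
        }
        for i in 16..80 {
            w[i] = (w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16]).rotate_left(1);
        }

        let (mut a, mut b, mut c, mut d, mut e) = (h[0], h[1], h[2], h[3], h[4]);
        for i in 0..80 {
            let (f, k): (u32, u32) = match i {
                0..=19 => ((b & c) | (!b & d), 0x5A827999),
                20..=39 => (b ^ c ^ d, 0x6ED9EBA1),
                40..=59 => ((b & c) | (b & d) | (c & d), 0x8F1BBCDC),
                _ => (b ^ c ^ d, 0xCA62C1D6),
            };
            let temp = a
                .rotate_left(5)
                .wrapping_add(f)
                .wrapping_add(e)
                .wrapping_add(k)
                .wrapping_add(w[i]);
            e = d;
            d = c;
            c = b.rotate_left(30);
            b = a;
            a = temp;
        }

        h[0] = h[0].wrapping_add(a);
        h[1] = h[1].wrapping_add(b);
        h[2] = h[2].wrapping_add(c);
        h[3] = h[3].wrapping_add(d);
        h[4] = h[4].wrapping_add(e);
    }

    let mut out = [0u8; 20];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}

/// Standard Base64 alphabet with `=` padding.
fn base64_encode(input: &[u8]) -> String {
    const TABLE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in input.chunks(3) {
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = ((chunk[0] as u32) << 16) | ((b1 as u32) << 8) | (b2 as u32);
        out.push(TABLE[((n >> 18) & 63) as usize] as char);
        out.push(TABLE[((n >> 12) & 63) as usize] as char);
        out.push(if chunk.len() > 1 { TABLE[((n >> 6) & 63) as usize] as char } else { '=' });
        out.push(if chunk.len() > 2 { TABLE[(n & 63) as usize] as char } else { '=' });
    }
    out
}

/// Sec-WebSocket-Accept = Base64(SHA-1(client_key + GUID)).
fn accept_key(client_key: &str) -> String {
    let mut input = client_key.as_bytes().to_vec();
    input.extend_from_slice(WS_GUID.as_bytes());
    base64_encode(&sha1_digest(&input))
}
```

For the sample key in RFC 6455, `dGhlIHNhbXBsZSBub25jZQ==`, the expected accept value is `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`. The same pair could be asserted against `generate_websocket_accept_key` in the `tests` module to guard the handshake against regressions.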