help@rskworld.in +91 93305 39277
RSK World
  • Home
  • Development
    • Web Development
    • Mobile Apps
    • Software
    • Games
    • Project
  • Technologies
    • Data Science
    • AI Development
    • Cloud Development
    • Blockchain
    • Cyber Security
    • Dev Tools
    • Testing Tools
  • Blog
  • About
  • Contact

Theme Settings

Color Scheme
Display Options
Font Size
100%
Back to Project
RSK World
multi-language-chatbot
/
modules
RSK World
multi-language-chatbot
Multi-language Chatbot - Python + Flask + OpenAI API + NLP + Translation + Language Detection + Cultural Adaptation
modules
  • __pycache__
  • __init__.py194 B
  • analytics_engine.py28.6 KB
  • chatbot_core.py10.8 KB
  • collaboration_manager.py22.3 KB
  • conversation_memory.py25.2 KB
  • cultural_adapter.py12.3 KB
  • document_analyzer.py21.5 KB
  • language_detector.py5.8 KB
  • multimodal_processor.py32.7 KB
  • personality_engine.py33.6 KB
  • sentiment_analyzer.py16.9 KB
  • translator.py7.5 KB
  • voice_processor.py13.2 KB
collaboration_manager.py
modules/collaboration_manager.py
Raw Download
Find: Go to:
"""
Collaboration Manager Module
Author: RSK World (https://rskworld.in)
Founder: Molla Samser
Designer & Tester: Rima Khatun
Contact: help@rskworld.in, +91 93305 39277
Year: 2026
Description: Real-time collaboration features with WebSocket support and room management
"""

import json
import logging
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Set
from collections import defaultdict, deque
import threading
import time
from dataclasses import dataclass, asdict
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class User:
    """User representation for collaboration"""
    id: str
    name: str
    email: str
    avatar: str = None
    role: str = 'participant'
    language: str = 'en'
    joined_at: datetime = None
    last_active: datetime = None
    is_online: bool = True
    permissions: List[str] = None
    
    def __post_init__(self):
        if self.joined_at is None:
            self.joined_at = datetime.now()
        if self.last_active is None:
            self.last_active = datetime.now()
        if self.permissions is None:
            self.permissions = ['read', 'write']

@dataclass
class Room:
    """Collaboration room representation"""
    id: str
    name: str
    description: str = ''
    created_by: str = None
    created_at: datetime = None
    is_public: bool = True
    max_participants: int = 50
    language: str = 'en'
    settings: Dict = None
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.settings is None:
            self.settings = {
                'allow_file_sharing': True,
                'allow_voice_chat': True,
                'auto_translate': True,
                'save_history': True
            }

@dataclass
class Message:
    """Message representation for collaboration"""
    id: str
    room_id: str
    user_id: str
    content: str
    message_type: str = 'text'  # text, file, voice, system
    language: str = 'en'
    timestamp: datetime = None
    edited: bool = False
    edited_at: datetime = None
    reply_to: str = None
    reactions: Dict = None
    metadata: Dict = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()
        if self.reactions is None:
            self.reactions = {}
        if self.metadata is None:
            self.metadata = {}

class CollaborationManager:
    def __init__(self):
        self.rooms: Dict[str, Room] = {}
        self.users: Dict[str, User] = {}
        self.room_users: Dict[str, Set[str]] = defaultdict(set)
        self.room_messages: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000))
        self.user_sessions: Dict[str, Dict] = {}
        self.active_connections: Dict[str, Any] = {}
        
        # Collaboration settings
        self.max_message_length = 4000
        self.message_rate_limit = 30  # messages per minute
        self.session_timeout = 300  # 5 minutes
        self.max_rooms_per_user = 10
        
        # Background tasks
        self.cleanup_thread = threading.Thread(target=self._cleanup_inactive_users, daemon=True)
        self.cleanup_thread.start()
        
        logger.info("Collaboration manager initialized")
    
    def create_room(self, name: str, creator_id: str, description: str = '', 
                   is_public: bool = True, settings: Dict = None) -> Dict:
        """Create a new collaboration room"""
        try:
            # Validate user
            if creator_id not in self.users:
                return {'error': 'User not found'}
            
            creator = self.users[creator_id]
            
            # Check room limit
            user_rooms = self._get_user_rooms(creator_id)
            if len(user_rooms) >= self.max_rooms_per_user:
                return {'error': f'Maximum rooms limit ({self.max_rooms_per_user}) reached'}
            
            # Create room
            room_id = str(uuid.uuid4())
            room = Room(
                id=room_id,
                name=name,
                description=description,
                created_by=creator_id,
                is_public=is_public,
                settings=settings or {}
            )
            
            self.rooms[room_id] = room
            
            # Add creator as admin
            self.join_room(room_id, creator_id)
            self._set_user_role(room_id, creator_id, 'admin')
            
            logger.info(f"Room created: {name} by {creator.name}")
            
            return {
                'room_id': room_id,
                'room': asdict(room),
                'message': 'Room created successfully'
            }
            
        except Exception as e:
            logger.error(f"Error creating room: {str(e)}")
            return {'error': 'Failed to create room'}
    
    def join_room(self, room_id: str, user_id: str) -> Dict:
        """Join a collaboration room"""
        try:
            if room_id not in self.rooms:
                return {'error': 'Room not found'}
            
            if user_id not in self.users:
                return {'error': 'User not found'}
            
            room = self.rooms[room_id]
            user = self.users[user_id]
            
            # Check room capacity
            current_participants = len(self.room_users[room_id])
            if current_participants >= room.max_participants:
                return {'error': 'Room is full'}
            
            # Add user to room
            self.room_users[room_id].add(user_id)
            user.last_active = datetime.now()
            
            # Send system message
            system_message = Message(
                id=str(uuid.uuid4()),
                room_id=room_id,
                user_id='system',
                content=f"{user.name} joined the room",
                message_type='system',
                language=room.language
            )
            
            self._add_message(system_message)
            
            logger.info(f"User {user.name} joined room {room.name}")
            
            return {
                'success': True,
                'room': asdict(room),
                'user_count': len(self.room_users[room_id])
            }
            
        except Exception as e:
            logger.error(f"Error joining room: {str(e)}")
            return {'error': 'Failed to join room'}
    
    def leave_room(self, room_id: str, user_id: str) -> Dict:
        """Leave a collaboration room"""
        try:
            if room_id not in self.rooms:
                return {'error': 'Room not found'}
            
            if user_id not in self.users:
                return {'error': 'User not found'}
            
            room = self.rooms[room_id]
            user = self.users[user_id]
            
            # Remove user from room
            self.room_users[room_id].discard(user_id)
            
            # Send system message
            system_message = Message(
                id=str(uuid.uuid4()),
                room_id=room_id,
                user_id='system',
                content=f"{user.name} left the room",
                message_type='system',
                language=room.language
            )
            
            self._add_message(system_message)
            
            logger.info(f"User {user.name} left room {room.name}")
            
            return {
                'success': True,
                'message': 'Left room successfully'
            }
            
        except Exception as e:
            logger.error(f"Error leaving room: {str(e)}")
            return {'error': 'Failed to leave room'}
    
    def send_message(self, room_id: str, user_id: str, content: str, 
                   message_type: str = 'text', reply_to: str = None,
                   metadata: Dict = None) -> Dict:
        """Send a message to a room"""
        try:
            # Validate
            if room_id not in self.rooms:
                return {'error': 'Room not found'}
            
            if user_id not in self.users:
                return {'error': 'User not found'}
            
            if user_id not in self.room_users[room_id]:
                return {'error': 'User not in room'}
            
            # Rate limiting
            if not self._check_rate_limit(user_id):
                return {'error': 'Rate limit exceeded'}
            
            # Content validation
            if len(content) > self.max_message_length:
                return {'error': f'Message too long (max {self.max_message_length} characters)'}
            
            room = self.rooms[room_id]
            user = self.users[user_id]
            
            # Create message
            message = Message(
                id=str(uuid.uuid4()),
                room_id=room_id,
                user_id=user_id,
                content=content,
                message_type=message_type,
                language=user.language,
                reply_to=reply_to,
                metadata=metadata or {}
            )
            
            # Add message
            self._add_message(message)
            
            # Update user activity
            user.last_active = datetime.now()
            
            # Broadcast to room
            self._broadcast_message(room_id, message)
            
            logger.debug(f"Message sent in room {room.name} by {user.name}")
            
            return {
                'success': True,
                'message_id': message.id,
                'timestamp': message.timestamp.isoformat()
            }
            
        except Exception as e:
            logger.error(f"Error sending message: {str(e)}")
            return {'error': 'Failed to send message'}
    
    def get_room_messages(self, room_id: str, limit: int = 50, 
                         before: str = None) -> List[Dict]:
        """Get messages from a room"""
        try:
            if room_id not in self.rooms:
                return []
            
            messages = list(self.room_messages[room_id])
            
            # Filter by timestamp if specified
            if before:
                before_time = datetime.fromisoformat(before)
                messages = [m for m in messages if m.timestamp < before_time]
            
            # Get latest messages
            messages = messages[-limit:] if limit > 0 else messages
            
            # Convert to dict
            return [asdict(msg) for msg in messages]
            
        except Exception as e:
            logger.error(f"Error getting room messages: {str(e)}")
            return []
    
    def get_room_users(self, room_id: str) -> List[Dict]:
        """Get users in a room"""
        try:
            if room_id not in self.rooms:
                return []
            
            users = []
            for user_id in self.room_users[room_id]:
                if user_id in self.users:
                    user = self.users[user_id]
                    users.append(asdict(user))
            
            return users
            
        except Exception as e:
            logger.error(f"Error getting room users: {str(e)}")
            return []
    
    def get_public_rooms(self, limit: int = 20) -> List[Dict]:
        """Get list of public rooms"""
        try:
            rooms = []
            for room in self.rooms.values():
                if room.is_public:
                    room_data = asdict(room)
                    room_data['user_count'] = len(self.room_users[room.id])
                    rooms.append(room_data)
            
            # Sort by creation time (newest first)
            rooms.sort(key=lambda x: x['created_at'], reverse=True)
            
            return rooms[:limit] if limit > 0 else rooms
            
        except Exception as e:
            logger.error(f"Error getting public rooms: {str(e)}")
            return []
    
    def register_user(self, user_id: str, name: str, email: str, 
                     language: str = 'en', avatar: str = None) -> Dict:
        """Register a new user"""
        try:
            if user_id in self.users:
                return {'error': 'User already exists'}
            
            user = User(
                id=user_id,
                name=name,
                email=email,
                language=language,
                avatar=avatar
            )
            
            self.users[user_id] = user
            
            logger.info(f"User registered: {name}")
            
            return {
                'success': True,
                'user': asdict(user)
            }
            
        except Exception as e:
            logger.error(f"Error registering user: {str(e)}")
            return {'error': 'Failed to register user'}
    
    def update_user_status(self, user_id: str, is_online: bool = True) -> Dict:
        """Update user online status"""
        try:
            if user_id not in self.users:
                return {'error': 'User not found'}
            
            user = self.users[user_id]
            user.is_online = is_online
            user.last_active = datetime.now()
            
            # Broadcast status change to rooms
            for room_id in self.room_users:
                if user_id in self.room_users[room_id]:
                    self._broadcast_user_status(room_id, user_id, is_online)
            
            return {'success': True}
            
        except Exception as e:
            logger.error(f"Error updating user status: {str(e)}")
            return {'error': 'Failed to update status'}
    
    def add_reaction(self, room_id: str, message_id: str, user_id: str, 
                    emoji: str) -> Dict:
        """Add reaction to message"""
        try:
            if room_id not in self.rooms:
                return {'error': 'Room not found'}
            
            if user_id not in self.room_users[room_id]:
                return {'error': 'User not in room'}
            
            # Find message
            message = None
            for msg in self.room_messages[room_id]:
                if msg.id == message_id:
                    message = msg
                    break
            
            if not message:
                return {'error': 'Message not found'}
            
            # Add reaction
            if emoji not in message.reactions:
                message.reactions[emoji] = []
            
            if user_id not in message.reactions[emoji]:
                message.reactions[emoji].append(user_id)
                
                # Broadcast reaction update
                self._broadcast_reaction(room_id, message_id, emoji, user_id, 'add')
            
            return {'success': True}
            
        except Exception as e:
            logger.error(f"Error adding reaction: {str(e)}")
            return {'error': 'Failed to add reaction'}
    
    def get_room_statistics(self, room_id: str) -> Dict:
        """Get room statistics"""
        try:
            if room_id not in self.rooms:
                return {'error': 'Room not found'}
            
            room = self.rooms[room_id]
            messages = list(self.room_messages[room_id])
            
            # Calculate statistics
            total_messages = len(messages)
            user_messages = defaultdict(int)
            message_types = defaultdict(int)
            
            for message in messages:
                if message.user_id != 'system':
                    user_messages[message.user_id] += 1
                    message_types[message.message_type] += 1
            
            most_active_user = max(user_messages.items(), key=lambda x: x[1]) if user_messages else None
            
            return {
                'room_id': room_id,
                'total_messages': total_messages,
                'unique_users': len(user_messages),
                'most_active_user': most_active_user[0] if most_active_user else None,
                'most_active_count': most_active_user[1] if most_active_user else 0,
                'message_types': dict(message_types),
                'created_at': room.created_at.isoformat()
            }
            
        except Exception as e:
            logger.error(f"Error getting room statistics: {str(e)}")
            return {'error': 'Failed to get statistics'}
    
    def _add_message(self, message: Message):
        """Add message to room"""
        self.room_messages[message.room_id].append(message)
    
    def _check_rate_limit(self, user_id: str) -> bool:
        """Check if user is rate limited"""
        now = datetime.now()
        one_minute_ago = now - timedelta(minutes=1)
        
        # Count messages in last minute
        count = 0
        for room_id in self.room_messages:
            for message in self.room_messages[room_id]:
                if (message.user_id == user_id and 
                    message.timestamp > one_minute_ago):
                    count += 1
        
        return count < self.message_rate_limit
    
    def _set_user_role(self, room_id: str, user_id: str, role: str):
        """Set user role in room"""
        if user_id in self.users:
            self.users[user_id].role = role
    
    def _get_user_rooms(self, user_id: str) -> List[str]:
        """Get rooms user is in"""
        return [room_id for room_id, users in self.room_users.items() 
                if user_id in users]
    
    def _broadcast_message(self, room_id: str, message: Message):
        """Broadcast message to room users"""
        # This would integrate with WebSocket server
        message_data = {
            'type': 'message',
            'room_id': room_id,
            'message': asdict(message)
        }
        
        # Send to active connections
        for user_id in self.room_users[room_id]:
            if user_id in self.active_connections:
                try:
                    # Send to WebSocket connection
                    pass  # Implementation depends on WebSocket library
                except Exception as e:
                    logger.error(f"Error broadcasting to user {user_id}: {str(e)}")
    
    def _broadcast_user_status(self, room_id: str, user_id: str, is_online: bool):
        """Broadcast user status change"""
        status_data = {
            'type': 'user_status',
            'room_id': room_id,
            'user_id': user_id,
            'is_online': is_online
        }
        
        # Broadcast to room users
        for uid in self.room_users[room_id]:
            if uid in self.active_connections:
                try:
                    # Send to WebSocket connection
                    pass
                except Exception as e:
                    logger.error(f"Error broadcasting status to user {uid}: {str(e)}")
    
    def _broadcast_reaction(self, room_id: str, message_id: str, 
                          emoji: str, user_id: str, action: str):
        """Broadcast reaction update"""
        reaction_data = {
            'type': 'reaction',
            'room_id': room_id,
            'message_id': message_id,
            'emoji': emoji,
            'user_id': user_id,
            'action': action
        }
        
        # Broadcast to room users
        for uid in self.room_users[room_id]:
            if uid in self.active_connections:
                try:
                    # Send to WebSocket connection
                    pass
                except Exception as e:
                    logger.error(f"Error broadcasting reaction to user {uid}: {str(e)}")
    
    def _cleanup_inactive_users(self):
        """Background task to cleanup inactive users"""
        while True:
            try:
                now = datetime.now()
                timeout = timedelta(seconds=self.session_timeout)
                
                inactive_users = []
                for user_id, user in self.users.items():
                    if user.last_active and (now - user.last_active) > timeout:
                        inactive_users.append(user_id)
                
                # Mark inactive users as offline
                for user_id in inactive_users:
                    self.update_user_status(user_id, False)
                
                # Sleep for 1 minute
                time.sleep(60)
                
            except Exception as e:
                logger.error(f"Error in cleanup task: {str(e)}")
                time.sleep(60)
    
    def get_user_rooms(self, user_id: str) -> List[Dict]:
        """Get rooms user is in"""
        try:
            if user_id not in self.users:
                return []
            
            user_rooms = []
            for room_id in self.room_users:
                if user_id in self.room_users[room_id] and room_id in self.rooms:
                    room = self.rooms[room_id]
                    room_data = asdict(room)
                    room_data['user_count'] = len(self.room_users[room_id])
                    user_rooms.append(room_data)
            
            return user_rooms
            
        except Exception as e:
            logger.error(f"Error getting user rooms: {str(e)}")
            return []
    
    def search_messages(self, room_id: str, query: str, user_id: str = None) -> List[Dict]:
        """Search messages in room"""
        try:
            if room_id not in self.rooms:
                return []
            
            results = []
            query_lower = query.lower()
            
            for message in self.room_messages[room_id]:
                if user_id and message.user_id != user_id:
                    continue
                
                if query_lower in message.content.lower():
                    results.append(asdict(message))
            
            return results
            
        except Exception as e:
            logger.error(f"Error searching messages: {str(e)}")
            return []
633 lines•22.3 KB
python

About RSK World

Founded by Molla Samser, with Designer & Tester Rima Khatun, RSK World is your one-stop destination for free programming resources, source code, and development tools.

Founder: Molla Samser
Designer & Tester: Rima Khatun

Development

  • Game Development
  • Web Development
  • Mobile Development
  • AI Development
  • Development Tools

Legal

  • Terms & Conditions
  • Privacy Policy
  • Disclaimer

Contact Info

Nutanhat, Mongolkote
Purba Burdwan, West Bengal
India, 713147

+91 93305 39277

hello@rskworld.in
support@rskworld.in

© 2026 RSK World. All rights reserved.

Content used for educational purposes only. View Disclaimer