Multi-language Chatbot - Python + Flask + OpenAI API + NLP + Translation + Language Detection + Cultural Adaptation
modules/multimodal_processor.py
"""
Multi-modal Input Processor Module
Author: RSK World (https://rskworld.in)
Founder: Molla Samser
Designer & Tester: Rima Khatun
Contact: help@rskworld.in, +91 93305 39277
Year: 2026
Description: Advanced multi-modal input processing supporting text, voice, images, and video
"""

from __future__ import annotations

import os
import logging
import tempfile
import base64
import json
import hashlib
from typing import Dict, List, Optional, Any, Tuple, Union
from datetime import datetime
try:
    # NumPy is required by the OpenCV-based image and video helpers
    import numpy as np
except ImportError:
    np = None
try:
    import torch
except ImportError:
    torch = None
try:
    import cv2
except ImportError:
    cv2 = None
try:
    from PIL import Image, ImageDraw, ImageFont
except ImportError:
    Image = None
try:
    import speech_recognition as sr
except ImportError:
    sr = None
try:
    import whisper
except ImportError:
    whisper = None
try:
    from transformers import BlipProcessor, BlipForConditionalGeneration
except ImportError:
    BlipProcessor = None
    BlipForConditionalGeneration = None
try:
    from transformers import pipeline
except ImportError:
    pipeline = None
import io
import mimetypes

logger = logging.getLogger(__name__)

class MultiModalProcessor:
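    """Process text, image, audio, video and mixed inputs for the chatbot"""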
    def __init__(self):
        self.supported_formats = {
            'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'],
            'audio': ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac'],
            'video': ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv'],
            'document': ['.pdf', '.doc', '.docx', '.txt', '.md']
        }
        
        # Initialize models
        self.whisper_model = None
        self.image_caption_model = None
        self.image_processor = None
        self.ocr_model = None
        
        # Processing settings
        self.max_file_size = 100 * 1024 * 1024  # 100MB
        self.temp_dir = tempfile.mkdtemp(prefix='multimodal_')
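        # Note: call cleanup() when finished to remove this temporary directory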
        
        # Initialize models
        self._initialize_models()
    
    def _initialize_models(self):
        """Initialize AI models for multi-modal processing"""
        try:
            # Initialize Whisper for speech recognition (skip if not installed)
            if whisper:
                if torch and torch.cuda.is_available():
                    self.whisper_model = whisper.load_model("base")
                else:
                    self.whisper_model = whisper.load_model("tiny")
            
            try:
                # Initialize BLIP for image captioning
                if BlipProcessor and BlipForConditionalGeneration:
                    self.image_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
                    self.image_caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
            except Exception as e:
                logger.warning(f"Failed to load image captioning model: {str(e)}")
            
            # Initialize OCR pipeline
            try:
                if pipeline:
                    self.ocr_model = pipeline("image-to-text", model="microsoft/trocr-base-printed")
            except Exception as e:
                logger.warning(f"Failed to load OCR model: {str(e)}")
            
            logger.info("Multi-modal models initialized successfully")
            
        except Exception as e:
            logger.error(f"Error initializing models: {str(e)}")
    
    def process_input(self, input_data: Dict, user_id: Optional[str] = None) -> Dict:
        """
        Process multi-modal input
        input_data format:
        {
            'type': 'text' | 'image' | 'audio' | 'video' | 'mixed',
            'content': str | bytes,
            'metadata': Dict
        }
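        Example (text-only):
            {'type': 'text', 'content': 'Hello!', 'metadata': {'source': 'web'}}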
        """
        try:
            input_type = input_data.get('type', 'text')
            content = input_data.get('content')
            metadata = input_data.get('metadata', {})
            
            result = {
                'input_type': input_type,
                'processed_content': {},
                'extracted_info': {},
                'timestamp': datetime.now().isoformat(),
                'user_id': user_id
            }
            
            if input_type == 'text':
                result['processed_content'] = self._process_text(content, metadata)
            elif input_type == 'image':
                result['processed_content'] = self._process_image(content, metadata)
            elif input_type == 'audio':
                result['processed_content'] = self._process_audio(content, metadata)
            elif input_type == 'video':
                result['processed_content'] = self._process_video(content, metadata)
            elif input_type == 'mixed':
                result['processed_content'] = self._process_mixed_input(content, metadata)
            else:
                result['error'] = f'Unsupported input type: {input_type}'
            
            # Extract common information
            result['extracted_info'] = self._extract_common_info(result['processed_content'])
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing multi-modal input: {str(e)}")
            return {'error': f'Processing failed: {str(e)}'}
    
    def _process_text(self, content: str, metadata: Dict) -> Dict:
        """Process text input"""
        try:
            result = {
                'type': 'text',
                'content': content,
                'language': self._detect_language(content),
                'sentiment': self._analyze_sentiment(content),
                'entities': self._extract_entities(content),
                'keywords': self._extract_keywords(content),
                'length': len(content),
                'word_count': len(content.split())
            }
            
            # Add metadata
            result.update(metadata)
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing text: {str(e)}")
            return {'error': f'Text processing failed: {str(e)}'}
    
    def _process_image(self, content: Union[str, bytes, "Image.Image"], metadata: Dict) -> Dict:
        """Process image input"""
        try:
            if Image is None:
                return {'error': 'Pillow is not installed'}
            
            # Convert to PIL Image
            if isinstance(content, Image.Image):
                # Already a PIL Image (e.g. a frame extracted from video)
                image = content
            elif isinstance(content, str):
                if content.startswith('data:image'):
                    # Base64 data URI
                    image_data = base64.b64decode(content.split(',')[1])
                    image = Image.open(io.BytesIO(image_data))
                else:
                    # File path
                    image = Image.open(content)
            else:
                # Raw bytes
                image = Image.open(io.BytesIO(content))
            
            result = {
                'type': 'image',
                'format': image.format,
                'size': image.size,
                'mode': image.mode,
                'caption': self._generate_image_caption(image),
                'ocr_text': self._extract_text_from_image(image),
                'objects': self._detect_objects(image),
                'faces': self._detect_faces(image),
                'dominant_colors': self._extract_dominant_colors(image)
            }
            
            # Add metadata
            result.update(metadata)
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return {'error': f'Image processing failed: {str(e)}'}
    
    def _process_audio(self, content: Union[str, bytes], metadata: Dict) -> Dict:
        """Process audio input"""
        try:
            # Save audio to temporary file
            audio_path = self._save_temp_file(content, '.wav')
            
            result = {
                'type': 'audio',
                'transcription': self._transcribe_audio(audio_path),
                'duration': self._get_audio_duration(audio_path),
                'format': metadata.get('format', 'wav'),
                'sample_rate': metadata.get('sample_rate', 44100),
                'channels': metadata.get('channels', 1)
            }
            
            # Analyze transcription
            if result['transcription']:
                text_info = self._process_text(result['transcription'], {})
                result['text_analysis'] = text_info
            
            # Clean up (only temp copies we created; user-supplied paths are kept)
            if audio_path.startswith(self.temp_dir):
                os.unlink(audio_path)
            
            # Add metadata
            result.update(metadata)
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing audio: {str(e)}")
            return {'error': f'Audio processing failed: {str(e)}'}
    
    def _process_video(self, content: Union[str, bytes], metadata: Dict) -> Dict:
        """Process video input"""
        try:
            # Save video to temporary file
            video_path = self._save_temp_file(content, '.mp4')
            
            result = {
                'type': 'video',
                'frames': self._extract_video_frames(video_path),
                'duration': self._get_video_duration(video_path),
                'fps': self._get_video_fps(video_path),
                'resolution': self._get_video_resolution(video_path),
                'audio_transcription': self._extract_audio_from_video(video_path)
            }
            
            # Analyze frames
            if result['frames']:
                frame_analysis = self._analyze_video_frames(result['frames'])
                result['frame_analysis'] = frame_analysis
            
            # Clean up (only temp copies we created; user-supplied paths are kept)
            if video_path.startswith(self.temp_dir):
                os.unlink(video_path)
            
            # Add metadata
            result.update(metadata)
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing video: {str(e)}")
            return {'error': f'Video processing failed: {str(e)}'}
    
    def _process_mixed_input(self, content: Dict, metadata: Dict) -> Dict:
        """Process mixed multi-modal input"""
        try:
            result = {
                'type': 'mixed',
                'components': {}
            }
            
            # Process each component
            for component_type, component_data in content.items():
                if component_type in ['text', 'image', 'audio', 'video']:
                    component_result = self.process_input({
                        'type': component_type,
                        'content': component_data,
                        'metadata': metadata.get(component_type, {})
                    }, metadata.get('user_id'))
                    
                    result['components'][component_type] = component_result
            
            # Generate combined analysis
            result['combined_analysis'] = self._generate_combined_analysis(result['components'])
            
            return result
            
        except Exception as e:
            logger.error(f"Error processing mixed input: {str(e)}")
            return {'error': f'Mixed input processing failed: {str(e)}'}
    
    def _generate_image_caption(self, image) -> str:
        """Generate caption for image"""
        try:
            if not self.image_processor or not self.image_caption_model or not torch:
                return "Image captioning not available"
            
            # Process image
            inputs = self.image_processor(image, return_tensors="pt")
            
            # Generate caption
            with torch.no_grad():
                outputs = self.image_caption_model.generate(**inputs, max_length=50)
                caption = self.image_processor.decode(outputs[0], skip_special_tokens=True)
            
            return caption
            
        except Exception as e:
            logger.error(f"Error generating image caption: {str(e)}")
            return "Unable to generate caption"
    
    def _extract_text_from_image(self, image: Image.Image) -> str:
        """Extract text from image using OCR"""
        try:
            if self.ocr_model:
                # Use transformers OCR
                text = self.ocr_model(image)[0]['generated_text']
                return text.strip()
            else:
                # Fallback to pytesseract
                import pytesseract
                text = pytesseract.image_to_string(image)
                return text.strip()
                
        except Exception as e:
            logger.error(f"Error extracting text from image: {str(e)}")
            return ""
    
    def _detect_objects(self, image: Image.Image) -> List[Dict]:
        """Detect objects in image"""
        try:
            if cv2 is None or np is None:
                return []
            
            # Convert to OpenCV format
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            
            # Use simple object detection (placeholder for YOLO/other models)
            objects = []
            
            # Basic shape detection: threshold to binary before contour finding
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
            _, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
            contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            
            for i, contour in enumerate(contours[:10]):  # Limit to 10 objects
                area = cv2.contourArea(contour)
                if area > 1000:  # Filter small objects
                    x, y, w, h = cv2.boundingRect(contour)
                    objects.append({
                        'id': i,
                        'type': 'object',
                        'confidence': min(1.0, area / 10000),
                        'bbox': [x, y, w, h],
                        'area': area
                    })
            
            return objects
            
        except Exception as e:
            logger.error(f"Error detecting objects: {str(e)}")
            return []
    
    def _detect_faces(self, image: Image.Image) -> List[Dict]:
        """Detect faces in image"""
        try:
            if cv2 is None or np is None:
                return []
            
            # Convert to OpenCV format
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            
            # Load face cascade
            face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
            
            # Detect faces
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
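            # scaleFactor=1.1 and minNeighbors=4 are common Haar-cascade defaults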
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)
            
            face_data = []
            for i, (x, y, w, h) in enumerate(faces):
                face_data.append({
                    'id': i,
                    'bbox': [x, y, w, h],
                    'confidence': 0.8,  # Placeholder confidence
                    'size': w * h
                })
            
            return face_data
            
        except Exception as e:
            logger.error(f"Error detecting faces: {str(e)}")
            return []
    
    def _extract_dominant_colors(self, image: Image.Image) -> List[Dict]:
        """Extract dominant colors from image"""
        try:
            # Convert to RGB if necessary
            if image.mode != 'RGB':
                image = image.convert('RGB')
            
            # Resize for faster processing
            small_image = image.resize((100, 100))
            
            # Get colors; maxcolors must cover every pixel or getcolors() returns None
            colors = small_image.getcolors(maxcolors=100 * 100)
            if not colors:
                return []
            
            # Sort by frequency
            colors.sort(key=lambda x: x[0], reverse=True)
            
            dominant_colors = []
            for count, color in colors[:5]:  # Top 5 colors
                dominant_colors.append({
                    'color': color,
                    'count': count,
                    'percentage': (count / (100 * 100)) * 100,
                    'hex': '#{:02x}{:02x}{:02x}'.format(*color)
                })
            
            return dominant_colors
            
        except Exception as e:
            logger.error(f"Error extracting dominant colors: {str(e)}")
            return []
    
    def _transcribe_audio(self, audio_path: str) -> str:
        """Transcribe audio using Whisper"""
        try:
            if self.whisper_model:
                result = self.whisper_model.transcribe(audio_path)
                return result['text'].strip()
            elif sr:
                # Fallback to speech_recognition (requires the optional package)
                recognizer = sr.Recognizer()
                with sr.AudioFile(audio_path) as source:
                    audio = recognizer.record(source)
                    return recognizer.recognize_google(audio)
            else:
                return ""
                    
        except Exception as e:
            logger.error(f"Error transcribing audio: {str(e)}")
            return ""
    
    def _get_audio_duration(self, audio_path: str) -> float:
        """Get audio duration"""
        try:
            import librosa
            # librosa >= 0.10 takes 'path'; the old 'filename' keyword is removed
            return librosa.get_duration(path=audio_path)
        except Exception:
            return 0.0
    
    def _extract_video_frames(self, video_path: str, max_frames: int = 10) -> List[Dict]:
        """Extract frames from video"""
        try:
            if cv2 is None or Image is None:
                return []
            
            cap = cv2.VideoCapture(video_path)
            frames = []
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            
            # Extract frames at regular intervals
            interval = max(1, frame_count // max_frames)
            
            for i in range(0, frame_count, interval):
                cap.set(cv2.CAP_PROP_POS_FRAMES, i)
                ret, frame = cap.read()
                
                if ret:
                    # Convert frame to PIL Image
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame_image = Image.fromarray(frame_rgb)
                    
                    frames.append({
                        'frame_number': i,
                        'timestamp': i / fps if fps > 0 else 0.0,
                        'image': frame_rgb,
                        'analysis': self._process_image(frame_image, {})
                    })
            
            cap.release()
            return frames
            
        except Exception as e:
            logger.error(f"Error extracting video frames: {str(e)}")
            return []
    
    def _get_video_duration(self, video_path: str) -> float:
        """Get video duration"""
        try:
            cap = cv2.VideoCapture(video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            cap.release()
            return frame_count / fps if fps > 0 else 0.0
        except Exception:
            return 0.0
    
    def _get_video_fps(self, video_path: str) -> float:
        """Get video FPS"""
        try:
            cap = cv2.VideoCapture(video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            cap.release()
            return fps
        except Exception:
            return 0.0
    
    def _get_video_resolution(self, video_path: str) -> Tuple[int, int]:
        """Get video resolution"""
        try:
            cap = cv2.VideoCapture(video_path)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            cap.release()
            return (width, height)
        except Exception:
            return (0, 0)
    
    def _extract_audio_from_video(self, video_path: str) -> str:
        """Extract and transcribe audio from video (requires ffmpeg on PATH)"""
        try:
            import shutil
            import subprocess
            
            if not shutil.which('ffmpeg'):
                return "Audio extraction not available"
            
            # Extract the audio track to a temporary WAV file
            audio_path = os.path.join(self.temp_dir, 'extracted_audio.wav')
            subprocess.run(
                ['ffmpeg', '-y', '-i', video_path, '-q:a', '0', '-map', 'a', audio_path],
                check=True, capture_output=True
            )
            
            transcription = self._transcribe_audio(audio_path)
            if os.path.exists(audio_path):
                os.unlink(audio_path)
            return transcription
            
        except Exception as e:
            logger.error(f"Error extracting audio from video: {str(e)}")
            return ""
    
    def _analyze_video_frames(self, frames: List[Dict]) -> Dict:
        """Analyze extracted video frames"""
        try:
            if not frames:
                return {}
            
            # Aggregate frame analysis
            all_objects = []
            all_faces = []
            all_colors = []
            
            for frame in frames:
                if 'analysis' in frame:
                    analysis = frame['analysis']
                    all_objects.extend(analysis.get('objects', []))
                    all_faces.extend(analysis.get('faces', []))
                    all_colors.extend(analysis.get('dominant_colors', []))
            
            # Generate summary
            return {
                'total_frames': len(frames),
                'objects_detected': len(all_objects),
                'faces_detected': len(all_faces),
                'dominant_colors': self._aggregate_colors(all_colors),
                'activity_level': self._calculate_activity_level(frames)
            }
            
        except Exception as e:
            logger.error(f"Error analyzing video frames: {str(e)}")
            return {}
    
    def _aggregate_colors(self, colors: List[Dict]) -> List[Dict]:
        """Aggregate dominant colors from frames"""
        try:
            color_counts = {}
            
            for color_info in colors:
                color = color_info.get('hex', '')
                count = color_info.get('count', 0)
                
                if color in color_counts:
                    color_counts[color] += count
                else:
                    color_counts[color] = count
            
            # Sort by frequency
            sorted_colors = sorted(color_counts.items(), key=lambda x: x[1], reverse=True)
            
            return [{'color': color, 'count': count} for color, count in sorted_colors[:5]]
            
        except Exception as e:
            logger.error(f"Error aggregating colors: {str(e)}")
            return []
    
    def _calculate_activity_level(self, frames: List[Dict]) -> str:
        """Calculate activity level in video"""
        try:
            if cv2 is None or np is None or len(frames) < 2:
                return 'unknown'
            
            # Simple activity detection based on frame differences
            activity_scores = []
            
            for i in range(1, len(frames)):
                prev_frame = frames[i-1]['image']
                curr_frame = frames[i]['image']
                
                # Calculate frame difference
                diff = cv2.absdiff(prev_frame, curr_frame)
                activity_score = np.mean(diff)
                activity_scores.append(activity_score)
            
            avg_activity = np.mean(activity_scores)
            
            if avg_activity < 10:
                return 'low'
            elif avg_activity < 30:
                return 'medium'
            else:
                return 'high'
                
        except Exception as e:
            logger.error(f"Error calculating activity level: {str(e)}")
            return 'unknown'
    
    def _generate_combined_analysis(self, components: Dict) -> Dict:
        """Generate combined analysis for mixed input"""
        try:
            analysis = {
                'summary': '',
                'key_insights': [],
                'recommendations': []
            }
            
            # Each component holds a full process_input result, so drill into
            # its 'processed_content' before reading fields
            if 'text' in components:
                text_comp = components['text'].get('processed_content', {})
                analysis['summary'] += f"Text: {text_comp.get('content', '')[:100]}... "
                
                if text_comp.get('sentiment'):
                    analysis['key_insights'].append(f"Sentiment: {text_comp['sentiment']}")
            
            if 'image' in components:
                img_comp = components['image'].get('processed_content', {})
                analysis['summary'] += f"Image detected with caption: {img_comp.get('caption', 'No caption')}. "
                
                if img_comp.get('objects'):
                    analysis['key_insights'].append(f"Objects detected: {len(img_comp['objects'])}")
            
            if 'audio' in components:
                audio_comp = components['audio'].get('processed_content', {})
                if audio_comp.get('transcription'):
                    analysis['summary'] += f"Audio transcription: {audio_comp['transcription'][:100]}... "
            
            return analysis
            
        except Exception as e:
            logger.error(f"Error generating combined analysis: {str(e)}")
            return {'summary': 'Analysis failed', 'key_insights': [], 'recommendations': []}
    
    def _detect_language(self, text: str) -> str:
        """Detect language of text"""
        try:
            from langdetect import detect
            return detect(text)
        except Exception:
            return 'en'
    
    def _analyze_sentiment(self, text: str) -> Dict:
        """Analyze sentiment of text"""
        try:
            from textblob import TextBlob
            blob = TextBlob(text)
            polarity = blob.sentiment.polarity
            
            if polarity > 0.1:
                sentiment = 'positive'
            elif polarity < -0.1:
                sentiment = 'negative'
            else:
                sentiment = 'neutral'
            
            return {
                'sentiment': sentiment,
                'polarity': polarity,
                'confidence': abs(polarity)
            }
        except Exception:
            return {'sentiment': 'neutral', 'polarity': 0.0, 'confidence': 0.0}
    
    def _extract_entities(self, text: str) -> List[Dict]:
        """Extract entities from text"""
        try:
            # Simple entity extraction (could be enhanced with spaCy/NLTK)
            import re
            
            # Extract emails
            emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text)
            
            # Extract phone numbers
            phones = re.findall(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text)
            
            # Extract URLs
            urls = re.findall(r'https?://[^\s]+', text)
            
            entities = []
            entities.extend([{'type': 'email', 'value': email} for email in emails])
            entities.extend([{'type': 'phone', 'value': phone} for phone in phones])
            entities.extend([{'type': 'url', 'value': url} for url in urls])
            
            return entities
            
        except Exception:
            return []
    
    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from text"""
        try:
            # Simple keyword extraction
            from collections import Counter
            import re
            
            # Remove punctuation and convert to lowercase
            words = re.findall(r'\b\w+\b', text.lower())
            
            # Filter common words
            common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
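            # A minimal stop-word list; a fuller set (e.g. NLTK's) would improve results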
            filtered_words = [word for word in words if len(word) > 3 and word not in common_words]
            
            # Get most common words
            word_counts = Counter(filtered_words)
            keywords = [word for word, count in word_counts.most_common(10)]
            
            return keywords
            
        except Exception:
            return []
    
    def _extract_common_info(self, processed_content: Dict) -> Dict:
        """Extract common information from processed content"""
        try:
            common_info = {
                'input_types': [],
                'languages': [],
                'sentiments': [],
                'key_entities': [],
                'content_summary': ''
            }
            
            if isinstance(processed_content, dict) and 'components' in processed_content:
                # Mixed input: recurse into each component's processed content
                for component_type, component_data in processed_content['components'].items():
                    component_info = self._extract_common_info(
                        component_data.get('processed_content', {}))
                    common_info['input_types'].extend(component_info.get('input_types', []))
                    common_info['languages'].extend(component_info.get('languages', []))
                    common_info['sentiments'].extend(component_info.get('sentiments', []))
                    common_info['key_entities'].extend(component_info.get('key_entities', []))
            
            elif isinstance(processed_content, dict):
                if processed_content.get('type'):
                    common_info['input_types'].append(processed_content['type'])
                
                if processed_content.get('language'):
                    common_info['languages'].append(processed_content['language'])
                
                if processed_content.get('sentiment'):
                    common_info['sentiments'].append(processed_content['sentiment'])
                
                if processed_content.get('entities'):
                    common_info['key_entities'].extend(processed_content['entities'])
                
                # Generate content summary
                if processed_content.get('content'):
                    content = processed_content['content']
                    if isinstance(content, str):
                        common_info['content_summary'] = content[:200] + '...' if len(content) > 200 else content
                elif processed_content.get('caption'):
                    common_info['content_summary'] = processed_content['caption']
            
            return common_info
            
        except Exception as e:
            logger.error(f"Error extracting common info: {str(e)}")
            return {}
    
    def _save_temp_file(self, content: Union[str, bytes], extension: str) -> str:
        """Save content to temporary file"""
        try:
            # Generate unique filename
            filename = f"temp_{datetime.now().timestamp()}{extension}"
            filepath = os.path.join(self.temp_dir, filename)
            
            if isinstance(content, str):
                if content.startswith('data:'):
                    # Base64 data
                    header, data = content.split(',', 1)
                    file_data = base64.b64decode(data)
                else:
                    # Existing file path: returned as-is (callers must not delete it)
                    return content
            else:
                # Raw bytes
                file_data = content
            
            with open(filepath, 'wb') as f:
                f.write(file_data)
            
            return filepath
            
        except Exception as e:
            logger.error(f"Error saving temp file: {str(e)}")
            return ""
    
    def get_supported_formats(self) -> Dict[str, List[str]]:
        """Get list of supported formats"""
        return self.supported_formats.copy()
    
    def validate_input(self, input_data: Dict) -> Dict:
        """Validate input data"""
        try:
            validation_result = {
                'is_valid': True,
                'errors': [],
                'warnings': []
            }
            
            # Check required fields
            if 'type' not in input_data:
                validation_result['errors'].append('Missing input type')
                validation_result['is_valid'] = False
            
            if 'content' not in input_data:
                validation_result['errors'].append('Missing content')
                validation_result['is_valid'] = False
            
            # Check file size if applicable
            if isinstance(input_data.get('content'), bytes):
                size = len(input_data['content'])
                if size > self.max_file_size:
                    validation_result['errors'].append(f'File too large: {size} bytes (max: {self.max_file_size})')
                    validation_result['is_valid'] = False
            
            return validation_result
            
        except Exception as e:
            logger.error(f"Error validating input: {str(e)}")
            return {'is_valid': False, 'errors': [str(e)], 'warnings': []}
    
    def cleanup(self):
        """Clean up temporary files"""
        try:
            import shutil
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
                logger.info("Temporary files cleaned up")
        except Exception as e:
            logger.error(f"Error cleaning up: {str(e)}")
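

# Example usage: a minimal sketch, assuming the optional dependencies above are
# installed. 'photo.jpg' is a hypothetical placeholder file, not part of the project.
if __name__ == '__main__':
    processor = MultiModalProcessor()
    try:
        # Text input needs no optional dependencies
        text_result = processor.process_input({
            'type': 'text',
            'content': 'Contact help@rskworld.in or visit https://rskworld.in',
            'metadata': {'source': 'demo'}
        }, user_id='demo-user')
        print(text_result.get('processed_content', {}).get('keywords'))
        print(text_result.get('extracted_info', {}).get('key_entities'))
        
        # Image input requires Pillow (plus transformers for captioning)
        image_result = processor.process_input({
            'type': 'image',
            'content': 'photo.jpg',
            'metadata': {}
        })
        print(image_result.get('processed_content', {}).get('caption'))
    finally:
        # Remove the temporary working directory
        processor.cleanup()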