help@rskworld.in +91 93305 39277
RSK World
  • Home
  • Development
    • Web Development
    • Mobile Apps
    • Software
    • Games
    • Project
  • Technologies
    • Data Science
    • AI Development
    • Cloud Development
    • Blockchain
    • Cyber Security
    • Dev Tools
    • Testing Tools
  • About
  • Contact

Theme Settings

Color Scheme
Display Options
Font Size
100%
Back to Project
RSK World
weather-chatbot
/
utils
RSK World
weather-chatbot
Weather Chatbot - Python + Flask + OpenWeatherMap + OpenAI + Weather Forecast + Weather Alerts + Natural Language Processing
utils
  • __pycache__
  • __init__.py (3.9 KB)
  • advanced_nlp.py (28.4 KB)
  • analytics.py (22 KB)
  • auth.py (17.8 KB)
  • comparison.py (24.2 KB)
  • database.py (24.6 KB)
  • geolocation.py (20.1 KB)
  • multilang.py (22 KB)
  • notifications.py (34.1 KB)
  • rate_limiting.py (24.3 KB)
  • weather_maps.py (29.6 KB)
  • weather_utils.py (12.9 KB)
auth.py
utils/auth.py
Raw Download
Find: Go to:
#!/usr/bin/env python3
"""
Weather Chatbot Authentication Module
====================================

Author: RSK World (https://rskworld.in)
Founded by: Molla Samser
Designer & Tester: Rima Khatun
Contact: +91 93305 39277, hello@rskworld.in, support@rskworld.in
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026

Description: User authentication and session management
"""

import jwt
import secrets
from datetime import datetime, timedelta
from functools import wraps
from flask import request, jsonify, session, g
from werkzeug.security import generate_password_hash, check_password_hash
import re
from typing import Dict, Optional, Callable

class AuthManager:
    """Authentication and authorization manager.

    Combines a project database manager (``db_manager``) with Flask's
    ``session`` / ``g`` objects to provide login, registration, JWT
    issuing/verification and password change/reset flows.

    NOTE(review): when no ``secret_key`` is passed a random one is generated
    per instance, so tokens signed by one process cannot be verified by
    another (or after a restart). Pass an explicit key in production.
    """

    # Marker placed between the reset token and its ISO-8601 expiry when
    # reset_password_request() stores both in the ``users.api_key`` column.
    _RESET_MARKER = '_reset_'

    # (pattern, error message) pairs checked in order by validate_password().
    _PASSWORD_RULES = (
        (r'[A-Z]', 'Password must contain at least one uppercase letter'),
        (r'[a-z]', 'Password must contain at least one lowercase letter'),
        (r'\d', 'Password must contain at least one digit'),
        (r'[!@#$%^&*(),.?":{}|<>]', 'Password must contain at least one special character'),
    )

    def __init__(self, app=None, secret_key: str = None, db_manager=None):
        """Create the manager.

        Args:
            app: Optional Flask app; when truthy, ``init_app`` is called on it.
            secret_key: Key used to sign JWTs; a random URL-safe key is
                generated when omitted or empty.
            db_manager: Object providing ``authenticate_user``, ``create_user``
                and ``get_connection`` (as used by the methods below).
        """
        self.app = app
        self.secret_key = secret_key or secrets.token_urlsafe(32)
        self.db_manager = db_manager
        self.token_expiry = timedelta(hours=24)

        if app:
            self.init_app(app)

    def init_app(self, app):
        """Register config defaults and the per-request user loader on ``app``."""
        app.config.setdefault('JWT_SECRET_KEY', self.secret_key)
        app.config.setdefault('JWT_ACCESS_TOKEN_EXPIRES', self.token_expiry)

        @app.before_request
        def load_user():
            """Populate ``g.current_user`` from the session's ``user_id`` (or None)."""
            user_id = session.get('user_id')
            g.current_user = self.get_user_by_id(user_id) if user_id else None

    def generate_token(self, user_data: Dict) -> str:
        """Generate an HS256-signed JWT access token.

        Args:
            user_data: Mapping with ``user_id``, ``username`` and ``email`` keys.

        Returns:
            The encoded token, expiring ``self.token_expiry`` after issue time.
        """
        # Read the clock once so ``iat`` and ``exp`` are derived from the same instant.
        issued_at = datetime.utcnow()
        payload = {
            'user_id': user_data['user_id'],
            'username': user_data['username'],
            'email': user_data['email'],
            'exp': issued_at + self.token_expiry,
            'iat': issued_at,
        }

        return jwt.encode(payload, self.secret_key, algorithm='HS256')

    def verify_token(self, token: str) -> Optional[Dict]:
        """Verify a JWT and return its payload, or None when it is invalid or expired."""
        try:
            return jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

    def authenticate_user(self, username: str, password: str) -> Dict:
        """Authenticate credentials and, on success, issue a token and start a session.

        Returns:
            The dict returned by ``db_manager.authenticate_user``; on success a
            ``token`` key is added. Failures before the database call return
            ``{'success': False, 'error': ...}``.
        """
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}

        if not username or not password:
            return {'success': False, 'error': 'Username and password required'}

        auth_result = self.db_manager.authenticate_user(username, password)

        # .get() so a result without a 'success' key is treated as a failure
        # instead of raising KeyError.
        if auth_result.get('success'):
            auth_result['token'] = self.generate_token(auth_result)

            session['user_id'] = auth_result['user_id']
            session['username'] = auth_result['username']
            session.permanent = True

        return auth_result

    def register_user(self, username: str, email: str, password: str) -> Dict:
        """Validate the registration data and create the user via the database manager.

        Returns:
            ``{'success': False, 'error': ...}`` when the database is missing or
            validation fails; otherwise whatever ``db_manager.create_user`` returns.
        """
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}

        validation_errors = self.validate_registration_data(username, email, password)
        if validation_errors:
            return {'success': False, 'error': validation_errors}

        return self.db_manager.create_user(username, email, password)

    def validate_registration_data(self, username: str, email: str, password: str) -> str:
        """Validate registration fields.

        Returns:
            An empty string when everything is valid, otherwise the error
            messages (at most one per field) joined with ``'; '``.
        """
        errors = []

        # Username validation. fullmatch (not match + '$') is used because '$'
        # also matches just before a trailing newline, which would let
        # "name\n" through.
        if not username:
            errors.append('Username is required')
        elif len(username) < 3:
            errors.append('Username must be at least 3 characters long')
        elif len(username) > 30:
            errors.append('Username must be less than 30 characters')
        elif not re.fullmatch(r'[a-zA-Z0-9_]+', username):
            errors.append('Username can only contain letters, numbers, and underscores')

        # Email validation (same trailing-newline concern as above).
        if not email:
            errors.append('Email is required')
        elif not re.fullmatch(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', email):
            errors.append('Invalid email format')

        # Password validation shares its rules with validate_password().
        password_error = self.validate_password(password)
        if password_error:
            errors.append(password_error)

        return '; '.join(errors) if errors else ''

    def logout_user(self) -> Dict:
        """Clear the session and the request-local current user."""
        session.clear()
        g.current_user = None
        return {'success': True, 'message': 'Logged out successfully'}

    def get_current_user(self) -> Optional[Dict]:
        """Return ``g.current_user`` if it has been set, else None."""
        return getattr(g, 'current_user', None)

    def get_user_by_id(self, user_id: int) -> Optional[Dict]:
        """Fetch a user row by id.

        Returns:
            A dict keyed by column name (id, username, email, api_key,
            created_at, last_login, is_active), or None when the database is
            not configured, the user does not exist, or the lookup fails.
        """
        if not self.db_manager:
            return None

        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT id, username, email, api_key, created_at, last_login, is_active
                    FROM users WHERE id = ?
                ''', (user_id,))

                row = cursor.fetchone()
                if not row:
                    return None
                columns = [desc[0] for desc in cursor.description]
                return dict(zip(columns, row))
        except Exception:
            # Best-effort lookup: database errors mean "no user", but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return None

    def change_password(self, user_id: int, old_password: str, new_password: str) -> Dict:
        """Change a user's password after verifying the current one.

        Returns:
            ``{'success': True, 'message': ...}`` on success, otherwise
            ``{'success': False, 'error': ...}``.
        """
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}

        validation_error = self.validate_password(new_password)
        if validation_error:
            return {'success': False, 'error': validation_error}

        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('SELECT password_hash FROM users WHERE id = ?', (user_id,))
                result = cursor.fetchone()

                if not result:
                    return {'success': False, 'error': 'User not found'}

                current_hash = result[0]

                if not check_password_hash(current_hash, old_password):
                    return {'success': False, 'error': 'Current password is incorrect'}

                new_hash = generate_password_hash(new_password)
                cursor.execute('''
                    UPDATE users SET password_hash = ? WHERE id = ?
                ''', (new_hash, user_id))

                conn.commit()

                return {'success': True, 'message': 'Password changed successfully'}

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def validate_password(self, password: str) -> str:
        """Validate password strength.

        Returns:
            An empty string when the password is acceptable, otherwise the
            first failing rule's message. A missing, empty or non-string
            password yields ``'Password is required'`` instead of raising.
        """
        if not password or not isinstance(password, str):
            return 'Password is required'
        if len(password) < 8:
            return 'Password must be at least 8 characters long'
        if len(password) > 128:
            return 'Password must be less than 128 characters'

        for pattern, message in self._PASSWORD_RULES:
            if not re.search(pattern, password):
                return message

        return ''

    def reset_password_request(self, email: str) -> Dict:
        """Create a one-hour password reset token for the account with ``email``.

        The token and its expiry are stored together in ``users.api_key`` as
        ``<token>_reset_<expiry ISO-8601>``.

        NOTE(review): this overwrites the user's API key and ``last_login``,
        returns the token to the caller ("only for development") and reveals
        whether an email is registered. A dedicated reset-token table and
        e-mail delivery are needed before production use.
        """
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}

        reset_token = secrets.token_urlsafe(32)
        expiry = datetime.utcnow() + timedelta(hours=1)

        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('SELECT id, username FROM users WHERE email = ?', (email,))
                user = cursor.fetchone()

                if not user:
                    return {'success': False, 'error': 'Email not found'}

                user_id = user[0]

                stored_value = reset_token + self._RESET_MARKER + expiry.isoformat()
                cursor.execute('''
                    UPDATE users SET
                    api_key = ?,
                    last_login = ?
                    WHERE id = ?
                ''', (stored_value, datetime.utcnow(), user_id))

                conn.commit()

                # In a real application, you would send an email here
                return {
                    'success': True,
                    'message': 'Password reset token generated',
                    'reset_token': reset_token,  # Only for development
                    'expires_at': expiry.isoformat()
                }

        except Exception as e:
            return {'success': False, 'error': str(e)}

    @staticmethod
    def _invalid_reset_token() -> Dict:
        """Build the uniform failure response for a bad or expired reset token."""
        return {'success': False, 'error': 'Invalid or expired reset token'}

    def reset_password(self, token: str, new_password: str) -> Dict:
        """Set a new password using a token issued by ``reset_password_request``.

        The token must match the stored value exactly (case-sensitive) and the
        stored expiry must not have passed.

        Returns:
            ``{'success': True, 'message': ...}`` on success, otherwise
            ``{'success': False, 'error': ...}``.
        """
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}

        validation_error = self.validate_password(new_password)
        if validation_error:
            return {'success': False, 'error': validation_error}

        if not token or not isinstance(token, str):
            return self._invalid_reset_token()

        # Exact prefix comparison instead of LIKE: with LIKE, '%' and '_' in the
        # caller-supplied token act as wildcards (a token of '%' would match any
        # pending reset) and matching is case-insensitive.
        prefix = token + self._RESET_MARKER

        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    SELECT id, api_key FROM users
                    WHERE substr(api_key, 1, ?) = ?
                ''', (len(prefix), prefix))

                result = cursor.fetchone()

                if not result:
                    return self._invalid_reset_token()

                user_id, stored_value = result[0], result[1]

                # Everything after the marker is the expiry written at request time.
                try:
                    expires_at = datetime.fromisoformat(stored_value[len(prefix):])
                except ValueError:
                    return self._invalid_reset_token()

                if datetime.utcnow() > expires_at:
                    return self._invalid_reset_token()

                # Replacing api_key also consumes the token, making it single-use.
                new_hash = generate_password_hash(new_password)
                cursor.execute('''
                    UPDATE users SET password_hash = ?, api_key = ? WHERE id = ?
                ''', (new_hash, secrets.token_urlsafe(32), user_id))

                conn.commit()

                return {'success': True, 'message': 'Password reset successful'}

        except Exception as e:
            return {'success': False, 'error': str(e)}

# Decorators for authentication
def require_auth(f: Callable) -> Callable:
    """Wrap a view so it answers 401 unless ``g.current_user`` is set and truthy."""
    @wraps(f)
    def guarded(*args, **kwargs):
        current = getattr(g, 'current_user', None)
        if current:
            return f(*args, **kwargs)
        return jsonify({'error': 'Authentication required'}), 401
    return guarded

def require_api_key(f: Callable) -> Callable:
    """Wrap a view so it answers 401 unless an API key accompanies the request.

    The key is read from the ``X-API-Key`` header, falling back to the
    ``api_key`` query parameter.

    NOTE: the key is only checked for presence. It is not compared against
    the database, because this function has no database manager available.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        supplied_key = request.headers.get('X-API-Key') or request.args.get('api_key')

        if supplied_key:
            # Presence is the only requirement for now; see the NOTE above.
            return f(*args, **kwargs)

        return jsonify({'error': 'API key required'}), 401
    return guarded

def rate_limit(limit: int = 100, per: int = 3600):
    """Decorator factory reserved for rate limiting.

    ``limit`` and ``per`` are accepted but not used yet: the returned
    decorator calls the wrapped function unconditionally. Real limiting
    would typically be backed by Redis or a database.
    """
    def decorator(f: Callable) -> Callable:
        @wraps(f)
        def passthrough(*args, **kwargs):
            # Placeholder: no counting or throttling happens here.
            result = f(*args, **kwargs)
            return result
        return passthrough
    return decorator

class SessionManager:
    """Session management utilities built on Flask's ``session`` object."""

    # Maximum session age; used both as the PERMANENT_SESSION_LIFETIME default
    # and as the cut-off in is_session_valid().
    SESSION_LIFETIME = timedelta(days=7)

    def __init__(self, app=None):
        """Store ``app`` and, when it is truthy, apply the session config defaults."""
        self.app = app
        if app:
            self.init_app(app)

    def init_app(self, app):
        """Set session-related config defaults without overriding existing values."""
        app.config.setdefault('SESSION_PERMANENT', True)
        app.config.setdefault('SESSION_USE_SIGNER', True)
        app.config.setdefault('SESSION_KEY_PREFIX', 'weather_chatbot:')
        app.config.setdefault('PERMANENT_SESSION_LIFETIME', self.SESSION_LIFETIME)

    def create_session(self, user_data: Dict):
        """Replace the current session with a fresh one for ``user_data``.

        Args:
            user_data: Mapping with ``user_id``, ``username`` and ``email`` keys.
        """
        # Clear first so nothing from a previous session carries over.
        session.clear()
        session['user_id'] = user_data['user_id']
        session['username'] = user_data['username']
        session['email'] = user_data['email']
        session['login_time'] = datetime.utcnow().isoformat()
        session.permanent = True

    def update_session_activity(self):
        """Record the current UTC time as the session's last activity."""
        session['last_activity'] = datetime.utcnow().isoformat()

    def is_session_valid(self) -> bool:
        """Return True when the session has a user and is not older than the lifetime.

        A session without a recorded ``login_time`` is accepted, since its age
        cannot be checked; an unparseable ``login_time`` makes it invalid.
        """
        if not session.get('user_id'):
            return False

        login_time = session.get('login_time')
        if not login_time:
            return True

        try:
            login_dt = datetime.fromisoformat(login_time)
            return datetime.utcnow() - login_dt <= self.SESSION_LIFETIME
        except (TypeError, ValueError):
            # ValueError: malformed timestamp; TypeError: non-string value or
            # a timezone-aware timestamp that cannot be subtracted from utcnow().
            return False

    def get_session_info(self) -> Dict:
        """Return a snapshot of the session fields plus an ``is_valid`` flag."""
        return {
            'user_id': session.get('user_id'),
            'username': session.get('username'),
            'email': session.get('email'),
            'login_time': session.get('login_time'),
            'last_activity': session.get('last_activity'),
            'is_valid': self.is_session_valid()
        }

    def destroy_session(self):
        """Remove every key from the current session."""
        session.clear()

# Security utilities
class SecurityUtils:
    """Security utility functions"""

    # Translation table deleting the characters sanitize_input() strips.
    _STRIP_TABLE = dict.fromkeys(map(ord, '<>"\'&\x00\n\r\t'))

    @staticmethod
    def generate_csrf_token() -> str:
        """Generate a random URL-safe CSRF token."""
        return secrets.token_urlsafe(32)

    @staticmethod
    def validate_csrf_token(token: str, session_token: str) -> bool:
        """Compare a submitted CSRF token with the session's token in constant time.

        Returns False (instead of raising) when either value is missing, empty
        or not a string.
        """
        if not isinstance(token, str) or not isinstance(session_token, str):
            return False
        if not token or not session_token:
            return False
        # compare_digest rejects non-ASCII str, so compare the encoded bytes.
        return secrets.compare_digest(
            token.encode('utf-8', 'surrogatepass'),
            session_token.encode('utf-8', 'surrogatepass'),
        )

    @staticmethod
    def sanitize_input(input_string: str) -> str:
        """Strip ``< > " ' &``, NUL, newline, carriage return and tab, then trim.

        NOTE: this is a character blacklist, not context-aware escaping; output
        still needs proper HTML/SQL escaping where it is used.
        """
        if not input_string:
            return ''

        return input_string.translate(SecurityUtils._STRIP_TABLE).strip()

    @staticmethod
    def is_safe_url(url: str) -> bool:
        """Check whether ``url`` is safe to use as a redirect target.

        Accepts site-relative paths (``/path``) and absolute http(s) URLs whose
        scheme and host match the current request's ``host_url``.
        """
        if not url or not isinstance(url, str):
            return False

        # Browsers treat '\' like '/' and drop control characters, so
        # '/\\evil.com' or '/\t/evil.com' could leave the site.
        if '\\' in url or any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in url):
            return False

        # Scheme-relative URLs ('//host/path') point at another host.
        if url.startswith('//'):
            return False

        # Allow relative URLs
        if url.startswith('/'):
            return True

        # Allow same origin URLs. Components are compared after parsing because
        # a plain prefix check accepts 'http://host.evil.com' and
        # 'http://host@evil.com'.
        from urllib.parse import urlsplit
        from flask import request
        try:
            target = urlsplit(url)
            origin = urlsplit(request.host_url)
        except ValueError:
            return False

        return (
            target.scheme in ('http', 'https')
            and target.scheme == origin.scheme
            and target.netloc.lower() == origin.netloc.lower()
        )

    @staticmethod
    def hash_data(data: str) -> str:
        """Return the hex SHA-256 digest of ``data`` (encoded with str.encode())."""
        import hashlib
        return hashlib.sha256(data.encode()).hexdigest()

    @staticmethod
    def verify_data_hash(data: str, hash_value: str) -> bool:
        """Check ``data`` against a hex SHA-256 digest using a constant-time compare.

        Returns False when ``hash_value`` is not a string.
        """
        if not isinstance(hash_value, str):
            return False
        expected = SecurityUtils.hash_data(data)
        # Encode so a non-ASCII hash_value is a mismatch, not a TypeError.
        return secrets.compare_digest(
            expected.encode('utf-8'),
            hash_value.encode('utf-8', 'surrogatepass'),
        )
476 lines • 17.8 KB
python

About RSK World

Founded by Molla Samser, with Designer & Tester Rima Khatun, RSK World is your one-stop destination for free programming resources, source code, and development tools.

Founder: Molla Samser
Designer & Tester: Rima Khatun

Development

  • Game Development
  • Web Development
  • Mobile Development
  • AI Development
  • Development Tools

Legal

  • Terms & Conditions
  • Privacy Policy
  • Disclaimer

Contact Info

Nutanhat, Mongolkote
Purba Burdwan, West Bengal
India, 713147

+91 93305 39277

hello@rskworld.in
support@rskworld.in

© 2026 RSK World. All rights reserved.

Content used for educational purposes only. View Disclaimer