help@rskworld.in +91 93305 39277
RSK World
  • Home
  • Development
    • Web Development
    • Mobile Apps
    • Software
    • Games
    • Project
  • Technologies
    • Data Science
    • AI Development
    • Cloud Development
    • Blockchain
    • Cyber Security
    • Dev Tools
    • Testing Tools
  • About
  • Contact

Theme Settings

Color Scheme
Display Options
Font Size
100%
Back to Project
RSK World
weather-chatbot
/
utils
RSK World
weather-chatbot
Weather Chatbot - Python + Flask + OpenWeatherMap + OpenAI + Weather Forecast + Weather Alerts + Natural Language Processing
utils
  • __pycache__
  • __init__.py3.9 KB
  • advanced_nlp.py28.4 KB
  • analytics.py22 KB
  • auth.py17.8 KB
  • comparison.py24.2 KB
  • database.py24.6 KB
  • geolocation.py20.1 KB
  • multilang.py22 KB
  • notifications.py34.1 KB
  • rate_limiting.py24.3 KB
  • weather_maps.py29.6 KB
  • weather_utils.py12.9 KB
rate_limiting.py
utils/rate_limiting.py
Raw Download
Find: Go to:
"""
API Rate Limiting and Caching Module for Weather Chatbot
========================================================

This module provides comprehensive rate limiting and caching functionality
to optimize API usage, prevent abuse, and improve response times.

Features:
- Request rate limiting per user/IP
- Response caching with TTL
- API usage tracking and analytics
- Distributed rate limiting support
- Memory and Redis-based caching
- Configurable limits and policies

Author: RSK World
Website: https://rskworld.in
Email: hello@rskworld.in
Phone: +91 93305 39277
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026
"""

import time
import json
import hashlib
import threading
from typing import Dict, Any, Optional, List, Union
from datetime import datetime, timedelta
from collections import defaultdict, deque
import logging
from dataclasses import dataclass
from functools import wraps

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    import redis
    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False
    logger.warning("Redis not available. Using in-memory caching only.")

@dataclass
class RateLimitConfig:
    """Configuration for rate limiting."""
    requests_per_minute: int = 60
    requests_per_hour: int = 1000
    requests_per_day: int = 10000
    burst_limit: int = 10
    cleanup_interval: int = 300  # 5 minutes

@dataclass
class CacheConfig:
    """Configuration for caching."""
    default_ttl: int = 300  # 5 minutes
    weather_ttl: int = 600  # 10 minutes
    forecast_ttl: int = 1800  # 30 minutes
    max_cache_size: int = 10000
    cleanup_interval: int = 600  # 10 minutes

class RateLimiter:
    """
    Implements rate limiting for API requests.
    Supports multiple time windows and configurable limits.
    """
    
    def __init__(self, config: RateLimitConfig = None):
        """
        Initialize the RateLimiter.
        
        Args:
            config (RateLimitConfig): Rate limiting configuration
        """
        self.config = config or RateLimitConfig()
        self.requests = defaultdict(lambda: defaultdict(deque))
        self.lock = threading.RLock()
        self.last_cleanup = time.time()
        
        # Start cleanup thread
        self.cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True)
        self.cleanup_thread.start()
    
    def _cleanup_worker(self) -> None:
        """Background worker to clean up old request records."""
        while True:
            time.sleep(self.config.cleanup_interval)
            self._cleanup()
    
    def _cleanup(self) -> None:
        """Clean up old request records."""
        current_time = time.time()
        
        with self.lock:
            for identifier, windows in list(self.requests.items()):
                for window_type, request_times in list(windows.items()):
                    # Remove old requests based on window type
                    cutoff_time = current_time - self._get_window_duration(window_type)
                    
                    # Remove old requests from deque
                    while request_times and request_times[0] < cutoff_time:
                        request_times.popleft()
                    
                    # Remove empty windows
                    if not request_times:
                        del windows[window_type]
                
                # Remove empty identifiers
                if not windows:
                    del self.requests[identifier]
        
        logger.debug(f"Rate limiter cleanup completed. Active identifiers: {len(self.requests)}")
    
    def _get_window_duration(self, window_type: str) -> int:
        """Get window duration in seconds."""
        durations = {
            'minute': 60,
            'hour': 3600,
            'day': 86400,
            'burst': 1
        }
        return durations.get(window_type, 60)
    
    def _get_limit(self, window_type: str) -> int:
        """Get request limit for window type."""
        limits = {
            'minute': self.config.requests_per_minute,
            'hour': self.config.requests_per_hour,
            'day': self.config.requests_per_day,
            'burst': self.config.burst_limit
        }
        return limits.get(window_type, 60)
    
    def is_allowed(self, identifier: str, window_type: str = 'minute') -> tuple[bool, Dict[str, Any]]:
        """
        Check if a request is allowed based on rate limits.
        
        Args:
            identifier (str): Unique identifier (IP, user ID, etc.)
            window_type (str): Type of time window
            
        Returns:
            tuple[bool, Dict[str, Any]]: (is_allowed, info_dict)
        """
        current_time = time.time()
        limit = self._get_limit(window_type)
        window_duration = self._get_window_duration(window_type)
        
        with self.lock:
            request_times = self.requests[identifier][window_type]
            
            # Remove old requests
            cutoff_time = current_time - window_duration
            while request_times and request_times[0] < cutoff_time:
                request_times.popleft()
            
            # Check if limit is exceeded
            if len(request_times) >= limit:
                # Calculate reset time
                reset_time = request_times[0] + window_duration
                
                return False, {
                    'allowed': False,
                    'limit': limit,
                    'remaining': 0,
                    'reset_time': reset_time,
                    'retry_after': int(reset_time - current_time),
                    'window_type': window_type
                }
            
            # Add current request
            request_times.append(current_time)
            
            return True, {
                'allowed': True,
                'limit': limit,
                'remaining': limit - len(request_times),
                'reset_time': current_time + window_duration,
                'retry_after': 0,
                'window_type': window_type
            }
    
    def get_usage_stats(self, identifier: str) -> Dict[str, Any]:
        """
        Get usage statistics for an identifier.
        
        Args:
            identifier (str): Unique identifier
            
        Returns:
            Dict[str, Any]: Usage statistics
        """
        current_time = time.time()
        stats = {}
        
        with self.lock:
            for window_type in ['minute', 'hour', 'day', 'burst']:
                if identifier in self.requests and window_type in self.requests[identifier]:
                    request_times = self.requests[identifier][window_type]
                    limit = self._get_limit(window_type)
                    window_duration = self._get_window_duration(window_type)
                    
                    # Remove old requests
                    cutoff_time = current_time - window_duration
                    recent_requests = [t for t in request_times if t >= cutoff_time]
                    
                    stats[window_type] = {
                        'requests': len(recent_requests),
                        'limit': limit,
                        'remaining': max(0, limit - len(recent_requests)),
                        'reset_time': min(recent_requests) + window_duration if recent_requests else current_time + window_duration
                    }
                else:
                    stats[window_type] = {
                        'requests': 0,
                        'limit': self._get_limit(window_type),
                        'remaining': self._get_limit(window_type),
                        'reset_time': current_time + self._get_window_duration(window_type)
                    }
        
        return stats
    
    def reset_limits(self, identifier: str) -> None:
        """
        Reset rate limits for an identifier.
        
        Args:
            identifier (str): Unique identifier
        """
        with self.lock:
            if identifier in self.requests:
                del self.requests[identifier]
        
        logger.info(f"Rate limits reset for identifier: {identifier}")

class CacheManager:
    """
    Implements caching functionality with support for multiple backends.
    Provides TTL-based expiration and size limits.
    """
    
    def __init__(self, config: CacheConfig = None, redis_url: str = None):
        """
        Initialize the CacheManager.
        
        Args:
            config (CacheConfig): Cache configuration
            redis_url (str): Redis connection URL
        """
        self.config = config or CacheConfig()
        self.redis_client = None
        
        # Try to connect to Redis if available and URL provided
        if REDIS_AVAILABLE and redis_url:
            try:
                self.redis_client = redis.from_url(redis_url)
                self.redis_client.ping()
                logger.info("Connected to Redis for caching")
            except Exception as e:
                logger.warning(f"Failed to connect to Redis: {e}. Using in-memory cache.")
                self.redis_client = None
        
        # In-memory cache as fallback
        self.memory_cache = {}
        self.cache_times = {}
        self.cache_access_times = {}
        self.lock = threading.RLock()
        
        # Start cleanup thread
        self.cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True)
        self.cleanup_thread.start()
    
    def _cleanup_worker(self) -> None:
        """Background worker to clean up expired cache entries."""
        while True:
            time.sleep(self.config.cleanup_interval)
            self._cleanup()
    
    def _cleanup(self) -> None:
        """Clean up expired cache entries."""
        current_time = time.time()
        
        if self.redis_client:
            # Redis handles expiration automatically
            return
        
        with self.lock:
            # Remove expired entries
            expired_keys = [
                key for key, expiry_time in self.cache_times.items()
                if expiry_time <= current_time
            ]
            
            for key in expired_keys:
                self.memory_cache.pop(key, None)
                self.cache_times.pop(key, None)
                self.cache_access_times.pop(key, None)
            
            # Remove oldest entries if cache is too large
            if len(self.memory_cache) > self.config.max_cache_size:
                # Sort by access time and remove oldest
                sorted_keys = sorted(
                    self.cache_access_times.keys(),
                    key=lambda k: self.cache_access_times[k]
                )
                
                keys_to_remove = sorted_keys[:len(self.memory_cache) - self.config.max_cache_size]
                for key in keys_to_remove:
                    self.memory_cache.pop(key, None)
                    self.cache_times.pop(key, None)
                    self.cache_access_times.pop(key, None)
        
        logger.debug(f"Cache cleanup completed. Cache size: {len(self.memory_cache)}")
    
    def _generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate a cache key from arguments."""
        key_data = {
            'prefix': prefix,
            'args': args,
            'kwargs': sorted(kwargs.items())
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_str.encode()).hexdigest()
    
    def get(self, prefix: str, *args, **kwargs) -> Any:
        """
        Get value from cache.
        
        Args:
            prefix (str): Cache key prefix
            *args: Arguments for key generation
            **kwargs: Keyword arguments for key generation
            
        Returns:
            Any: Cached value or None
        """
        key = self._generate_key(prefix, *args, **kwargs)
        
        if self.redis_client:
            try:
                value = self.redis_client.get(key)
                if value:
                    return json.loads(value)
            except Exception as e:
                logger.error(f"Redis get error: {e}")
        
        with self.lock:
            if key in self.memory_cache:
                self.cache_access_times[key] = time.time()
                return self.memory_cache[key]
        
        return None
    
    def set(self, prefix: str, value: Any, ttl: Optional[int] = None, *args, **kwargs) -> bool:
        """
        Set value in cache.
        
        Args:
            prefix (str): Cache key prefix
            value (Any): Value to cache
            ttl (Optional[int]): Time to live in seconds
            *args: Arguments for key generation
            **kwargs: Keyword arguments for key generation
            
        Returns:
            bool: True if value was cached successfully
        """
        key = self._generate_key(prefix, *args, **kwargs)
        ttl = ttl or self.config.default_ttl
        
        if self.redis_client:
            try:
                self.redis_client.setex(key, ttl, json.dumps(value))
                return True
            except Exception as e:
                logger.error(f"Redis set error: {e}")
        
        with self.lock:
            self.memory_cache[key] = value
            self.cache_times[key] = time.time() + ttl
            self.cache_access_times[key] = time.time()
        
        return True
    
    def delete(self, prefix: str, *args, **kwargs) -> bool:
        """
        Delete value from cache.
        
        Args:
            prefix (str): Cache key prefix
            *args: Arguments for key generation
            **kwargs: Keyword arguments for key generation
            
        Returns:
            bool: True if value was deleted successfully
        """
        key = self._generate_key(prefix, *args, **kwargs)
        
        if self.redis_client:
            try:
                self.redis_client.delete(key)
            except Exception as e:
                logger.error(f"Redis delete error: {e}")
        
        with self.lock:
            deleted = key in self.memory_cache
            self.memory_cache.pop(key, None)
            self.cache_times.pop(key, None)
            self.cache_access_times.pop(key, None)
        
        return deleted
    
    def clear(self, pattern: Optional[str] = None) -> int:
        """
        Clear cache entries.
        
        Args:
            pattern (Optional[str]): Pattern to match (Redis only)
            
        Returns:
            int: Number of entries cleared
        """
        if self.redis_client and pattern:
            try:
                keys = self.redis_client.keys(pattern)
                if keys:
                    return self.redis_client.delete(*keys)
            except Exception as e:
                logger.error(f"Redis clear error: {e}")
        
        with self.lock:
            count = len(self.memory_cache)
            self.memory_cache.clear()
            self.cache_times.clear()
            self.cache_access_times.clear()
        
        return count
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.
        
        Returns:
            Dict[str, Any]: Cache statistics
        """
        stats = {
            'backend': 'redis' if self.redis_client else 'memory',
            'memory_cache_size': len(self.memory_cache),
            'max_cache_size': self.config.max_cache_size,
            'default_ttl': self.config.default_ttl
        }
        
        if self.redis_client:
            try:
                info = self.redis_client.info()
                stats.update({
                    'redis_memory_used': info.get('used_memory', 0),
                    'redis_connected_clients': info.get('connected_clients', 0),
                    'redis_total_commands': info.get('total_commands_processed', 0)
                })
            except Exception as e:
                logger.error(f"Redis stats error: {e}")
        
        return stats

class APIRateLimiter:
    """
    Combined API rate limiting and caching system.
    Provides decorators and utilities for API endpoints.
    """
    
    def __init__(self, 
                 rate_limit_config: RateLimitConfig = None,
                 cache_config: CacheConfig = None,
                 redis_url: str = None):
        """
        Initialize the APIRateLimiter.
        
        Args:
            rate_limit_config (RateLimitConfig): Rate limiting configuration
            cache_config (CacheConfig): Cache configuration
            redis_url (str): Redis connection URL
        """
        self.rate_limiter = RateLimiter(rate_limit_config)
        self.cache_manager = CacheManager(cache_config, redis_url)
        self.usage_stats = defaultdict(int)
    
    def rate_limit(self, identifier_func=None, window_type: str = 'minute'):
        """
        Decorator for rate limiting API endpoints.
        
        Args:
            identifier_func: Function to extract identifier from request
            window_type (str): Type of time window
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Get identifier (use first argument as default)
                if identifier_func:
                    identifier = identifier_func(*args, **kwargs)
                else:
                    identifier = str(args[0]) if args else 'default'
                
                # Check rate limit
                allowed, info = self.rate_limiter.is_allowed(identifier, window_type)
                
                if not allowed:
                    logger.warning(f"Rate limit exceeded for {identifier}: {info}")
                    return {
                        'error': 'Rate limit exceeded',
                        'retry_after': info['retry_after'],
                        'limit': info['limit'],
                        'window_type': info['window_type']
                    }, 429
                
                # Update usage stats
                self.usage_stats[identifier] += 1
                
                # Call function
                return func(*args, **kwargs)
            
            return wrapper
        return decorator
    
    def cache_response(self, ttl: Optional[int] = None, key_prefix: str = 'api'):
        """
        Decorator for caching API responses.
        
        Args:
            ttl (Optional[int]): Cache TTL in seconds
            key_prefix (str): Cache key prefix
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = f"{key_prefix}:{func.__name__}"
                
                # Try to get from cache
                cached_result = self.cache_manager.get(cache_key, *args, **kwargs)
                if cached_result is not None:
                    logger.debug(f"Cache hit for {cache_key}")
                    return cached_result
                
                # Call function
                result = func(*args, **kwargs)
                
                # Cache result
                self.cache_manager.set(cache_key, result, ttl, *args, **kwargs)
                logger.debug(f"Cached result for {cache_key}")
                
                return result
            
            return wrapper
        return decorator
    
    def get_weather_cache_key(self, city: str, endpoint: str = 'current') -> str:
        """
        Generate cache key for weather data.
        
        Args:
            city (str): City name
            endpoint (str): Weather endpoint type
            
        Returns:
            str: Cache key
        """
        return f"weather:{endpoint}:{city.lower()}"
    
    def cache_weather_data(self, city: str, data: Dict[str, Any], endpoint: str = 'current') -> None:
        """
        Cache weather data with appropriate TTL.
        
        Args:
            city (str): City name
            data (Dict[str, Any]): Weather data
            endpoint (str): Weather endpoint type
        """
        ttl = self.cache_manager.config.weather_ttl if endpoint == 'current' else self.cache_manager.config.forecast_ttl
        # Use prefix and kwargs to generate proper cache key that matches get method
        self.cache_manager.set('weather', data, ttl, city.lower(), endpoint=endpoint)
        logger.debug(f"Cached weather data for {city} ({endpoint})")
    
    def get_cached_weather_data(self, city: str, endpoint: str = 'current') -> Optional[Dict[str, Any]]:
        """
        Get cached weather data.
        
        Args:
            city (str): City name
            endpoint (str): Weather endpoint type
            
        Returns:
            Optional[Dict[str, Any]]: Cached weather data
        """
        # Use same prefix and kwargs as set method to generate matching cache key
        return self.cache_manager.get('weather', city.lower(), endpoint=endpoint)
    
    def get_api_usage_stats(self) -> Dict[str, Any]:
        """
        Get API usage statistics.
        
        Returns:
            Dict[str, Any]: Usage statistics
        """
        return {
            'total_requests': sum(self.usage_stats.values()),
            'unique_clients': len(self.usage_stats),
            'requests_per_client': dict(self.usage_stats),
            'rate_limit_stats': {
                'active_identifiers': len(self.rate_limiter.requests),
                'config': {
                    'requests_per_minute': self.rate_limiter.config.requests_per_minute,
                    'requests_per_hour': self.rate_limiter.config.requests_per_hour,
                    'requests_per_day': self.rate_limiter.config.requests_per_day
                }
            },
            'cache_stats': self.cache_manager.get_cache_stats()
        }
    
    def reset_client_limits(self, identifier: str) -> None:
        """
        Reset rate limits for a specific client.
        
        Args:
            identifier (str): Client identifier
        """
        self.rate_limiter.reset_limits(identifier)
        self.usage_stats.pop(identifier, None)
        logger.info(f"Reset limits and stats for client: {identifier}")
    
    def clear_weather_cache(self, city: Optional[str] = None) -> int:
        """
        Clear weather cache.
        
        Args:
            city (Optional[str]): Specific city to clear, or None for all
            
        Returns:
            int: Number of entries cleared
        """
        if city:
            pattern = f"weather:*:{city.lower()}"
        else:
            pattern = "weather:*"
        
        return self.cache_manager.clear(pattern)

# Global instances for easy access
rate_limiter = RateLimiter()
cache_manager = CacheManager()
api_rate_limiter = APIRateLimiter()

# Convenience functions
def is_rate_limited(identifier: str, window_type: str = 'minute') -> tuple[bool, Dict[str, Any]]:
    """Check if identifier is rate limited."""
    return rate_limiter.is_allowed(identifier, window_type)

def cache_get(prefix: str, *args, **kwargs) -> Any:
    """Get value from cache."""
    return cache_manager.get(prefix, *args, **kwargs)

def cache_set(prefix: str, value: Any, ttl: Optional[int] = None, *args, **kwargs) -> bool:
    """Set value in cache."""
    return cache_manager.set(prefix, value, ttl, *args, **kwargs)

def cache_delete(prefix: str, *args, **kwargs) -> bool:
    """Delete value from cache."""
    return cache_manager.delete(prefix, *args, **kwargs)

def get_cached_weather(city: str, endpoint: str = 'current') -> Optional[Dict[str, Any]]:
    """Get cached weather data."""
    return api_rate_limiter.get_cached_weather_data(city, endpoint)

def cache_weather(city: str, data: Dict[str, Any], endpoint: str = 'current') -> None:
    """Cache weather data."""
    api_rate_limiter.cache_weather_data(city, data, endpoint)
680 lines•24.3 KB
python

About RSK World

Founded by Molla Samser, with Designer & Tester Rima Khatun, RSK World is your one-stop destination for free programming resources, source code, and development tools.

Founder: Molla Samser
Designer & Tester: Rima Khatun

Development

  • Game Development
  • Web Development
  • Mobile Development
  • AI Development
  • Development Tools

Legal

  • Terms & Conditions
  • Privacy Policy
  • Disclaimer

Contact Info

Nutanhat, Mongolkote
Purba Burdwan, West Bengal
India, 713147

+91 93305 39277

hello@rskworld.in
support@rskworld.in

© 2026 RSK World. All rights reserved.

Content used for educational purposes only. View Disclaimer