help@rskworld.in +91 93305 39277
RSK World
  • Home
  • Development
    • Web Development
    • Mobile Apps
    • Software
    • Games
    • Project
  • Technologies
    • Data Science
    • AI Development
    • Cloud Development
    • Blockchain
    • Cyber Security
    • Dev Tools
    • Testing Tools
  • Blog
  • About
  • Contact

Theme Settings

Color Scheme
Display Options
Font Size
100%
Back to Project
RSK World
weather-chatbot
/
utils
RSK World
weather-chatbot
Weather Chatbot - Python + Flask + OpenWeatherMap + OpenAI + Weather Forecast + Weather Alerts + Natural Language Processing
utils
  • __pycache__
  • __init__.py3.9 KB
  • advanced_nlp.py28.4 KB
  • analytics.py22 KB
  • auth.py17.8 KB
  • comparison.py24.2 KB
  • database.py24.6 KB
  • geolocation.py20.1 KB
  • multilang.py22 KB
  • notifications.py34.1 KB
  • rate_limiting.py24.3 KB
  • weather_maps.py29.6 KB
  • weather_utils.py12.9 KB
multilang.py__init__.pyauth.pyweather_api.cpython-313.pycgeolocation.py
utils/multilang.py
Raw Download
Find: Go to:
"""
Multi-language Support Module for Weather Chatbot
===============================================

This module provides comprehensive multi-language support for the Weather Chatbot,
including translation management, language detection, and localized responses.

Features:
- Support for multiple languages
- Automatic language detection
- Translation management
- Localized weather descriptions
- Language-specific response formatting

Author: RSK World
Website: https://rskworld.in
Email: hello@rskworld.in
Phone: +91 93305 39277
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026
"""

import json
import os
from typing import Dict, Optional, List, Any
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MultiLanguageManager:
    """
    Manages multi-language support for the Weather Chatbot.
    Handles translations, language detection, and localized responses.
    """
    
    def __init__(self, translations_dir: str = "translations"):
        """
        Initialize the MultiLanguageManager.
        
        Args:
            translations_dir (str): Directory containing translation files
        """
        self.translations_dir = translations_dir
        self.translations = {}
        self.supported_languages = {
            'en': 'English',
            'es': 'Español',
            'fr': 'Français',
            'de': 'Deutsch',
            'it': 'Italiano',
            'pt': 'Português',
            'ru': 'Русский',
            'zh': '中文',
            'ja': '日本語',
            'ar': 'العربية',
            'hi': 'हिन्दी',
            'bn': 'বাংলা'
        }
        self.default_language = 'en'
        self.current_language = 'en'
        
        # Load translations
        self._load_translations()
    
    def _load_translations(self) -> None:
        """Load all translation files from the translations directory."""
        try:
            # Create translations directory if it doesn't exist
            if not os.path.exists(self.translations_dir):
                os.makedirs(self.translations_dir)
                self._create_default_translations()
            
            # Load translation files
            for lang_code in self.supported_languages.keys():
                translation_file = os.path.join(self.translations_dir, f"{lang_code}.json")
                if os.path.exists(translation_file):
                    with open(translation_file, 'r', encoding='utf-8') as f:
                        self.translations[lang_code] = json.load(f)
                else:
                    # Create default translation file
                    self.translations[lang_code] = self._get_default_translations()
                    self._save_translation(lang_code, self.translations[lang_code])
            
            logger.info(f"Loaded translations for {len(self.translations)} languages")
            
        except Exception as e:
            logger.error(f"Error loading translations: {str(e)}")
            self.translations = {'en': self._get_default_translations()}
    
    def _create_default_translations(self) -> None:
        """Create default translation files for all supported languages."""
        default_translations = self._get_default_translations()
        
        for lang_code in self.supported_languages.keys():
            translation_file = os.path.join(self.translations_dir, f"{lang_code}.json")
            if not os.path.exists(translation_file):
                with open(translation_file, 'w', encoding='utf-8') as f:
                    json.dump(default_translations, f, indent=2, ensure_ascii=False)
    
    def _get_default_translations(self) -> Dict[str, str]:
        """Get default translations (English)."""
        return {
            # Common phrases
            "welcome": "Welcome to Weather Chatbot!",
            "goodbye": "Goodbye! Have a great day!",
            "thank_you": "Thank you for using Weather Chatbot!",
            "error": "An error occurred. Please try again.",
            "loading": "Loading weather data...",
            "no_data": "No weather data available.",
            
            # Weather descriptions
            "sunny": "Sunny",
            "cloudy": "Cloudy",
            "rainy": "Rainy",
            "snowy": "Snowy",
            "stormy": "Stormy",
            "foggy": "Foggy",
            "windy": "Windy",
            "partly_cloudy": "Partly Cloudy",
            "mostly_cloudy": "Mostly Cloudy",
            "clear": "Clear",
            
            # Temperature
            "temperature": "Temperature",
            "feels_like": "Feels like",
            "min_temp": "Minimum Temperature",
            "max_temp": "Maximum Temperature",
            "humidity": "Humidity",
            "pressure": "Pressure",
            "visibility": "Visibility",
            "uv_index": "UV Index",
            
            # Wind
            "wind_speed": "Wind Speed",
            "wind_direction": "Wind Direction",
            "wind_gust": "Wind Gust",
            
            # Weather conditions
            "conditions": "Conditions",
            "description": "Description",
            "precipitation": "Precipitation",
            "chance_of_rain": "Chance of Rain",
            "chance_of_snow": "Chance of Snow",
            
            # Time
            "today": "Today",
            "tomorrow": "Tomorrow",
            "yesterday": "Yesterday",
            "morning": "Morning",
            "afternoon": "Afternoon",
            "evening": "Evening",
            "night": "Night",
            "now": "Now",
            
            # Location
            "location": "Location",
            "city": "City",
            "country": "Country",
            "coordinates": "Coordinates",
            "timezone": "Timezone",
            
            # Messages
            "weather_in": "Weather in {city}",
            "current_weather": "Current Weather",
            "forecast": "Forecast",
            "hourly_forecast": "Hourly Forecast",
            "daily_forecast": "Daily Forecast",
            "weekly_forecast": "Weekly Forecast",
            
            # Questions
            "how_is_weather": "How is the weather in {city}?",
            "what_is_temperature": "What is the temperature in {city}?",
            "will_it_rain": "Will it rain in {city}?",
            "weather_forecast": "Weather forecast for {city}",
            
            # Responses
            "temp_is": "The temperature in {city} is {temp}°{unit}",
            "weather_is": "The weather in {city} is {condition}",
            "rain_expected": "Rain is expected in {city}",
            "no_rain_expected": "No rain expected in {city}",
            
            # Error messages
            "city_not_found": "City not found. Please check the spelling.",
            "api_error": "Weather API error. Please try again later.",
            "network_error": "Network error. Please check your connection.",
            "invalid_location": "Invalid location provided.",
            
            # UI elements
            "search": "Search",
            "search_city": "Search for a city...",
            "get_weather": "Get Weather",
            "refresh": "Refresh",
            "settings": "Settings",
            "language": "Language",
            "units": "Units",
            "celsius": "Celsius",
            "fahrenheit": "Fahrenheit",
            "kelvin": "Kelvin",
            
            # Notifications
            "weather_alert": "Weather Alert",
            "severe_weather": "Severe Weather Warning",
            "temperature_alert": "Temperature Alert",
            "precipitation_alert": "Precipitation Alert",
            
            # Analytics
            "weather_trends": "Weather Trends",
            "temperature_trend": "Temperature Trend",
            "precipitation_trend": "Precipitation Trend",
            "weather_comparison": "Weather Comparison",
            "city_comparison": "City Comparison",
            
            # Authentication
            "login": "Login",
            "logout": "Logout",
            "register": "Register",
            "username": "Username",
            "password": "Password",
            "email": "Email",
            "profile": "Profile",
            
            # General
            "ok": "OK",
            "cancel": "Cancel",
            "yes": "Yes",
            "no": "No",
            "save": "Save",
            "delete": "Delete",
            "edit": "Edit",
            "close": "Close",
            "back": "Back",
            "next": "Next",
            "previous": "Previous",
            "home": "Home",
            "about": "About",
            "help": "Help",
            "contact": "Contact"
        }
    
    def _save_translation(self, lang_code: str, translations: Dict[str, str]) -> None:
        """
        Save translations to file.
        
        Args:
            lang_code (str): Language code
            translations (Dict[str, str]): Translation dictionary
        """
        try:
            translation_file = os.path.join(self.translations_dir, f"{lang_code}.json")
            with open(translation_file, 'w', encoding='utf-8') as f:
                json.dump(translations, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error saving translation for {lang_code}: {str(e)}")
    
    def set_language(self, lang_code: str) -> bool:
        """
        Set the current language.
        
        Args:
            lang_code (str): Language code
            
        Returns:
            bool: True if language was set successfully
        """
        if lang_code in self.supported_languages:
            self.current_language = lang_code
            logger.info(f"Language set to: {self.supported_languages[lang_code]}")
            return True
        return False
    
    def get_language(self) -> str:
        """
        Get the current language code.
        
        Returns:
            str: Current language code
        """
        return self.current_language
    
    def get_supported_languages(self) -> Dict[str, str]:
        """
        Get supported languages.
        
        Returns:
            Dict[str, str]: Dictionary of language codes and names
        """
        return self.supported_languages.copy()
    
    def translate(self, key: str, lang_code: Optional[str] = None, **kwargs) -> str:
        """
        Translate a key to the specified language.
        
        Args:
            key (str): Translation key
            lang_code (Optional[str]): Language code (uses current if None)
            **kwargs: Variables for string formatting
            
        Returns:
            str: Translated text
        """
        if lang_code is None:
            lang_code = self.current_language
        
        # Fallback to default language if translation not found
        if lang_code not in self.translations:
            lang_code = self.default_language
        
        # Get translation
        translation = self.translations.get(lang_code, {}).get(key, key)
        
        # Format with variables if provided
        try:
            if kwargs:
                translation = translation.format(**kwargs)
        except (KeyError, ValueError) as e:
            logger.warning(f"Error formatting translation '{key}': {str(e)}")
        
        return translation
    
    def detect_language(self, text: str) -> str:
        """
        Detect the language of the given text.
        
        Args:
            text (str): Text to analyze
            
        Returns:
            str: Detected language code
        """
        # Simple language detection based on common words/phrases
        # In a real implementation, you might use a library like langdetect
        
        text_lower = text.lower()
        
        # Language indicators
        language_indicators = {
            'es': ['hola', 'buenos', 'días', 'gracias', 'por favor', 'clima', 'tiempo'],
            'fr': ['bonjour', 'merci', 's\'il', 'vous', 'plaît', 'météo', 'temps'],
            'de': ['hallo', 'danke', 'bitte', 'wetter', 'klima'],
            'it': ['ciao', 'grazie', 'per', 'favore', 'tempo', 'meteo'],
            'pt': ['olá', 'obrigado', 'por', 'favor', 'tempo', 'clima'],
            'ru': ['привет', 'спасибо', 'пожалуйста', 'погода'],
            'zh': ['你好', '谢谢', '天气', '气候'],
            'ja': ['こんにちは', 'ありがとう', '天気', '気候'],
            'ar': ['مرحبا', 'شكرا', 'الطقس', 'المناخ'],
            'hi': ['नमस्ते', 'धन्यवाद', 'मौसम', 'जलवायु'],
            'bn': ['হ্যালো', 'ধন্যবাদ', 'আবহাওয়া', 'জলবায়ু']
        }
        
        # Check for language indicators
        for lang_code, indicators in language_indicators.items():
            for indicator in indicators:
                if indicator in text_lower:
                    return lang_code
        
        # Default to English if no indicators found
        return 'en'
    
    def get_localized_weather_description(self, weather_code: str, lang_code: Optional[str] = None) -> str:
        """
        Get localized weather description based on weather code.
        
        Args:
            weather_code (str): Weather condition code
            lang_code (Optional[str]): Language code
            
        Returns:
            str: Localized weather description
        """
        if lang_code is None:
            lang_code = self.current_language
        
        # Weather code to description mapping
        weather_descriptions = {
            '01d': 'clear',
            '01n': 'clear',
            '02d': 'partly_cloudy',
            '02n': 'partly_cloudy',
            '03d': 'cloudy',
            '03n': 'cloudy',
            '04d': 'mostly_cloudy',
            '04n': 'mostly_cloudy',
            '09d': 'rainy',
            '09n': 'rainy',
            '10d': 'rainy',
            '10n': 'rainy',
            '11d': 'stormy',
            '11n': 'stormy',
            '13d': 'snowy',
            '13n': 'snowy',
            '50d': 'foggy',
            '50n': 'foggy'
        }
        
        description_key = weather_descriptions.get(weather_code, 'clear')
        return self.translate(description_key, lang_code)
    
    def format_temperature(self, temp: float, unit: str = 'C', lang_code: Optional[str] = None) -> str:
        """
        Format temperature according to language conventions.
        
        Args:
            temp (float): Temperature value
            unit (str): Temperature unit (C, F, K)
            lang_code (Optional[str]): Language code
            
        Returns:
            str: Formatted temperature string
        """
        if lang_code is None:
            lang_code = self.current_language
        
        # Different languages have different formatting conventions
        if lang_code in ['de', 'fr', 'es', 'it', 'pt']:
            # Use comma as decimal separator
            temp_str = f"{temp:.1f}".replace('.', ',')
        else:
            # Use dot as decimal separator
            temp_str = f"{temp:.1f}"
        
        return f"{temp_str}°{unit}"
    
    def format_datetime(self, dt: datetime, lang_code: Optional[str] = None) -> str:
        """
        Format datetime according to language conventions.
        
        Args:
            dt (datetime): Datetime object
            lang_code (Optional[str]): Language code
            
        Returns:
            str: Formatted datetime string
        """
        if lang_code is None:
            lang_code = self.current_language
        
        # Different date formats for different languages
        formats = {
            'en': '%Y-%m-%d %H:%M',
            'es': '%d/%m/%Y %H:%M',
            'fr': '%d/%m/%Y %H:%M',
            'de': '%d.%m.%Y %H:%M',
            'it': '%d/%m/%Y %H:%M',
            'pt': '%d/%m/%Y %H:%M',
            'ru': '%d.%m.%Y %H:%M',
            'zh': '%Y年%m月%d日 %H:%M',
            'ja': '%Y年%m月%d日 %H:%M',
            'ar': '%d/%m/%Y %H:%M',
            'hi': '%d/%m/%Y %H:%M',
            'bn': '%d/%m/%Y %H:%M'
        }
        
        date_format = formats.get(lang_code, formats['en'])
        return dt.strftime(date_format)
    
    def add_translation(self, lang_code: str, key: str, value: str) -> bool:
        """
        Add or update a translation.
        
        Args:
            lang_code (str): Language code
            key (str): Translation key
            value (str): Translation value
            
        Returns:
            bool: True if translation was added successfully
        """
        try:
            if lang_code not in self.translations:
                self.translations[lang_code] = {}
            
            self.translations[lang_code][key] = value
            self._save_translation(lang_code, self.translations[lang_code])
            logger.info(f"Added translation for {lang_code}: {key} = {value}")
            return True
        except Exception as e:
            logger.error(f"Error adding translation: {str(e)}")
            return False
    
    def remove_translation(self, lang_code: str, key: str) -> bool:
        """
        Remove a translation.
        
        Args:
            lang_code (str): Language code
            key (str): Translation key
            
        Returns:
            bool: True if translation was removed successfully
        """
        try:
            if lang_code in self.translations and key in self.translations[lang_code]:
                del self.translations[lang_code][key]
                self._save_translation(lang_code, self.translations[lang_code])
                logger.info(f"Removed translation for {lang_code}: {key}")
                return True
            return False
        except Exception as e:
            logger.error(f"Error removing translation: {str(e)}")
            return False
    
    def get_translation_stats(self) -> Dict[str, Any]:
        """
        Get statistics about translations.
        
        Returns:
            Dict[str, Any]: Translation statistics
        """
        stats = {
            'total_languages': len(self.translations),
            'supported_languages': len(self.supported_languages),
            'current_language': self.current_language,
            'default_language': self.default_language,
            'translations_per_language': {}
        }
        
        for lang_code, translations in self.translations.items():
            stats['translations_per_language'][lang_code] = len(translations)
        
        return stats
    
    def export_translations(self, lang_code: str, file_path: str) -> bool:
        """
        Export translations to a file.
        
        Args:
            lang_code (str): Language code
            file_path (str): Output file path
            
        Returns:
            bool: True if export was successful
        """
        try:
            if lang_code in self.translations:
                with open(file_path, 'w', encoding='utf-8') as f:
                    json.dump(self.translations[lang_code], f, indent=2, ensure_ascii=False)
                logger.info(f"Exported translations for {lang_code} to {file_path}")
                return True
            return False
        except Exception as e:
            logger.error(f"Error exporting translations: {str(e)}")
            return False
    
    def import_translations(self, lang_code: str, file_path: str) -> bool:
        """
        Import translations from a file.
        
        Args:
            lang_code (str): Language code
            file_path (str): Input file path
            
        Returns:
            bool: True if import was successful
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                translations = json.load(f)
            
            self.translations[lang_code] = translations
            self._save_translation(lang_code, translations)
            logger.info(f"Imported translations for {lang_code} from {file_path}")
            return True
        except Exception as e:
            logger.error(f"Error importing translations: {str(e)}")
            return False


# Global instance for easy access
multilang_manager = MultiLanguageManager()

# Convenience functions
def translate(key: str, lang_code: Optional[str] = None, **kwargs) -> str:
    """Convenience function for translation."""
    return multilang_manager.translate(key, lang_code, **kwargs)

def set_language(lang_code: str) -> bool:
    """Convenience function for setting language."""
    return multilang_manager.set_language(lang_code)

def get_language() -> str:
    """Convenience function for getting current language."""
    return multilang_manager.get_language()

def detect_language(text: str) -> str:
    """Convenience function for language detection."""
    return multilang_manager.detect_language(text)

def get_localized_weather_description(weather_code: str, lang_code: Optional[str] = None) -> str:
    """Convenience function for localized weather descriptions."""
    return multilang_manager.get_localized_weather_description(weather_code, lang_code)

def format_temperature(temp: float, unit: str = 'C', lang_code: Optional[str] = None) -> str:
    """Convenience function for temperature formatting."""
    return multilang_manager.format_temperature(temp, unit, lang_code)

def format_datetime(dt: datetime, lang_code: Optional[str] = None) -> str:
    """Convenience function for datetime formatting."""
    return multilang_manager.format_datetime(dt, lang_code)
602 lines•22 KB
python
utils/__init__.py
Raw Download
Find: Go to:
#!/usr/bin/env python3
"""
Weather Chatbot Utils Package for Weather Chatbot
==============================================

This package contains utility modules for the Weather Chatbot application.

Modules:
- weather_utils: Basic weather data processing and formatting
- analytics: Advanced weather analytics and insights
- database: Database operations and management
- auth: User authentication and session management
- geolocation: Location detection and geolocation services
- comparison: Weather comparison between cities
- notifications: Weather notifications and alerts
- advanced_nlp: Advanced natural language processing
- weather_maps: Weather maps and radar integration
- multilang: Multi-language support
- rate_limiting: API rate limiting and caching

Author: RSK World
Website: https://rskworld.in
Email: hello@rskworld.in
Phone: +91 93305 39277
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026
"""

# Import all utility modules
from .weather_utils import *
from .analytics import *
from .database import *
from .auth import *
from .geolocation import *
from .comparison import *
from .notifications import *
from .advanced_nlp import *
from .weather_maps import *
from .multilang import *
from .rate_limiting import *

# Define what gets imported with "from utils import *"
__all__ = [
    # Weather utilities
    'format_temperature', 'format_wind_speed', 'get_weather_emoji',
    'get_weather_description', 'calculate_heat_index', 'calculate_wind_chill',
    'get_uv_index_level', 'get_air_quality_level', 'format_pressure',
    'format_visibility', 'format_humidity', 'get_comfort_level',
    
    # Analytics
    'WeatherAnalytics', 'calculate_weather_trends', 'generate_weather_insights',
    'calculate_comfort_index', 'compare_cities', 'get_weather_summary',
    
    # Database
    'WeatherDatabase', 'DatabaseManager', 'UserManager', 'WeatherHistory', 'QueryLogger',
    'FavoriteCities', 'APIUsageTracker', 'WeatherAlerts',
    
    # Authentication
    'AuthManager', 'generate_jwt_token', 'verify_jwt_token', 'hash_password',
    'verify_password', 'generate_session_id', 'validate_email',
    
    # Geolocation
    'GeolocationService', 'get_location_by_ip', 'get_location_by_coordinates',
    'reverse_geocode', 'calculate_distance', 'get_timezone',
    
    # Comparison
    'WeatherComparison', 'compare_current_weather', 'compare_forecasts',
    'rank_cities_by_temperature', 'get_weather_comparison_summary',
    
    # Notifications
    'NotificationManager', 'EmailNotifier', 'SMSNotifier', 'WebhookNotifier',
    'PushNotifier', 'InAppNotifier', 'WeatherAlertManager',
    
    # Advanced NLP
    'AdvancedNLP', 'QueryAnalyzer', 'IntentDetector', 'EntityExtractor',
    'SentimentAnalyzer', 'ResponseGenerator',
    
    # Weather Maps
    'WeatherMapService', 'generate_weather_map', 'get_radar_data',
    'create_custom_map', 'get_map_layers',
    
    # Multi-language
    'MultiLanguageManager', 'translate', 'set_language', 'get_language',
    'detect_language', 'get_localized_weather_description',
    'format_temperature', 'format_datetime',
    
    # Rate Limiting
    'RateLimiter', 'CacheManager', 'APIRateLimiter', 'is_rate_limited',
    'cache_get', 'cache_set', 'cache_delete', 'get_cached_weather', 'cache_weather',
    'format_wind_speed',
    'format_pressure',
    'format_visibility',
    'get_weather_icon_url',
    'get_weather_description',
    'calculate_heat_index',
    'calculate_wind_chill',
    'get_uv_index_level',
    'get_air_quality_description',
    'format_timestamp',
    'get_daily_forecast_summary',
    'validate_city_name',
    'extract_city_from_query',
    'get_weather_emoji',
    'create_weather_summary'
]

__version__ = '1.0.0'
__author__ = 'RSK World'
__email__ = 'hello@rskworld.in'
__website__ = 'https://rskworld.in'
111 lines•3.9 KB
python
utils/auth.py
Raw Download
Find: Go to:
#!/usr/bin/env python3
"""
Weather Chatbot Authentication Module
====================================

Author: RSK World (https://rskworld.in)
Founded by: Molla Samser
Designer & Tester: Rima Khatun
Contact: +91 93305 39277, hello@rskworld.in, support@rskworld.in
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026

Description: User authentication and session management
"""

import jwt
import secrets
from datetime import datetime, timedelta
from functools import wraps
from flask import request, jsonify, session, g
from werkzeug.security import generate_password_hash, check_password_hash
import re
from typing import Dict, Optional, Callable

class AuthManager:
    """Authentication and authorization manager"""
    
    def __init__(self, app=None, secret_key: str = None, db_manager=None):
        self.app = app
        self.secret_key = secret_key or secrets.token_urlsafe(32)
        self.db_manager = db_manager
        self.token_expiry = timedelta(hours=24)
        
        if app:
            self.init_app(app)
    
    def init_app(self, app):
        """Initialize with Flask app"""
        app.config.setdefault('JWT_SECRET_KEY', self.secret_key)
        app.config.setdefault('JWT_ACCESS_TOKEN_EXPIRES', self.token_expiry)
        
        @app.before_request
        def load_user():
            """Load user from session/token before each request"""
            user_id = session.get('user_id')
            if user_id:
                g.current_user = self.get_user_by_id(user_id)
            else:
                g.current_user = None
    
    def generate_token(self, user_data: Dict) -> str:
        """Generate JWT access token"""
        payload = {
            'user_id': user_data['user_id'],
            'username': user_data['username'],
            'email': user_data['email'],
            'exp': datetime.utcnow() + self.token_expiry,
            'iat': datetime.utcnow()
        }
        
        return jwt.encode(payload, self.secret_key, algorithm='HS256')
    
    def verify_token(self, token: str) -> Optional[Dict]:
        """Verify JWT token and return payload"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def authenticate_user(self, username: str, password: str) -> Dict:
        """Authenticate user credentials"""
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}
        
        # Validate input
        if not username or not password:
            return {'success': False, 'error': 'Username and password required'}
        
        # Authenticate against database
        auth_result = self.db_manager.authenticate_user(username, password)
        
        if auth_result['success']:
            # Generate token
            token = self.generate_token(auth_result)
            auth_result['token'] = token
            
            # Create session
            session['user_id'] = auth_result['user_id']
            session['username'] = auth_result['username']
            session.permanent = True
        
        return auth_result
    
    def register_user(self, username: str, email: str, password: str) -> Dict:
        """Register new user account"""
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}
        
        # Validate inputs
        validation_errors = self.validate_registration_data(username, email, password)
        if validation_errors:
            return {'success': False, 'error': validation_errors}
        
        # Create user in database
        create_result = self.db_manager.create_user(username, email, password)
        
        return create_result
    
    def validate_registration_data(self, username: str, email: str, password: str) -> str:
        """Validate user registration data"""
        errors = []
        
        # Username validation
        if not username:
            errors.append('Username is required')
        elif len(username) < 3:
            errors.append('Username must be at least 3 characters long')
        elif len(username) > 30:
            errors.append('Username must be less than 30 characters')
        elif not re.match(r'^[a-zA-Z0-9_]+$', username):
            errors.append('Username can only contain letters, numbers, and underscores')
        
        # Email validation
        if not email:
            errors.append('Email is required')
        elif not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
            errors.append('Invalid email format')
        
        # Password validation
        if not password:
            errors.append('Password is required')
        elif len(password) < 8:
            errors.append('Password must be at least 8 characters long')
        elif len(password) > 128:
            errors.append('Password must be less than 128 characters')
        elif not re.search(r'[A-Z]', password):
            errors.append('Password must contain at least one uppercase letter')
        elif not re.search(r'[a-z]', password):
            errors.append('Password must contain at least one lowercase letter')
        elif not re.search(r'\d', password):
            errors.append('Password must contain at least one digit')
        elif not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            errors.append('Password must contain at least one special character')
        
        return '; '.join(errors) if errors else ''
    
    def logout_user(self) -> Dict:
        """Logout user and clear session"""
        session.clear()
        g.current_user = None
        return {'success': True, 'message': 'Logged out successfully'}
    
    def get_current_user(self) -> Optional[Dict]:
        """Get currently authenticated user"""
        return getattr(g, 'current_user', None)
    
    def get_user_by_id(self, user_id: int) -> Optional[Dict]:
        """Get user by ID from database"""
        if not self.db_manager:
            return None
        
        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT id, username, email, api_key, created_at, last_login, is_active
                    FROM users WHERE id = ?
                ''', (user_id,))
                
                user_data = cursor.fetchone()
                if user_data:
                    columns = [desc[0] for desc in cursor.description]
                    return dict(zip(columns, user_data))
                return None
        except:
            return None
    
    def change_password(self, user_id: int, old_password: str, new_password: str) -> Dict:
        """Change user password"""
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}
        
        # Validate new password
        validation_error = self.validate_password(new_password)
        if validation_error:
            return {'success': False, 'error': validation_error}
        
        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()
                
                # Get current password hash
                cursor.execute('SELECT password_hash FROM users WHERE id = ?', (user_id,))
                result = cursor.fetchone()
                
                if not result:
                    return {'success': False, 'error': 'User not found'}
                
                current_hash = result[0]
                
                # Verify old password
                if not check_password_hash(current_hash, old_password):
                    return {'success': False, 'error': 'Current password is incorrect'}
                
                # Update password
                new_hash = generate_password_hash(new_password)
                cursor.execute('''
                    UPDATE users SET password_hash = ? WHERE id = ?
                ''', (new_hash, user_id))
                
                conn.commit()
                
                return {'success': True, 'message': 'Password changed successfully'}
                
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def validate_password(self, password: str) -> str:
        """Validate password strength"""
        if len(password) < 8:
            return 'Password must be at least 8 characters long'
        elif len(password) > 128:
            return 'Password must be less than 128 characters'
        elif not re.search(r'[A-Z]', password):
            return 'Password must contain at least one uppercase letter'
        elif not re.search(r'[a-z]', password):
            return 'Password must contain at least one lowercase letter'
        elif not re.search(r'\d', password):
            return 'Password must contain at least one digit'
        elif not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            return 'Password must contain at least one special character'
        
        return ''
    
    def reset_password_request(self, email: str) -> Dict:
        """Request password reset"""
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}
        
        # Generate reset token
        reset_token = secrets.token_urlsafe(32)
        expiry = datetime.utcnow() + timedelta(hours=1)
        
        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()
                
                # Check if email exists
                cursor.execute('SELECT id, username FROM users WHERE email = ?', (email,))
                user = cursor.fetchone()
                
                if not user:
                    return {'success': False, 'error': 'Email not found'}
                
                user_id, username = user
                
                # Store reset token (you might want a separate table for this)
                cursor.execute('''
                    UPDATE users SET 
                    api_key = ? || '_reset_' || ?,
                    last_login = ?
                    WHERE id = ?
                ''', (reset_token, expiry.isoformat(), datetime.utcnow(), user_id))
                
                conn.commit()
                
                # In a real application, you would send an email here
                return {
                    'success': True,
                    'message': 'Password reset token generated',
                    'reset_token': reset_token,  # Only for development
                    'expires_at': expiry.isoformat()
                }
                
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def reset_password(self, token: str, new_password: str) -> Dict:
        """Reset password with token"""
        if not self.db_manager:
            return {'success': False, 'error': 'Database not configured'}
        
        # Validate new password
        validation_error = self.validate_password(new_password)
        if validation_error:
            return {'success': False, 'error': validation_error}
        
        try:
            with self.db_manager.get_connection() as conn:
                cursor = conn.cursor()
                
                # Find user with reset token
                cursor.execute('''
                    SELECT id, api_key FROM users 
                    WHERE api_key LIKE ? || '_reset_%'
                ''', (token,))
                
                result = cursor.fetchone()
                
                if not result:
                    return {'success': False, 'error': 'Invalid or expired reset token'}
                
                user_id = result[0]
                
                # Update password
                new_hash = generate_password_hash(new_password)
                cursor.execute('''
                    UPDATE users SET password_hash = ?, api_key = ? WHERE id = ?
                ''', (new_hash, secrets.token_urlsafe(32), user_id))
                
                conn.commit()
                
                return {'success': True, 'message': 'Password reset successful'}
                
        except Exception as e:
            return {'success': False, 'error': str(e)}

# Decorators for authentication
def require_auth(f: Callable) -> Callable:
    """Decorator to require authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not hasattr(g, 'current_user') or not g.current_user:
            return jsonify({'error': 'Authentication required'}), 401
        return f(*args, **kwargs)
    return decorated_function

def require_api_key(f: Callable) -> Callable:
    """Decorator to require API key"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key')
        if not api_key:
            api_key = request.args.get('api_key')
        
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        
        # Validate API key against database
        # This would need access to the database manager
        # For now, just check if it's provided
        return f(*args, **kwargs)
    return decorated_function

def rate_limit(limit: int = 100, per: int = 3600):
    """Rate limiting decorator"""
    def decorator(f: Callable) -> Callable:
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Implement rate limiting logic here
            # This would typically use Redis or a database
            return f(*args, **kwargs)
        return decorated_function
    return decorator

class SessionManager:
    """Session management utilities"""
    
    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)
    
    def init_app(self, app):
        """Initialize session configuration"""
        app.config.setdefault('SESSION_PERMANENT', True)
        app.config.setdefault('SESSION_USE_SIGNER', True)
        app.config.setdefault('SESSION_KEY_PREFIX', 'weather_chatbot:')
        app.config.setdefault('PERMANENT_SESSION_LIFETIME', timedelta(days=7))
    
    def create_session(self, user_data: Dict):
        """Create user session"""
        session.clear()
        session['user_id'] = user_data['user_id']
        session['username'] = user_data['username']
        session['email'] = user_data['email']
        session['login_time'] = datetime.utcnow().isoformat()
        session.permanent = True
    
    def update_session_activity(self):
        """Update last activity timestamp"""
        session['last_activity'] = datetime.utcnow().isoformat()
    
    def is_session_valid(self) -> bool:
        """Check if session is valid"""
        if not session.get('user_id'):
            return False
        
        # Check session age
        login_time = session.get('login_time')
        if login_time:
            try:
                login_dt = datetime.fromisoformat(login_time)
                if datetime.utcnow() - login_dt > timedelta(days=7):
                    return False
            except:
                return False
        
        return True
    
    def get_session_info(self) -> Dict:
        """Get session information"""
        return {
            'user_id': session.get('user_id'),
            'username': session.get('username'),
            'email': session.get('email'),
            'login_time': session.get('login_time'),
            'last_activity': session.get('last_activity'),
            'is_valid': self.is_session_valid()
        }
    
    def destroy_session(self):
        """Destroy current session"""
        session.clear()

# Security utilities
class SecurityUtils:
    """Security utility functions"""
    
    @staticmethod
    def generate_csrf_token() -> str:
        """Generate CSRF token"""
        return secrets.token_urlsafe(32)
    
    @staticmethod
    def validate_csrf_token(token: str, session_token: str) -> bool:
        """Validate CSRF token"""
        return secrets.compare_digest(token, session_token)
    
    @staticmethod
    def sanitize_input(input_string: str) -> str:
        """Sanitize user input"""
        if not input_string:
            return ''
        
        # Remove potentially dangerous characters
        dangerous_chars = ['<', '>', '"', "'", '&', '\x00', '\n', '\r', '\t']
        sanitized = input_string
        
        for char in dangerous_chars:
            sanitized = sanitized.replace(char, '')
        
        return sanitized.strip()
    
    @staticmethod
    def is_safe_url(url: str) -> bool:
        """Check if URL is safe for redirects"""
        if not url:
            return False
        
        # Allow relative URLs
        if url.startswith('/') and not url.startswith('//'):
            return True
        
        # Allow same origin URLs
        from flask import request
        host_url = request.host_url.rstrip('/')
        if url.startswith(host_url):
            return True
        
        return False
    
    @staticmethod
    def hash_data(data: str) -> str:
        """Hash data using SHA-256"""
        import hashlib
        return hashlib.sha256(data.encode()).hexdigest()
    
    @staticmethod
    def verify_data_hash(data: str, hash_value: str) -> bool:
        """Verify data against hash"""
        return SecurityUtils.hash_data(data) == hash_value
476 lines•17.8 KB
python
weather_api.cpython-313.pyc

This file cannot be displayed in the browser.

Download File
utils/geolocation.py
Raw Download
Find: Go to:
#!/usr/bin/env python3
"""
Weather Chatbot Geolocation Module
===================================

Author: RSK World (https://rskworld.in)
Founded by: Molla Samser
Designer & Tester: Rima Khatun
Contact: +91 93305 39277, hello@rskworld.in, support@rskworld.in
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026

Description: Location detection and geolocation services
"""

import requests
import json
from typing import Dict, Optional, Tuple
from urllib.parse import quote

class GeolocationService:
    """Geolocation service for location-based weather services"""
    
    def __init__(self, ipinfo_api_key: str = None, openweather_api_key: str = None):
        self.ipinfo_api_key = ipinfo_api_key
        self.openweather_api_key = openweather_api_key
        self.ipinfo_url = "https://ipinfo.io"
        self.openweather_geo_url = "http://api.openweathermap.org/geo/1.0"
    
    def get_location_by_ip(self, ip_address: str = None) -> Dict:
        """
        Get location information by IP address.
        
        Args:
            ip_address: IP address (if None, uses current IP)
            
        Returns:
            Location information dictionary
        """
        try:
            url = f"{self.ipinfo_url}/{ip_address}" if ip_address else self.ipinfo_url
            params = {}
            
            if self.ipinfo_api_key:
                params['token'] = self.ipinfo_api_key
            
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            location_info = {
                'ip': data.get('ip'),
                'city': data.get('city'),
                'region': data.get('region'),
                'country': data.get('country'),
                'location': data.get('loc'),  # lat,lon
                'postal': data.get('postal'),
                'timezone': data.get('timezone'),
                'org': data.get('org'),  # ISP/organization
                'source': 'ipinfo'
            }
            
            # Parse coordinates
            if data.get('loc'):
                try:
                    lat, lon = data['loc'].split(',')
                    location_info['latitude'] = float(lat)
                    location_info['longitude'] = float(lon)
                except:
                    location_info['latitude'] = None
                    location_info['longitude'] = None
            
            return location_info
            
        except requests.exceptions.RequestException as e:
            return {'error': f'Failed to get location by IP: {str(e)}'}
        except Exception as e:
            return {'error': f'Error processing location data: {str(e)}'}
    
    def get_coordinates_by_city(self, city: str, country: str = None, state: str = None) -> Dict:
        """
        Get coordinates for a city using OpenWeatherMap Geocoding API.
        
        Args:
            city: City name
            country: Country code (optional)
            state: State code (optional, for US cities)
            
        Returns:
            Coordinates and location information
        """
        try:
            # Build query string
            query_parts = [city]
            if state:
                query_parts.append(state)
            if country:
                query_parts.append(country)
            
            query = ','.join(query_parts)
            
            url = f"{self.openweather_geo_url}/direct"
            params = {
                'q': query,
                'limit': 5,  # Get multiple results for disambiguation
                'appid': self.openweather_api_key
            }
            
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            if not data:
                return {'error': 'City not found'}
            
            # Return the first (most relevant) result
            result = data[0]
            
            return {
                'city': result.get('name'),
                'country': result.get('country'),
                'state': result.get('state'),
                'latitude': result.get('lat'),
                'longitude': result.get('lon'),
                'source': 'openweathermap',
                'alternatives': data[1:] if len(data) > 1 else []
            }
            
        except requests.exceptions.RequestException as e:
            return {'error': f'Failed to get coordinates: {str(e)}'}
        except Exception as e:
            return {'error': f'Error processing coordinates: {str(e)}'}
    
    def get_city_by_coordinates(self, lat: float, lon: float, limit: int = 1) -> Dict:
        """
        Get city information by coordinates using reverse geocoding.
        
        Args:
            lat: Latitude
            lon: Longitude
            limit: Number of results to return
            
        Returns:
            City information
        """
        try:
            url = f"{self.openweather_geo_url}/reverse"
            params = {
                'lat': lat,
                'lon': lon,
                'limit': limit,
                'appid': self.openweather_api_key
            }
            
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            if not data:
                return {'error': 'No location found for coordinates'}
            
            # Return the first result
            result = data[0]
            
            return {
                'city': result.get('name'),
                'country': result.get('country'),
                'state': result.get('state'),
                'latitude': result.get('lat'),
                'longitude': result.get('lon'),
                'source': 'openweathermap_reverse',
                'alternatives': data[1:] if len(data) > 1 else []
            }
            
        except requests.exceptions.RequestException as e:
            return {'error': f'Failed to get city by coordinates: {str(e)}'}
        except Exception as e:
            return {'error': f'Error processing reverse geocoding: {str(e)}'}
    
    def search_locations(self, query: str, limit: int = 5) -> Dict:
        """
        Search for locations by name.
        
        Args:
            query: Search query
            limit: Maximum number of results
            
        Returns:
            List of matching locations
        """
        try:
            url = f"{self.openweather_geo_url}/direct"
            params = {
                'q': query,
                'limit': limit,
                'appid': self.openweather_api_key
            }
            
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            if not data:
                return {'results': [], 'message': 'No locations found'}
            
            results = []
            for result in data:
                location = {
                    'city': result.get('name'),
                    'country': result.get('country'),
                    'state': result.get('state'),
                    'latitude': result.get('lat'),
                    'longitude': result.get('lon'),
                    'display_name': self._format_location_name(result)
                }
                results.append(location)
            
            return {
                'results': results,
                'count': len(results),
                'source': 'openweathermap'
            }
            
        except requests.exceptions.RequestException as e:
            return {'error': f'Failed to search locations: {str(e)}'}
        except Exception as e:
            return {'error': f'Error processing search: {str(e)}'}
    
    def _format_location_name(self, location_data: Dict) -> str:
        """Format location name for display"""
        parts = [location_data.get('name', '')]
        
        if location_data.get('state'):
            parts.append(location_data['state'])
        
        if location_data.get('country'):
            parts.append(location_data['country'])
        
        return ', '.join(parts)
    
    def calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """
        Calculate distance between two coordinates using Haversine formula.
        
        Args:
            lat1, lon1: First point coordinates
            lat2, lon2: Second point coordinates
            
        Returns:
            Distance in kilometers
        """
        import math
        
        # Convert to radians
        lat1_rad = math.radians(lat1)
        lon1_rad = math.radians(lon1)
        lat2_rad = math.radians(lat2)
        lon2_rad = math.radians(lon2)
        
        # Haversine formula
        dlat = lat2_rad - lat1_rad
        dlon = lon2_rad - lon1_rad
        
        a = (math.sin(dlat/2)**2 + 
             math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2)
        c = 2 * math.asin(math.sqrt(a))
        
        # Earth's radius in kilometers
        r = 6371
        
        return c * r
    
    def find_nearby_cities(self, lat: float, lon: float, radius_km: float = 50, limit: int = 10) -> Dict:
        """
        Find cities near a given location.
        
        Args:
            lat, lon: Center coordinates
            radius_km: Search radius in kilometers
            limit: Maximum number of results
            
        Returns:
            List of nearby cities with distances
        """
        try:
            # Get current city first
            current_city = self.get_city_by_coordinates(lat, lon, limit=1)
            
            if 'error' in current_city:
                return current_city
            
            # Search for nearby locations (this is a simplified approach)
            # In practice, you might want to use a dedicated geospatial database
            search_query = f"{current_city['city']}"
            
            search_results = self.search_locations(search_query, limit=limit * 2)
            
            if 'error' in search_results:
                return search_results
            
            # Calculate distances and filter by radius
            nearby_cities = []
            
            for location in search_results['results']:
                # Skip the exact same location
                if (abs(location['latitude'] - lat) < 0.01 and 
                    abs(location['longitude'] - lon) < 0.01):
                    continue
                
                distance = self.calculate_distance(
                    lat, lon, 
                    location['latitude'], location['longitude']
                )
                
                if distance <= radius_km:
                    location['distance_km'] = round(distance, 1)
                    nearby_cities.append(location)
            
            # Sort by distance and limit results
            nearby_cities.sort(key=lambda x: x['distance_km'])
            nearby_cities = nearby_cities[:limit]
            
            return {
                'center_location': current_city,
                'nearby_cities': nearby_cities,
                'radius_km': radius_km,
                'count': len(nearby_cities)
            }
            
        except Exception as e:
            return {'error': f'Error finding nearby cities: {str(e)}'}
    
    def get_timezone_info(self, lat: float, lon: float) -> Dict:
        """
        Get timezone information for coordinates.
        
        Args:
            lat, lon: Coordinates
            
        Returns:
            Timezone information
        """
        try:
            if not self.openweather_api_key:
                return {'error': 'OpenWeatherMap API key required'}
            
            url = f"http://api.openweathermap.org/data/2.5/weather"
            params = {
                'lat': lat,
                'lon': lon,
                'appid': self.openweather_api_key
            }
            
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            timezone_offset = data.get('timezone', 0)  # Offset in seconds
            
            # Convert to hours
            timezone_hours = timezone_offset / 3600
            
            # Determine timezone name (simplified)
            timezone_name = self._get_timezone_name(timezone_hours)
            
            return {
                'timezone_offset_seconds': timezone_offset,
                'timezone_offset_hours': timezone_hours,
                'timezone_name': timezone_name,
                'utc_offset': f"{'+' if timezone_hours >= 0 else ''}{timezone_hours}:00",
                'source': 'openweathermap'
            }
            
        except requests.exceptions.RequestException as e:
            return {'error': f'Failed to get timezone info: {str(e)}'}
        except Exception as e:
            return {'error': f'Error processing timezone: {str(e)}'}
    
    def _get_timezone_name(self, offset_hours: float) -> str:
        """Get timezone name from UTC offset (simplified)"""
        timezone_map = {
            -12: 'UTC-12 (Baker Island Time)',
            -11: 'UTC-11 (Niue Time)',
            -10: 'UTC-10 (Hawaii-Aleutian Standard Time)',
            -9: 'UTC-9 (Alaska Standard Time)',
            -8: 'UTC-8 (Pacific Standard Time)',
            -7: 'UTC-7 (Mountain Standard Time)',
            -6: 'UTC-6 (Central Standard Time)',
            -5: 'UTC-5 (Eastern Standard Time)',
            -4: 'UTC-4 (Atlantic Standard Time)',
            -3: 'UTC-3 (Brazil Time)',
            -2: 'UTC-2 (Fernando de Noronha Time)',
            -1: 'UTC-1 (Azores Time)',
            0: 'UTC (Greenwich Mean Time)',
            1: 'UTC+1 (Central European Time)',
            2: 'UTC+2 (Eastern European Time)',
            3: 'UTC+3 (Moscow Time)',
            4: 'UTC+4 (Gulf Standard Time)',
            5: 'UTC+5 (Pakistan Standard Time)',
            5.5: 'UTC+5:30 (India Standard Time)',
            6: 'UTC+6 (Bangladesh Standard Time)',
            7: 'UTC+7 (Indochina Time)',
            8: 'UTC+8 (China Standard Time)',
            9: 'UTC+9 (Japan Standard Time)',
            10: 'UTC+10 (Australian Eastern Time)',
            11: 'UTC+11 (Solomon Islands Time)',
            12: 'UTC+12 (New Zealand Time)',
        }
        
        # Find closest match
        closest_offset = min(timezone_map.keys(), key=lambda x: abs(x - offset_hours))
        
        if abs(closest_offset - offset_hours) < 0.5:
            return timezone_map[closest_offset]
        else:
            return f"UTC{offset_hours:+.1f}"
    
    def validate_coordinates(self, lat: float, lon: float) -> Dict:
        """
        Validate latitude and longitude coordinates.
        
        Args:
            lat: Latitude
            lon: Longitude
            
        Returns:
            Validation result
        """
        try:
            # Check ranges
            if not (-90 <= lat <= 90):
                return {'valid': False, 'error': 'Latitude must be between -90 and 90'}
            
            if not (-180 <= lon <= 180):
                return {'valid': False, 'error': 'Longitude must be between -180 and 180'}
            
            # Check data types
            if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
                return {'valid': False, 'error': 'Coordinates must be numeric'}
            
            # Check for NaN or infinity
            import math
            if math.isnan(lat) or math.isnan(lon) or math.isinf(lat) or math.isinf(lon):
                return {'valid': False, 'error': 'Invalid coordinate values'}
            
            return {
                'valid': True,
                'latitude': lat,
                'longitude': lon,
                'formatted': f"{lat:.6f}, {lon:.6f}"
            }
            
        except Exception as e:
            return {'valid': False, 'error': f'Error validating coordinates: {str(e)}'}
    
    def get_location_summary(self, ip_address: str = None) -> Dict:
        """
        Get comprehensive location summary.
        
        Args:
            ip_address: IP address (optional)
            
        Returns:
            Complete location information
        """
        try:
            # Get IP-based location
            ip_location = self.get_location_by_ip(ip_address)
            
            if 'error' in ip_location:
                return ip_location
            
            summary = {
                'ip_info': ip_location,
                'coordinates': {
                    'latitude': ip_location.get('latitude'),
                    'longitude': ip_location.get('longitude')
                },
                'address': {
                    'city': ip_location.get('city'),
                    'region': ip_location.get('region'),
                    'country': ip_location.get('country'),
                    'postal': ip_location.get('postal')
                },
                'network': {
                    'ip': ip_location.get('ip'),
                    'isp': ip_location.get('org'),
                    'timezone': ip_location.get('timezone')
                }
            }
            
            # Add timezone info if coordinates are available
            if ip_location.get('latitude') and ip_location.get('longitude'):
                timezone_info = self.get_timezone_info(
                    ip_location['latitude'], 
                    ip_location['longitude']
                )
                summary['timezone'] = timezone_info
            
            # Find nearby cities if coordinates are available
            if ip_location.get('latitude') and ip_location.get('longitude'):
                nearby = self.find_nearby_cities(
                    ip_location['latitude'], 
                    ip_location['longitude'], 
                    radius_km=25, 
                    limit=5
                )
                summary['nearby_cities'] = nearby
            
            return summary
            
        except Exception as e:
            return {'error': f'Error generating location summary: {str(e)}'}

# Utility functions for location data
def format_coordinates(lat: float, lon: float, format_type: str = 'decimal') -> str:
    """Format coordinates in different formats"""
    if format_type == 'decimal':
        return f"{lat:.6f}, {lon:.6f}"
    elif format_type == 'dms':
        return f"{_decimal_to_dms(lat, 'latitude')}, {_decimal_to_dms(lon, 'longitude')}"
    else:
        return f"{lat:.6f}, {lon:.6f}"

def _decimal_to_dms(decimal: float, coordinate_type: str) -> str:
    """Convert decimal coordinates to DMS format"""
    import math
    
    absolute = abs(decimal)
    degrees = int(absolute)
    minutes_float = (absolute - degrees) * 60
    minutes = int(minutes_float)
    seconds = (minutes_float - minutes) * 60
    
    direction = ''
    if coordinate_type == 'latitude':
        direction = 'N' if decimal >= 0 else 'S'
    else:  # longitude
        direction = 'E' if decimal >= 0 else 'W'
    
    return f"{degrees}°{minutes}'{seconds:.1f}\"{direction}"

def parse_coordinates(coord_string: str) -> Optional[Tuple[float, float]]:
    """Parse coordinate string and return (lat, lon) tuple"""
    try:
        # Remove whitespace and split
        parts = coord_string.strip().replace(' ', '').split(',')
        
        if len(parts) != 2:
            return None
        
        lat = float(parts[0])
        lon = float(parts[1])
        
        return (lat, lon)
        
    except:
        return None
564 lines•20.1 KB
python

About RSK World

Founded by Molla Samser, with Designer & Tester Rima Khatun, RSK World is your one-stop destination for free programming resources, source code, and development tools.

Founder: Molla Samser
Designer & Tester: Rima Khatun

Development

  • Game Development
  • Web Development
  • Mobile Development
  • AI Development
  • Development Tools

Legal

  • Terms & Conditions
  • Privacy Policy
  • Disclaimer

Contact Info

Nutanhat, Mongolkote
Purba Burdwan, West Bengal
India, 713147

+91 93305 39277

hello@rskworld.in
support@rskworld.in

© 2026 RSK World. All rights reserved.

Content used for educational purposes only. View Disclaimer