#!/usr/bin/env python3
"""
Weather Chatbot Database Module
==============================

Author: RSK World (https://rskworld.in)
Founded by: Molla Samser
Designer & Tester: Rima Khatun
Contact: +91 93305 39277, hello@rskworld.in, support@rskworld.in
Location: Nutanhat, Mongolkote, Purba Burdwan, West Bengal, India, 713147
Year: 2026

Description: Database operations for weather history and user management
"""

import sqlite3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import hashlib
import secrets

class WeatherDatabase:
    """Database manager for weather chatbot"""
    
    def __init__(self, db_path: str = "weather_chatbot.db"):
        self.db_path = db_path
        self.init_database()
    
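    def get_connection(self):
        """
        Get a new database connection.
        
        The caller is responsible for closing it. Using the connection as a
        context manager (``with conn:``) only commits or rolls back the
        transaction; it does not close the connection.
        
        Returns:
            sqlite3.Connection: Database connection object
        """
        return sqlite3.connect(self.db_path)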
    
    def init_database(self):
        """Initialize database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    api_key TEXT UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_login TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1,
                    preferences TEXT
                )
            ''')
            
            # Weather history table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS weather_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    city TEXT NOT NULL,
                    country TEXT,
                    temperature REAL,
                    feels_like REAL,
                    humidity INTEGER,
                    pressure REAL,
                    wind_speed REAL,
                    visibility REAL,
                    description TEXT,
                    icon_code TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    data_source TEXT DEFAULT 'openweathermap',
                    raw_data TEXT
                )
            ''')
            
            # User queries table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS user_queries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    query_text TEXT NOT NULL,
                    query_type TEXT,
                    city TEXT,
                    response_data TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Weather alerts table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS weather_alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    city TEXT NOT NULL,
                    country TEXT,
                    alert_type TEXT,
                    event TEXT,
                    start_time TIMESTAMP,
                    end_time TIMESTAMP,
                    description TEXT,
                    severity TEXT,
                    is_active BOOLEAN DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # User favorites table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS user_favorites (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    city TEXT NOT NULL,
                    country TEXT,
                    added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id),
                    UNIQUE(user_id, city)
                )
            ''')
            
            # API usage tracking
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_usage (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    endpoint TEXT NOT NULL,
                    request_count INTEGER DEFAULT 1,
                    last_used TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    date_used DATE DEFAULT CURRENT_DATE,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Create indexes for better performance
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_weather_history_city ON weather_history(city)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_weather_history_timestamp ON weather_history(timestamp)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_queries_user ON user_queries(user_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_queries_timestamp ON user_queries(timestamp)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_weather_alerts_city ON weather_alerts(city)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_api_usage_date ON api_usage(date_used)')
            
            conn.commit()
    
    def create_user(self, username: str, email: str, password: str) -> Dict:
        """Create a new user account"""
        try:
            # Hash password
            password_hash = self._hash_password(password)
            api_key = self._generate_api_key()
            
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO users (username, email, password_hash, api_key)
                    VALUES (?, ?, ?, ?)
                ''', (username, email, password_hash, api_key))
                
                user_id = cursor.lastrowid
                conn.commit()
                
                return {
                    'success': True,
                    'user_id': user_id,
                    'api_key': api_key,
                    'message': 'User created successfully'
                }
                
        except sqlite3.IntegrityError as e:
            if 'username' in str(e):
                return {'success': False, 'error': 'Username already exists'}
            elif 'email' in str(e):
                return {'success': False, 'error': 'Email already exists'}
            else:
                return {'success': False, 'error': 'Database error'}
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def authenticate_user(self, username: str, password: str) -> Dict:
        """Authenticate user credentials"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT id, username, email, password_hash, api_key, is_active
                    FROM users WHERE username = ? OR email = ?
                ''', (username, username))
                
                user = cursor.fetchone()
                
                if not user:
                    return {'success': False, 'error': 'User not found'}
                
                user_id, db_username, email, password_hash, api_key, is_active = user
                
                if not is_active:
                    return {'success': False, 'error': 'Account is disabled'}
                
                if self._verify_password(password, password_hash):
                    # Update last login
                    cursor.execute('''
                        UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?
                    ''', (user_id,))
                    conn.commit()
                    
                    return {
                        'success': True,
                        'user_id': user_id,
                        'username': db_username,
                        'email': email,
                        'api_key': api_key
                    }
                else:
                    return {'success': False, 'error': 'Invalid password'}
                    
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def save_weather_data(self, weather_data: Dict) -> Dict:
        """Save weather data to database"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO weather_history (
                        city, country, temperature, feels_like, humidity,
                        pressure, wind_speed, visibility, description,
                        icon_code, raw_data
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    weather_data.get('city'),
                    weather_data.get('country'),
                    weather_data.get('temperature'),
                    weather_data.get('feels_like'),
                    weather_data.get('humidity'),
                    weather_data.get('pressure'),
                    weather_data.get('wind_speed'),
                    weather_data.get('visibility'),
                    weather_data.get('description'),
                    weather_data.get('icon'),
                    json.dumps(weather_data)
                ))
                
                weather_id = cursor.lastrowid
                conn.commit()
                
                return {
                    'success': True,
                    'weather_id': weather_id,
                    'message': 'Weather data saved successfully'
                }
                
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def get_weather_history(self, city: str, days: int = 7) -> List[Dict]:
        """Get weather history for a city"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
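                # Compute the cutoff inside SQLite so it is in UTC and in the same
                # 'YYYY-MM-DD HH:MM:SS' format that CURRENT_TIMESTAMP writes;
                # a local-time isoformat() string does not compare correctly
                cursor.execute('''
                    SELECT * FROM weather_history
                    WHERE city = ? AND timestamp >= datetime('now', ?)
                    ORDER BY timestamp DESC
                ''', (city, f'-{int(days)} days'))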
                
                rows = cursor.fetchall()
                
                # Convert to list of dictionaries
                columns = [desc[0] for desc in cursor.description]
                history = []
                
                for row in rows:
                    data = dict(zip(columns, row))
                    # Parse raw data if available
                    if data.get('raw_data'):
                        try:
                            data['raw_data'] = json.loads(data['raw_data'])
                        except (json.JSONDecodeError, TypeError):
                            pass  # keep the raw string if it is not valid JSON
                    history.append(data)
                
                return history
                
        except Exception as e:
            return [{'error': str(e)}]
    
    def save_user_query(self, user_id: int, query_text: str, query_type: str, 
                        city: str, response_data: Dict) -> Dict:
        """Save user query to database"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO user_queries (user_id, query_text, query_type, city, response_data)
                    VALUES (?, ?, ?, ?, ?)
                ''', (user_id, query_text, query_type, city, json.dumps(response_data)))
                
                query_id = cursor.lastrowid
                conn.commit()
                
                return {
                    'success': True,
                    'query_id': query_id,
                    'message': 'Query saved successfully'
                }
                
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def get_user_query_history(self, user_id: int, limit: int = 50) -> List[Dict]:
        """Get user's query history"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT * FROM user_queries
                    WHERE user_id = ?
                    ORDER BY timestamp DESC
                    LIMIT ?
                ''', (user_id, limit))
                
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                
                history = []
                for row in rows:
                    data = dict(zip(columns, row))
                    # Parse response data
                    if data.get('response_data'):
                        try:
                            data['response_data'] = json.loads(data['response_data'])
                        except (json.JSONDecodeError, TypeError):
                            pass  # keep the raw string if it is not valid JSON
                    history.append(data)
                
                return history
                
        except Exception as e:
            return [{'error': str(e)}]
    
    def add_favorite_city(self, user_id: int, city: str, country: Optional[str] = None) -> Dict:
        """Add city to user's favorites"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT OR IGNORE INTO user_favorites (user_id, city, country)
                    VALUES (?, ?, ?)
                ''', (user_id, city, country))
                
                rows_affected = cursor.rowcount
                conn.commit()
                
                if rows_affected > 0:
                    return {'success': True, 'message': 'City added to favorites'}
                else:
                    return {'success': False, 'error': 'City already in favorites'}
                    
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def get_favorite_cities(self, user_id: int) -> List[Dict]:
        """Get user's favorite cities"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT * FROM user_favorites
                    WHERE user_id = ?
                    ORDER BY added_at DESC
                ''', (user_id,))
                
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                
                favorites = [dict(zip(columns, row)) for row in rows]
                return favorites
                
        except Exception as e:
            return [{'error': str(e)}]
    
    def track_api_usage(self, user_id: int, endpoint: str) -> Dict:
        """Track API usage for rate limiting and analytics"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # Check if entry exists for today
                cursor.execute('''
                    SELECT id, request_count FROM api_usage
                    WHERE user_id = ? AND endpoint = ? AND date_used = CURRENT_DATE
                ''', (user_id, endpoint))
                
                result = cursor.fetchone()
                
                if result:
                    # Update existing entry
                    usage_id, count = result
                    cursor.execute('''
                        UPDATE api_usage
                        SET request_count = request_count + 1, last_used = CURRENT_TIMESTAMP
                        WHERE id = ?
                    ''', (usage_id,))
                else:
                    # Create new entry
                    cursor.execute('''
                        INSERT INTO api_usage (user_id, endpoint)
                        VALUES (?, ?)
                    ''', (user_id, endpoint))
                
                conn.commit()
                
                return {'success': True, 'message': 'API usage tracked'}
                
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def get_api_usage_stats(self, user_id: int, days: int = 30) -> Dict:
        """Get API usage statistics for a user"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # Compute the cutoff inside SQLite so it is a UTC date, like the
                # CURRENT_DATE default used for date_used
                cursor.execute('''
                    SELECT endpoint, SUM(request_count) as total_requests,
                           COUNT(*) as active_days
                    FROM api_usage
                    WHERE user_id = ? AND date_used >= date('now', ?)
                    GROUP BY endpoint
                    ORDER BY total_requests DESC
                ''', (user_id, f'-{int(days)} days'))
                
                rows = cursor.fetchall()
                
                stats = {
                    'period_days': days,
                    'endpoints': []
                }
                
                for row in rows:
                    endpoint, total_requests, active_days = row
                    stats['endpoints'].append({
                        'endpoint': endpoint,
                        'total_requests': total_requests,
                        'active_days': active_days,
                        'avg_requests_per_day': round(total_requests / active_days, 2) if active_days > 0 else 0
                    })
                
                return stats
                
        except Exception as e:
            return {'error': str(e)}
    
    def save_weather_alert(self, alert_data: Dict) -> Dict:
        """Save weather alert to database"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO weather_alerts (
                        city, country, alert_type, event, start_time,
                        end_time, description, severity
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    alert_data.get('city'),
                    alert_data.get('country'),
                    alert_data.get('alert_type', 'weather'),
                    alert_data.get('event'),
                    alert_data.get('start'),
                    alert_data.get('end'),
                    alert_data.get('description'),
                    alert_data.get('severity', 'medium')
                ))
                
                alert_id = cursor.lastrowid
                conn.commit()
                
                return {
                    'success': True,
                    'alert_id': alert_id,
                    'message': 'Alert saved successfully'
                }
                
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def get_active_alerts(self, city: Optional[str] = None) -> List[Dict]:
        """Get active weather alerts"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                if city:
                    cursor.execute('''
                        SELECT * FROM weather_alerts
                        WHERE city = ? AND is_active = 1
                        ORDER BY created_at DESC
                    ''', (city,))
                else:
                    cursor.execute('''
                        SELECT * FROM weather_alerts
                        WHERE is_active = 1
                        ORDER BY created_at DESC
                    ''')
                
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                
                alerts = [dict(zip(columns, row)) for row in rows]
                return alerts
                
        except Exception as e:
            return [{'error': str(e)}]
    
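    def _hash_password(self, password: str, salt: Optional[str] = None) -> str:
        """Hash password with salted PBKDF2-HMAC-SHA256; returns 'salt$hash'"""
        salt = salt or secrets.token_hex(16)
        digest = hashlib.pbkdf2_hmac('sha256', password.encode(), bytes.fromhex(salt), 600_000)
        return f'{salt}${digest.hex()}'
    
    def _verify_password(self, password: str, password_hash: str) -> bool:
        """Verify password against a stored hash using a constant-time comparison"""
        salt, sep, _ = password_hash.partition('$')
        if not sep:  # legacy unsalted SHA-256 hash written by earlier versions
            legacy = hashlib.sha256(password.encode()).hexdigest()
            return secrets.compare_digest(legacy, password_hash)
        return secrets.compare_digest(self._hash_password(password, salt), password_hash)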
    
    def _generate_api_key(self) -> str:
        """Generate secure API key"""
        return secrets.token_urlsafe(32)
    
    def get_database_stats(self) -> Dict:
        """Get database statistics"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                stats = {}
                
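                # Count records in each table (names come from this fixed list,
                # never from user input, so the f-string query below is safe)
                tables = ['users', 'weather_history', 'user_queries', 'weather_alerts',
                          'user_favorites', 'api_usage']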
                
                for table in tables:
                    cursor.execute(f'SELECT COUNT(*) FROM {table}')
                    count = cursor.fetchone()[0]
                    stats[f'{table}_count'] = count
                
                # Weather data date range
                cursor.execute('''
                    SELECT MIN(timestamp) as oldest, MAX(timestamp) as newest
                    FROM weather_history
                ''')
                date_range = cursor.fetchone()
                stats['weather_data_range'] = {
                    'oldest': date_range[0],
                    'newest': date_range[1]
                }
                
                # Most popular cities
                cursor.execute('''
                    SELECT city, COUNT(*) as query_count
                    FROM weather_history
                    GROUP BY city
                    ORDER BY query_count DESC
                    LIMIT 10
                ''')
                popular_cities = cursor.fetchall()
                stats['popular_cities'] = [{'city': city, 'count': count} for city, count in popular_cities]
                
                return stats
                
        except Exception as e:
            return {'error': str(e)}
    
    def cleanup_old_data(self, days_to_keep: int = 90) -> Dict:
        """Clean up old data to manage database size"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                
                # The cutoff is computed by SQLite in UTC 'YYYY-MM-DD HH:MM:SS' form,
                # matching the values written by CURRENT_TIMESTAMP
                offset = f'-{int(days_to_keep)} days'
                
                # Clean up old weather history
                cursor.execute('''
                    DELETE FROM weather_history WHERE timestamp < datetime('now', ?)
                ''', (offset,))
                
                weather_deleted = cursor.rowcount
                
                # Clean up old user queries
                cursor.execute('''
                    DELETE FROM user_queries WHERE timestamp < datetime('now', ?)
                ''', (offset,))
                
                queries_deleted = cursor.rowcount
                
                # Clean up expired alerts and old inactive alerts
                # (assumes end_time is stored in the same UTC text format)
                cursor.execute('''
                    DELETE FROM weather_alerts 
                    WHERE end_time < datetime('now', ?)
                       OR (is_active = 0 AND created_at < datetime('now', ?))
                ''', (offset, offset))
                
                alerts_deleted = cursor.rowcount
                
                conn.commit()
                
                return {
                    'success': True,
                    'weather_records_deleted': weather_deleted,
                    'query_records_deleted': queries_deleted,
                    'alert_records_deleted': alerts_deleted,
                    'message': f'Cleaned up data older than {days_to_keep} days'
                }
                
        except Exception as e:
            return {'success': False, 'error': str(e)}