"""
================================================================================
Text Classification Dataset - Advanced Hyperparameter Tuning Module
================================================================================
Project: Text Classification Dataset
Category: Text Data / NLP

Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in | support@rskworld.in
Phone: +91 93305 39277

Copyright (c) 2026 RSK World - All Rights Reserved
Content used for educational purposes only.

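Features:
- GridSearchCV for exhaustive search
- RandomizedSearchCV for efficient search
- Optuna Bayesian optimization (optional dependency)
- Stratified k-fold cross-validation with a configurable scoring metric
- Optional trial-count and time budgets for Optuna searches
- Per-class feature importance for the best model
- Best model export and JSON tuning report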

Created: December 2026
================================================================================
"""

import os
import json
import time
import warnings
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.model_selection import (
    GridSearchCV, RandomizedSearchCV, 
    StratifiedKFold, cross_val_score
)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, f1_score, make_scorer
import joblib

# NOTE(review): this silences every warning category in this process for as
# long as the module stays imported, not only warnings raised during tuning.
warnings.filterwarnings('ignore')

# Project information; the author and website are echoed in the console
# banners and embedded in the metadata of saved models and reports.
__author__ = "Molla Samser"
__website__ = "https://rskworld.in"
__email__ = "help@rskworld.in"

# Category mapping: integer class label -> human-readable category name.
# get_feature_importance uses it to name the per-class results.
CATEGORIES = {
    0: 'Technology', 1: 'Sports', 2: 'Politics',
    3: 'Entertainment', 4: 'Business', 5: 'Science'
}


class HyperparameterTuner:
    """
    Advanced hyperparameter tuning for text classification models.

    Every search tunes a two-step pipeline (a TF-IDF ``vectorizer`` followed
    by a ``classifier``) and supports multiple optimization strategies:
    - Grid Search (exhaustive)
    - Random Search (efficient)
    - Bayesian Optimization (Optuna)

    After a search completes, the outcome is available on the instance:
    - best_params_: parameters of the best candidate
    - best_score_: mean cross-validated score of the best candidate
    - best_pipeline_: fitted pipeline kept for export and feature inspection
    - cv_results_: raw results table (set by grid and random search)
    - tuning_history_: one summary dict per completed search

    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    # Predefined parameter grids, keyed by model type. They are the defaults
    # for grid_search and random_search when the caller passes no grid.
    # Parameter names use the pipeline step prefixes ('vectorizer__' and
    # 'classifier__') so they can be applied to the pipeline directly.
    PARAM_GRIDS = {
        # 5*2*2*3*2*3*3*3*3 = 9720 combinations
        'logistic_regression': {
            'classifier__C': [0.01, 0.1, 1, 10, 100],
            'classifier__penalty': ['l1', 'l2'],
            'classifier__solver': ['liblinear', 'saga'],
            'classifier__max_iter': [500, 1000, 2000],
            'classifier__class_weight': [None, 'balanced'],
            'vectorizer__max_features': [5000, 10000, 20000],
            'vectorizer__ngram_range': [(1, 1), (1, 2), (1, 3)],
            'vectorizer__min_df': [1, 2, 5],
            'vectorizer__max_df': [0.9, 0.95, 1.0]
        },
        # 6*2*3*3*3*2 = 648 combinations
        'naive_bayes': {
            'classifier__alpha': [0.001, 0.01, 0.1, 0.5, 1.0, 2.0],
            'classifier__fit_prior': [True, False],
            'vectorizer__max_features': [5000, 10000, 20000],
            'vectorizer__ngram_range': [(1, 1), (1, 2), (1, 3)],
            'vectorizer__min_df': [1, 2, 5],
            'vectorizer__use_idf': [True, False]
        },
        # 4*2*3*2*3*2 = 288 combinations
        'svm': {
            'classifier__C': [0.01, 0.1, 1, 10],
            'classifier__loss': ['hinge', 'squared_hinge'],
            'classifier__max_iter': [1000, 2000, 5000],
            'classifier__class_weight': [None, 'balanced'],
            'vectorizer__max_features': [5000, 10000, 20000],
            'vectorizer__ngram_range': [(1, 1), (1, 2)],
        },
        # 3*4*3*3*2*2*2 = 864 combinations
        'random_forest': {
            'classifier__n_estimators': [50, 100, 200],
            'classifier__max_depth': [10, 20, 50, None],
            'classifier__min_samples_split': [2, 5, 10],
            'classifier__min_samples_leaf': [1, 2, 4],
            'classifier__class_weight': [None, 'balanced'],
            'vectorizer__max_features': [5000, 10000],
            'vectorizer__ngram_range': [(1, 1), (1, 2)],
        }
    }
    
    def __init__(
        self,
        model_type: str = 'logistic_regression',
        cv: int = 5,
        scoring: str = 'f1_macro',
        n_jobs: int = -1,
        verbose: int = 1,
        random_state: int = 42
    ):
        """
        Initialize the tuner.
        
        Args:
            model_type: Type of model ('logistic_regression', 'naive_bayes', 'svm', 'random_forest')
            cv: Number of cross-validation folds
            scoring: Scoring metric
            n_jobs: Number of parallel jobs
            verbose: Verbosity level
            random_state: Random seed
        """
        self.model_type = model_type
        self.cv = cv
        self.scoring = scoring
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.random_state = random_state
        
        self.best_params_ = None
        self.best_score_ = None
        self.best_pipeline_ = None
        self.cv_results_ = None
        self.tuning_history_ = []
    
    def _create_pipeline(self, model_type: str) -> Pipeline:
        """Create a sklearn pipeline for the specified model type."""
        vectorizer = TfidfVectorizer(stop_words='english')
        
        models = {
            'logistic_regression': LogisticRegression(random_state=self.random_state),
            'naive_bayes': MultinomialNB(),
            'svm': LinearSVC(random_state=self.random_state),
            'random_forest': RandomForestClassifier(random_state=self.random_state)
        }
        
        if model_type not in models:
            raise ValueError(f"Unknown model type: {model_type}")
        
        return Pipeline([
            ('vectorizer', vectorizer),
            ('classifier', models[model_type])
        ])
    
    def grid_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        param_grid: Optional[Dict] = None
    ) -> Dict:
        """
        Perform exhaustive grid search.
        
        Args:
            X: Training texts
            y: Training labels
            param_grid: Parameter grid (uses default if None)
            
        Returns:
            Dictionary with best parameters and score
        """
        if self.verbose:
            print(f"\n{'='*60}")
            print("Grid Search Hyperparameter Tuning")
            print(f"Author: {__author__} | Website: {__website__}")
            print(f"{'='*60}\n")
        
        pipeline = self._create_pipeline(self.model_type)
        param_grid = param_grid or self.PARAM_GRIDS.get(self.model_type, {})
        
        # Calculate total combinations
        total_combinations = 1
        for values in param_grid.values():
            total_combinations *= len(values)
        
        if self.verbose:
            print(f"Model Type: {self.model_type}")
            print(f"Total Combinations: {total_combinations}")
            print(f"CV Folds: {self.cv}")
            print(f"Scoring: {self.scoring}")
            print("-" * 40)
        
        start_time = time.time()
        
        grid_search = GridSearchCV(
            pipeline,
            param_grid,
            cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            return_train_score=True
        )
        
        grid_search.fit(X, y)
        
        elapsed_time = time.time() - start_time
        
        self.best_params_ = grid_search.best_params_
        self.best_score_ = grid_search.best_score_
        self.best_pipeline_ = grid_search.best_estimator_
        self.cv_results_ = grid_search.cv_results_
        
        result = {
            'method': 'grid_search',
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_),
            'total_combinations': total_combinations,
            'elapsed_time_seconds': round(elapsed_time, 2),
            'cv_folds': self.cv,
            'scoring': self.scoring
        }
        
        self.tuning_history_.append(result)
        
        if self.verbose:
            print(f"\nBest Score: {self.best_score_:.4f}")
            print(f"Best Parameters:")
            for param, value in self.best_params_.items():
                print(f"  {param}: {value}")
            print(f"Time: {elapsed_time:.2f}s")
        
        return result
    
    def random_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        param_distributions: Optional[Dict] = None,
        n_iter: int = 50
    ) -> Dict:
        """
        Perform randomized search.
        
        Args:
            X: Training texts
            y: Training labels
            param_distributions: Parameter distributions
            n_iter: Number of iterations
            
        Returns:
            Dictionary with best parameters and score
        """
        if self.verbose:
            print(f"\n{'='*60}")
            print("Randomized Search Hyperparameter Tuning")
            print(f"Author: {__author__} | Website: {__website__}")
            print(f"{'='*60}\n")
        
        pipeline = self._create_pipeline(self.model_type)
        param_distributions = param_distributions or self.PARAM_GRIDS.get(self.model_type, {})
        
        if self.verbose:
            print(f"Model Type: {self.model_type}")
            print(f"Iterations: {n_iter}")
            print(f"CV Folds: {self.cv}")
            print("-" * 40)
        
        start_time = time.time()
        
        random_search = RandomizedSearchCV(
            pipeline,
            param_distributions,
            n_iter=n_iter,
            cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            random_state=self.random_state,
            return_train_score=True
        )
        
        random_search.fit(X, y)
        
        elapsed_time = time.time() - start_time
        
        self.best_params_ = random_search.best_params_
        self.best_score_ = random_search.best_score_
        self.best_pipeline_ = random_search.best_estimator_
        self.cv_results_ = random_search.cv_results_
        
        result = {
            'method': 'random_search',
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_),
            'n_iterations': n_iter,
            'elapsed_time_seconds': round(elapsed_time, 2),
            'cv_folds': self.cv,
            'scoring': self.scoring
        }
        
        self.tuning_history_.append(result)
        
        if self.verbose:
            print(f"\nBest Score: {self.best_score_:.4f}")
            print(f"Best Parameters:")
            for param, value in self.best_params_.items():
                print(f"  {param}: {value}")
            print(f"Time: {elapsed_time:.2f}s")
        
        return result
    
    def optuna_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        n_trials: int = 100,
        timeout: Optional[int] = None
    ) -> Dict:
        """
        Perform Bayesian optimization using Optuna.
        
        Args:
            X: Training texts
            y: Training labels
            n_trials: Number of optimization trials
            timeout: Timeout in seconds
            
        Returns:
            Dictionary with best parameters and score
        """
        try:
            import optuna
            from optuna.samplers import TPESampler
        except ImportError:
            print("Optuna not installed. Install with: pip install optuna")
            return {}
        
        if self.verbose:
            print(f"\n{'='*60}")
            print("Optuna Bayesian Hyperparameter Optimization")
            print(f"Author: {__author__} | Website: {__website__}")
            print(f"{'='*60}\n")
            print(f"Model Type: {self.model_type}")
            print(f"Trials: {n_trials}")
            print("-" * 40)
        
        # Suppress Optuna logging
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        
        def objective(trial):
            # Define hyperparameter search space
            if self.model_type == 'logistic_regression':
                params = {
                    'vectorizer__max_features': trial.suggest_int('max_features', 5000, 30000, step=5000),
                    'vectorizer__ngram_range': (1, trial.suggest_int('ngram_max', 1, 3)),
                    'vectorizer__min_df': trial.suggest_int('min_df', 1, 10),
                    'classifier__C': trial.suggest_float('C', 0.001, 100, log=True),
                    'classifier__penalty': trial.suggest_categorical('penalty', ['l1', 'l2']),
                    'classifier__solver': 'liblinear',
                    'classifier__max_iter': trial.suggest_int('max_iter', 500, 3000, step=500),
                }
            elif self.model_type == 'naive_bayes':
                params = {
                    'vectorizer__max_features': trial.suggest_int('max_features', 5000, 30000, step=5000),
                    'vectorizer__ngram_range': (1, trial.suggest_int('ngram_max', 1, 3)),
                    'classifier__alpha': trial.suggest_float('alpha', 0.001, 2.0, log=True),
                }
            elif self.model_type == 'svm':
                params = {
                    'vectorizer__max_features': trial.suggest_int('max_features', 5000, 20000, step=5000),
                    'vectorizer__ngram_range': (1, trial.suggest_int('ngram_max', 1, 2)),
                    'classifier__C': trial.suggest_float('C', 0.01, 10, log=True),
                    'classifier__max_iter': trial.suggest_int('max_iter', 1000, 5000, step=1000),
                }
            else:
                params = {}
            
            pipeline = self._create_pipeline(self.model_type)
            pipeline.set_params(**params)
            
            scores = cross_val_score(
                pipeline, X, y,
                cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
                scoring=self.scoring,
                n_jobs=self.n_jobs
            )
            
            return scores.mean()
        
        start_time = time.time()
        
        sampler = TPESampler(seed=self.random_state)
        study = optuna.create_study(direction='maximize', sampler=sampler)
        study.optimize(objective, n_trials=n_trials, timeout=timeout, show_progress_bar=self.verbose > 0)
        
        elapsed_time = time.time() - start_time
        
        # Build best pipeline
        best_trial = study.best_trial
        self.best_params_ = best_trial.params
        self.best_score_ = best_trial.value
        
        # Reconstruct best pipeline
        pipeline = self._create_pipeline(self.model_type)
        
        if self.model_type == 'logistic_regression':
            pipeline.set_params(
                vectorizer__max_features=best_trial.params['max_features'],
                vectorizer__ngram_range=(1, best_trial.params['ngram_max']),
                vectorizer__min_df=best_trial.params['min_df'],
                classifier__C=best_trial.params['C'],
                classifier__penalty=best_trial.params['penalty'],
                classifier__solver='liblinear',
                classifier__max_iter=best_trial.params['max_iter'],
            )
        
        pipeline.fit(X, y)
        self.best_pipeline_ = pipeline
        
        result = {
            'method': 'optuna_bayesian',
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_),
            'n_trials': n_trials,
            'elapsed_time_seconds': round(elapsed_time, 2),
            'cv_folds': self.cv,
            'scoring': self.scoring,
            'optimization_history': [
                {'trial': t.number, 'score': t.value, 'params': t.params}
                for t in study.trials[:10]  # First 10 trials
            ]
        }
        
        self.tuning_history_.append(result)
        
        if self.verbose:
            print(f"\nBest Score: {self.best_score_:.4f}")
            print(f"Best Parameters:")
            for param, value in self.best_params_.items():
                print(f"  {param}: {value}")
            print(f"Time: {elapsed_time:.2f}s")
        
        return result
    
    def get_feature_importance(self, top_n: int = 20) -> Dict[str, List]:
        """
        Get feature importance from the best model.
        
        Args:
            top_n: Number of top features to return
            
        Returns:
            Dictionary with feature importance per class
        """
        if self.best_pipeline_ is None:
            raise ValueError("No model trained. Run tuning first.")
        
        vectorizer = self.best_pipeline_.named_steps['vectorizer']
        classifier = self.best_pipeline_.named_steps['classifier']
        feature_names = vectorizer.get_feature_names_out()
        
        importance = {}
        
        if hasattr(classifier, 'coef_'):
            coefs = classifier.coef_
            for i, category in CATEGORIES.items():
                if i < len(coefs):
                    top_indices = np.argsort(coefs[i])[-top_n:][::-1]
                    importance[category] = [
                        {'word': feature_names[idx], 'score': float(coefs[i][idx])}
                        for idx in top_indices
                    ]
        
        return importance
    
    def save_best_model(self, path: str):
        """Save the best model to disk."""
        if self.best_pipeline_ is None:
            raise ValueError("No model trained. Run tuning first.")
        
        model_data = {
            'pipeline': self.best_pipeline_,
            'best_params': self.best_params_,
            'best_score': self.best_score_,
            'model_type': self.model_type,
            'tuning_history': self.tuning_history_,
            'metadata': {
                'author': __author__,
                'website': __website__,
                'created_at': datetime.now().isoformat()
            }
        }
        
        joblib.dump(model_data, path)
        if self.verbose:
            print(f"Model saved to: {path}")
    
    def generate_report(self, output_path: str = 'tuning_report.json'):
        """Generate a comprehensive tuning report."""
        report = {
            'metadata': {
                'author': __author__,
                'website': __website__,
                'generated_at': datetime.now().isoformat()
            },
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_) if self.best_score_ else None,
            'tuning_history': self.tuning_history_,
            'feature_importance': self.get_feature_importance() if self.best_pipeline_ else None
        }
        
        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        
        if self.verbose:
            print(f"Report saved to: {output_path}")
        
        return report


def tune_all_models(
    X: np.ndarray,
    y: np.ndarray,
    method: str = 'random',
    n_iter: int = 30
) -> pd.DataFrame:
    """
    Tune multiple model types and compare results.

    Logistic regression, naive Bayes and the linear SVM are each tuned with
    the chosen strategy and ranked by their best cross-validated score.

    Args:
        X: Training texts
        y: Training labels
        method: Tuning method ('grid', 'random', 'optuna'). 'grid' is
            served by the randomized search capped at ``n_iter`` candidates,
            because the predefined grids contain hundreds to thousands of
            combinations per model.
        n_iter: Number of candidates (random/grid) or trials (optuna)

    Returns:
        DataFrame with the columns 'model', 'best_score', 'time_seconds'
        and 'best_params', sorted by descending score. Models for which the
        search returned no result (e.g. Optuna is not installed) are
        omitted, so the frame may be empty.

    Raises:
        ValueError: If ``method`` is not one of the supported methods.
    """
    supported_methods = ('grid', 'random', 'optuna')
    # Fail before any work is done instead of hitting an unbound `result`.
    if method not in supported_methods:
        raise ValueError(
            f"Unknown tuning method: {method!r}. Expected one of {supported_methods}."
        )

    print(f"\n{'='*60}")
    print("Multi-Model Hyperparameter Tuning Comparison")
    print(f"Author: {__author__} | Website: {__website__}")
    print(f"{'='*60}\n")

    model_types = ['logistic_regression', 'naive_bayes', 'svm']
    columns = ['model', 'best_score', 'time_seconds', 'best_params']
    max_params_chars = 100
    results = []

    for model_type in model_types:
        print(f"\n--- Tuning {model_type} ---")
        tuner = HyperparameterTuner(model_type=model_type, verbose=0)

        if method == 'optuna':
            result = tuner.optuna_search(X, y, n_trials=n_iter)
        else:
            # 'grid' intentionally shares the randomized search: it samples
            # the same predefined grid but bounds the cost to n_iter fits
            # per fold.
            result = tuner.random_search(X, y, n_iter=n_iter)

        if not result:
            # optuna_search returns {} when Optuna cannot be imported.
            print(f"  Skipped: no result for {model_type}")
            continue

        params_text = str(result['best_params'])
        if len(params_text) > max_params_chars:
            params_text = params_text[:max_params_chars] + '...'

        results.append({
            'model': model_type,
            'best_score': result['best_score'],
            'time_seconds': result['elapsed_time_seconds'],
            'best_params': params_text
        })

        print(f"  Score: {result['best_score']:.4f}")

    # Explicit columns keep the sort valid even when every model was skipped.
    df = pd.DataFrame(results, columns=columns).sort_values('best_score', ascending=False)

    print(f"\n{'='*60}")
    print("Results Summary:")
    print(df.to_string(index=False))

    return df


if __name__ == "__main__":
    # Demo: tune a logistic-regression pipeline on the sample training set,
    # then export the best model and a JSON report to the working directory.
    import re
    import string

    print(f"\n{'='*60}")
    print("Hyperparameter Tuning Demo - RSK World")
    print(f"Author: {__author__} | Website: {__website__}")
    print(f"{'='*60}\n")

    # Compiled/built once and reused for every row.
    url_pattern = re.compile(r'https?://\S+|www\.\S+')
    strip_punctuation = str.maketrans('', '', string.punctuation)

    def preprocess(text):
        """Lowercase, drop URLs and punctuation, and collapse whitespace."""
        text = url_pattern.sub('', text.lower())
        text = text.translate(strip_punctuation)
        return ' '.join(text.split())

    # Only the dataset load is guarded: a FileNotFoundError raised later
    # (e.g. while saving the model or report) must not be reported as a
    # missing dataset.
    try:
        train_df = pd.read_csv('../data/csv/train.csv', comment='#')
    except FileNotFoundError:
        print("Dataset not found. Please ensure train.csv exists in ../data/csv/")
    else:
        # Missing texts are read as NaN (a float); normalise them to empty
        # strings so preprocess() always receives a str.
        X = train_df['text'].fillna('').astype(str).apply(preprocess).values
        y = train_df['label'].values

        # Demo tuning
        tuner = HyperparameterTuner(
            model_type='logistic_regression',
            cv=3,
            verbose=1
        )

        # Quick random search
        tuner.random_search(X, y, n_iter=10)

        # Save model
        tuner.save_best_model('best_model.joblib')

        # Generate report
        tuner.generate_report('tuning_report.json')

        print(f"\n{'='*60}")
        print("Tuning Demo Complete!")
        print(f"Copyright (c) 2026 RSK World - All Rights Reserved")

