Text Classification Dataset - NLP + Multi-Class Classification + Machine Learning
scripts/active_learning.py
"""
================================================================================
Text Classification Dataset - Active Learning Module
================================================================================
Project: Text Classification Dataset
Category: Text Data / NLP

Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in | support@rskworld.in
Phone: +91 93305 39277

Copyright (c) 2026 RSK World - All Rights Reserved
Content used for educational purposes only.

Features:
- Uncertainty Sampling (Least Confidence, Margin, Entropy)
- Query-by-Committee
- Diversity Sampling
- Batch Active Learning
- Human-in-the-loop Interface
- Learning Curve Tracking

Listed but not implemented in this file:
- Expected Model Change
- Information Density

Created: December 2026
================================================================================
"""

import numpy as np
import pandas as pd
from typing import List, Tuple, Dict, Optional, Callable, Union
from abc import ABC, abstractmethod
from collections import Counter
import warnings
from datetime import datetime

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score, f1_score
from sklearn.base import clone
import scipy.sparse as sp

# Project information
__author__ = "Molla Samser"
__website__ = "https://rskworld.in"
__email__ = "help@rskworld.in"

# Category mapping
CATEGORIES = {
    0: 'Technology', 1: 'Sports', 2: 'Politics',
    3: 'Entertainment', 4: 'Business', 5: 'Science'
}


class QueryStrategy(ABC):
    """
    Abstract base class for query strategies.
    
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    @abstractmethod
    def select_samples(
        self,
        X_pool: Union[np.ndarray, sp.spmatrix],
        model,
        n_samples: int = 1
    ) -> np.ndarray:
        """
        Select samples from the pool for labeling.
        
        Args:
            X_pool: Features of unlabeled samples
            model: Trained model with predict_proba
            n_samples: Number of samples to select
            
        Returns:
            Indices of selected samples
        """
        pass


class UncertaintySampling(QueryStrategy):
    """
    Uncertainty sampling selects samples where the model is least confident.
    
    Strategies:
    - least_confidence: 1 - max(P(y|x))
    - margin: 1 - (P(y1|x) - P(y2|x)), where y1 and y2 are the two most
      probable classes (a small gap gives a high score)
    - entropy: -sum(P(y|x) * log(P(y|x)))
    
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    def __init__(self, strategy: str = 'entropy'):
        """
        Initialize uncertainty sampling.
        
        Args:
            strategy: One of 'least_confidence', 'margin', 'entropy'
        """
        valid_strategies = ['least_confidence', 'margin', 'entropy']
        if strategy not in valid_strategies:
            raise ValueError(f"Strategy must be one of {valid_strategies}")
        self.strategy = strategy
    
    def select_samples(
        self,
        X_pool: Union[np.ndarray, sp.spmatrix],
        model,
        n_samples: int = 1
    ) -> np.ndarray:
        """Select most uncertain samples."""
        
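        # Get probability predictions
        if hasattr(model, 'predict_proba'):
            probs = model.predict_proba(X_pool)
        else:
            # For models without predict_proba (like LinearSVC)
            decision = np.asarray(model.decision_function(X_pool))
            if decision.ndim == 1:
                # Binary case: decision_function returns one score per sample,
                # so build a two-column score matrix before the softmax
                decision = np.column_stack([-decision, decision])
            # Convert to pseudo-probabilities using a numerically stable softmax
            exp_decision = np.exp(decision - np.max(decision, axis=1, keepdims=True))
            probs = exp_decision / exp_decision.sum(axis=1, keepdims=True)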
        
        # Calculate uncertainty scores
        if self.strategy == 'least_confidence':
            scores = 1 - np.max(probs, axis=1)
        
        elif self.strategy == 'margin':
            # Sort probabilities for each sample
            sorted_probs = np.sort(probs, axis=1)
            # Margin = difference between top 2 classes
            scores = 1 - (sorted_probs[:, -1] - sorted_probs[:, -2])
        
        elif self.strategy == 'entropy':
            # Entropy = -sum(p * log(p))
            with np.errstate(divide='ignore', invalid='ignore'):
                log_probs = np.log(probs + 1e-10)
                scores = -np.sum(probs * log_probs, axis=1)
        
        # Select top n_samples with highest uncertainty
        return np.argsort(scores)[-n_samples:][::-1]


class QueryByCommittee(QueryStrategy):
    """
    Query-by-Committee uses an ensemble of models to identify
    samples where the committee disagrees most.
    
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    def __init__(self, n_committee: int = 5, base_estimator=None):
        """
        Initialize QBC.
        
        Args:
            n_committee: Number of committee members
            base_estimator: Base model to use (will be trained with bootstrap)
        """
        self.n_committee = n_committee
        self.base_estimator = base_estimator or LogisticRegression(max_iter=1000)
        self.committee = []
    
    def train_committee(self, X: Union[np.ndarray, sp.spmatrix], y: np.ndarray):
        """Train the committee with bootstrap sampling."""
        self.committee = []
        n_samples = X.shape[0]
        
        for _ in range(self.n_committee):
            # Bootstrap sampling
            indices = np.random.choice(n_samples, size=n_samples, replace=True)
            X_boot = X[indices] if isinstance(X, np.ndarray) else X[indices, :]
            y_boot = y[indices]
            
            # Train committee member
            member = clone(self.base_estimator)
            member.fit(X_boot, y_boot)
            self.committee.append(member)
    
    def select_samples(
        self,
        X_pool: Union[np.ndarray, sp.spmatrix],
        model=None,  # Not used, uses committee instead
        n_samples: int = 1
    ) -> np.ndarray:
        """Select samples with highest disagreement."""
        if not self.committee:
            raise ValueError("Committee not trained. Call train_committee first.")
        
        # Get predictions from each committee member
        predictions = np.array([
            member.predict(X_pool) for member in self.committee
        ])
        
        # Calculate vote entropy (disagreement)
        # Use the label values actually predicted rather than range(n_classes),
        # so non-contiguous labels (e.g. {1, 3, 5}) are counted correctly
        classes = np.unique(predictions).tolist()
        n_members = len(self.committee)
        scores = np.zeros(X_pool.shape[0])
        
        for i in range(X_pool.shape[0]):
            votes = predictions[:, i]
            vote_counts = Counter(votes.tolist())
            vote_fracs = np.array([vote_counts.get(c, 0) / n_members
                                   for c in classes])
            # Entropy of votes (the small constant avoids log(0))
            log_fracs = np.log(vote_fracs + 1e-10)
            scores[i] = -np.sum(vote_fracs * log_fracs)
        
        return np.argsort(scores)[-n_samples:][::-1]


class DiversitySampling(QueryStrategy):
    """
    Diversity sampling selects a diverse batch of samples
    using k-means++ style selection.
    
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    def __init__(self, combine_with_uncertainty: bool = True):
        """
        Initialize diversity sampling.
        
        Args:
            combine_with_uncertainty: Also consider model uncertainty
        """
        self.combine_with_uncertainty = combine_with_uncertainty
    
    def select_samples(
        self,
        X_pool: Union[np.ndarray, sp.spmatrix],
        model=None,
        n_samples: int = 1
    ) -> np.ndarray:
        """Select diverse samples using k-means++ initialization."""
        
        # Convert sparse to dense if needed
        if sp.issparse(X_pool):
            X_dense = X_pool.toarray()
        else:
            X_dense = X_pool
        
        n_pool = X_dense.shape[0]
        selected = []
        
        # If combining with uncertainty, weight by uncertainty
        weights = np.ones(n_pool)
        use_uncertainty = (
            self.combine_with_uncertainty
            and model is not None
            and hasattr(model, 'predict_proba')
        )
        if use_uncertainty:
            probs = model.predict_proba(X_pool)
            weights = 1 - np.max(probs, axis=1)  # Uncertainty
        
        # Select first sample (highest uncertainty, or random when no
        # uncertainty scores are available)
        first_idx = int(np.argmax(weights)) if use_uncertainty else np.random.randint(n_pool)
        selected.append(first_idx)
        
        # k-means++ style selection
        min_distances = np.linalg.norm(X_dense - X_dense[first_idx], axis=1)
        for _ in range(n_samples - 1):
            # Weight by squared distance to the nearest selected sample and uncertainty
            selection_probs = min_distances ** 2 * weights
            selection_probs[selected] = 0  # Don't reselect
            total = selection_probs.sum()
            if total > 0:
                selection_probs = selection_probs / total
            else:
                # All remaining candidates have zero weight (e.g. duplicate rows):
                # fall back to a uniform choice among unselected samples
                selection_probs = np.ones(n_pool)
                selection_probs[selected] = 0
                selection_probs = selection_probs / selection_probs.sum()
            
            # Select next sample
            next_idx = int(np.random.choice(n_pool, p=selection_probs))
            selected.append(next_idx)
            
            # Update each sample's distance to its nearest selected sample
            distances = np.linalg.norm(X_dense - X_dense[next_idx], axis=1)
            min_distances = np.minimum(min_distances, distances)
        
        return np.array(selected)


class ActiveLearner:
    """
    Main active learning class that orchestrates the learning loop.
    
    Features:
    - Multiple query strategies
    - Automatic model retraining
    - Learning curve tracking
    - Human-in-the-loop interface
    
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    def __init__(
        self,
        model=None,
        query_strategy: QueryStrategy = None,
        vectorizer=None,
        random_state: int = 42
    ):
        """
        Initialize active learner.
        
        Args:
            model: Classifier model
            query_strategy: Strategy for selecting samples
            vectorizer: Text vectorizer (TfidfVectorizer)
            random_state: Random seed
        """
        self.model = model or LogisticRegression(max_iter=1000, random_state=random_state)
        self.query_strategy = query_strategy or UncertaintySampling(strategy='entropy')
        self.vectorizer = vectorizer or TfidfVectorizer(max_features=10000, ngram_range=(1, 2))
        self.random_state = random_state
        np.random.seed(random_state)
        
        # Data storage
        self.X_labeled = None
        self.y_labeled = None
        self.texts_labeled = []
        self.X_pool = None
        self.pool_indices = None
        self.texts_pool = []
        
        # Tracking
        self.learning_curve = []
        self.query_history = []
        self.is_fitted = False
    
    def initialize(
        self,
        texts_pool: List[str],
        texts_initial: List[str] = None,
        labels_initial: List[int] = None,
        n_initial: int = 10
    ):
        """
        Initialize the active learner with pool and initial labeled data.
        
        Args:
            texts_pool: List of unlabeled texts
            texts_initial: Initial labeled texts
            labels_initial: Labels for initial texts
            n_initial: If no initial data, randomly select this many
        """
        print(f"\n{'='*60}")
        print("Initializing Active Learner - RSK World")
        print(f"Author: {__author__} | Website: {__website__}")
        print(f"{'='*60}\n")
        
        self.texts_pool = list(texts_pool)
        
        if texts_initial is not None and labels_initial is not None:
            # Use provided initial data
            self.texts_labeled = list(texts_initial)
            self.y_labeled = np.array(labels_initial)
        else:
            # Random initial selection
            indices = np.random.choice(len(self.texts_pool), size=n_initial, replace=False)
            self.texts_labeled = [self.texts_pool[i] for i in indices]
            # Remove from pool
            for i in sorted(indices, reverse=True):
                self.texts_pool.pop(i)
            # Placeholder labels (need to be set)
            self.y_labeled = np.zeros(n_initial, dtype=int)
            print(f"⚠️  Selected {n_initial} random samples for initial labeling.")
            print("   Please provide labels using set_initial_labels() method.")
            return
        
        # Fit vectorizer on all texts
        all_texts = self.texts_labeled + self.texts_pool
        self.vectorizer.fit(all_texts)
        
        # Transform data
        self.X_labeled = self.vectorizer.transform(self.texts_labeled)
        self.X_pool = self.vectorizer.transform(self.texts_pool)
        self.pool_indices = np.arange(len(self.texts_pool))
        
        # Initial training
        self._train_model()
        
        print(f"✓ Initialized with {len(self.texts_labeled)} labeled samples")
        print(f"✓ Pool size: {len(self.texts_pool)} unlabeled samples")
        print(f"✓ Vocabulary size: {len(self.vectorizer.vocabulary_)}")
    
    def set_initial_labels(self, labels: List[int]):
        """Set labels for initially selected samples."""
        self.y_labeled = np.array(labels)
        
        # Fit vectorizer
        all_texts = self.texts_labeled + self.texts_pool
        self.vectorizer.fit(all_texts)
        
        # Transform
        self.X_labeled = self.vectorizer.transform(self.texts_labeled)
        self.X_pool = self.vectorizer.transform(self.texts_pool)
        self.pool_indices = np.arange(len(self.texts_pool))
        
        # Train
        self._train_model()
        
        print(f"✓ Labels set. Model trained on {len(labels)} samples.")
    
    def _train_model(self):
        """Train the model on labeled data."""
        self.model.fit(self.X_labeled, self.y_labeled)
        self.is_fitted = True
    
    def query(self, n_samples: int = 1) -> List[Tuple[int, str]]:
        """
        Query for samples to label.
        
        Args:
            n_samples: Number of samples to query
            
        Returns:
            List of (index, text) tuples
        """
        if not self.is_fitted:
            raise ValueError("Model not fitted. Call initialize() first.")
        
        if len(self.texts_pool) == 0:
            print("⚠️  No more samples in pool!")
            return []
        
        n_samples = min(n_samples, len(self.texts_pool))
        
        # Get indices using query strategy
        if isinstance(self.query_strategy, QueryByCommittee):
            self.query_strategy.train_committee(self.X_labeled, self.y_labeled)
            selected_indices = self.query_strategy.select_samples(
                self.X_pool, n_samples=n_samples
            )
        else:
            selected_indices = self.query_strategy.select_samples(
                self.X_pool, self.model, n_samples=n_samples
            )
        
        # Get texts
        queries = [(int(idx), self.texts_pool[idx]) for idx in selected_indices]
        
        # Store query history
        self.query_history.append({
            'iteration': len(self.learning_curve),
            'indices': selected_indices.tolist(),
            'timestamp': datetime.now().isoformat()
        })
        
        return queries
    
    def teach(
        self,
        indices: List[int],
        labels: List[int],
        X_test: np.ndarray = None,
        y_test: np.ndarray = None
    ):
        """
        Add newly labeled samples and retrain.
        
        Args:
            indices: Indices of labeled samples (from pool)
            labels: Labels for the samples
            X_test: Optional test features for tracking
            y_test: Optional test labels for tracking
        """
        # Look up the queried texts before the pool is modified
        new_texts = [self.texts_pool[idx] for idx in indices]
        
        # Add to labeled set
        self.texts_labeled.extend(new_texts)
        
        # Update labels
        self.y_labeled = np.concatenate([self.y_labeled, np.array(labels)])
        
        # Update labeled features (keep CSR so row indexing keeps working)
        new_features = self.vectorizer.transform(new_texts)
        self.X_labeled = sp.vstack([self.X_labeled, new_features], format='csr')
        
        # Remove from pool (in reverse order to maintain indices)
        for idx in sorted(indices, reverse=True):
            self.texts_pool.pop(idx)
        
        # Rebuild pool
        if len(self.texts_pool) > 0:
            self.X_pool = self.vectorizer.transform(self.texts_pool)
            self.pool_indices = np.arange(len(self.texts_pool))
        else:
            self.X_pool = None
            self.pool_indices = np.array([])
        
        # Retrain
        self._train_model()
        
        # Track performance
        metrics = {
            'n_labeled': len(self.texts_labeled),
            'pool_size': len(self.texts_pool),
            'train_accuracy': accuracy_score(
                self.y_labeled, 
                self.model.predict(self.X_labeled)
            )
        }
        
        if X_test is not None and y_test is not None:
            y_pred = self.model.predict(X_test)
            metrics['test_accuracy'] = accuracy_score(y_test, y_pred)
            metrics['test_f1'] = f1_score(y_test, y_pred, average='macro')
        
        self.learning_curve.append(metrics)
        
        print(f"✓ Added {len(indices)} samples. Total labeled: {len(self.texts_labeled)}")
        if 'test_accuracy' in metrics:
            print(f"  Test accuracy: {metrics['test_accuracy']:.4f}")
    
    def interactive_labeling(
        self,
        n_iterations: int = 10,
        batch_size: int = 5,
        X_test: np.ndarray = None,
        y_test: np.ndarray = None
    ):
        """
        Run interactive labeling session.
        
        Args:
            n_iterations: Number of labeling iterations
            batch_size: Samples per iteration
            X_test: Test features for tracking
            y_test: Test labels for tracking
        """
        print(f"\n{'='*60}")
        print("Interactive Labeling Session")
        print(f"Categories: {CATEGORIES}")
        print(f"{'='*60}\n")
        
        for iteration in range(n_iterations):
            print(f"\n--- Iteration {iteration + 1}/{n_iterations} ---")
            
            queries = self.query(n_samples=batch_size)
            
            if not queries:
                print("No more samples to label!")
                break
            
            labels = []
            indices = []
            
            for idx, text in queries:
                print(f"\nText: {text[:200]}...")
                print(f"Categories: {CATEGORIES}")
                
                while True:
                    try:
                        label = int(input(f"Enter label (0-{len(CATEGORIES)-1}): "))
                        if 0 <= label < len(CATEGORIES):
                            break
                        print(f"Invalid label. Must be 0-{len(CATEGORIES)-1}")
                    except ValueError:
                        print("Please enter a number.")
                
                labels.append(label)
                indices.append(idx)
            
            self.teach(indices, labels, X_test, y_test)
        
        print(f"\n{'='*60}")
        print("Labeling session complete!")
        print(f"Total labeled: {len(self.texts_labeled)}")
        print(f"{'='*60}")
    
    def simulate_labeling(
        self,
        true_labels: List[int],
        n_iterations: int = 20,
        batch_size: int = 5,
        X_test: np.ndarray = None,
        y_test: np.ndarray = None
    ) -> List[Dict]:
        """
        Simulate labeling with known ground truth.
        
        Args:
            true_labels: True labels for pool samples
            n_iterations: Number of iterations
            batch_size: Samples per iteration
            X_test: Test features
            y_test: Test labels
            
        Returns:
            Learning curve data
        """
        print(f"\n{'='*60}")
        print("Simulated Active Learning - RSK World")
        print(f"Iterations: {n_iterations}, Batch size: {batch_size}")
        print(f"{'='*60}\n")
        
        labels_pool = list(true_labels)
        
        for iteration in range(n_iterations):
            if len(self.texts_pool) == 0:
                print("Pool exhausted!")
                break
            
            queries = self.query(n_samples=batch_size)
            
            if not queries:
                break
            
            indices = [q[0] for q in queries]
            labels = [labels_pool[idx] for idx in indices]
            
            self.teach(indices, labels, X_test, y_test)
            
            # Remove used labels from pool (in reverse order)
            for idx in sorted(indices, reverse=True):
                labels_pool.pop(idx)
            
            if (iteration + 1) % 5 == 0:
                acc = self.learning_curve[-1].get('test_accuracy', 'N/A')
                print(f"Iteration {iteration + 1}: {len(self.texts_labeled)} labeled, acc={acc}")
        
        return self.learning_curve
    
    def get_learning_curve(self) -> pd.DataFrame:
        """Get learning curve as DataFrame."""
        return pd.DataFrame(self.learning_curve)
    
    def plot_learning_curve(self, save_path: str = None):
        """Plot the learning curve."""
        try:
            import matplotlib.pyplot as plt
            
            df = self.get_learning_curve()
            
            fig, ax = plt.subplots(figsize=(10, 6))
            
            ax.plot(df['n_labeled'], df['train_accuracy'], 'b-o', label='Train Accuracy')
            if 'test_accuracy' in df.columns:
                ax.plot(df['n_labeled'], df['test_accuracy'], 'r-o', label='Test Accuracy')
            
            ax.set_xlabel('Number of Labeled Samples')
            ax.set_ylabel('Accuracy')
            ax.set_title('Active Learning Curve - RSK World')
            ax.legend()
            ax.grid(True, alpha=0.3)
            
            # Apply the layout before saving so the saved file matches the display
            plt.tight_layout()
            
            if save_path:
                plt.savefig(save_path, dpi=150, bbox_inches='tight')
            
            plt.show()
            
        except ImportError:
            print("matplotlib not available for plotting")


def compare_strategies(
    texts: List[str],
    labels: List[int],
    n_initial: int = 20,
    n_iterations: int = 15,
    batch_size: int = 5,
    test_size: float = 0.2
) -> Dict:
    """
    Compare different query strategies.
    
    Args:
        texts: All texts
        labels: All labels
        n_initial: Initial labeled samples
        n_iterations: Learning iterations
        batch_size: Samples per iteration
        test_size: Fraction for test set
        
    Returns:
        Comparison results
    """
    from sklearn.model_selection import train_test_split
    
    print(f"\n{'='*60}")
    print("Query Strategy Comparison - RSK World")
    print(f"Author: {__author__}")
    print(f"{'='*60}\n")
    
    # Split data
    texts_train, texts_test, y_train, y_test = train_test_split(
        texts, labels, test_size=test_size, random_state=42, stratify=labels
    )
    
    strategies = {
        # NOTE: no random-sampling strategy is defined in this module. The
        # 'Random' entry is passed to the learner as None, so ActiveLearner
        # falls back to its default (entropy sampling) and this is not a true
        # random baseline.
        'Random': None,
        'Least Confidence': UncertaintySampling('least_confidence'),
        'Margin': UncertaintySampling('margin'),
        'Entropy': UncertaintySampling('entropy'),
        'Diversity': DiversitySampling(combine_with_uncertainty=True)
    }
    
    results = {}
    
    for name, strategy in strategies.items():
        print(f"\nRunning {name}...")
        
        # Reset random state
        np.random.seed(42)
        
        # Initialize learner (None selects the learner's default strategy)
        learner = ActiveLearner(
            query_strategy=strategy,
            random_state=42
        )
        
        # Random initial samples
        init_indices = np.random.choice(len(texts_train), size=n_initial, replace=False)
        texts_init = [texts_train[i] for i in init_indices]
        labels_init = [y_train[i] for i in init_indices]
        
        # Pool (remaining samples)
        pool_mask = np.ones(len(texts_train), dtype=bool)
        pool_mask[init_indices] = False
        texts_pool = [texts_train[i] for i, m in enumerate(pool_mask) if m]
        labels_pool = [y_train[i] for i, m in enumerate(pool_mask) if m]
        
        learner.initialize(texts_pool, texts_init, labels_init)
        
        # Build test features with the learner's own fitted vectorizer so that
        # they share the vocabulary the model is trained on. A separately
        # fitted vectorizer would produce a mismatched feature space.
        X_test = learner.vectorizer.transform(texts_test)
        
        # Simulate learning
        learner.simulate_labeling(
            labels_pool,
            n_iterations=n_iterations,
            batch_size=batch_size,
            X_test=X_test,
            y_test=y_test
        )
        
        results[name] = learner.get_learning_curve()
    
    print(f"\n{'='*60}")
    print("Comparison Complete!")
    print(f"{'='*60}")
    
    return results


if __name__ == "__main__":
    print(f"\n{'='*60}")
    print("Active Learning Module Demo")
    print(f"Author: {__author__} | Website: {__website__}")
    print(f"{'='*60}\n")
    
    # Demo with sample data
    try:
        df = pd.read_csv('../data/csv/train.csv', comment='#')
        
        texts = df['text'].tolist()
        labels = df['label'].tolist()
        
        # Simple demo
        print("Running active learning demo...")
        
        # Initialize learner
        learner = ActiveLearner(
            query_strategy=UncertaintySampling('entropy')
        )
        
        # Split data
        n_init = 10
        init_texts = texts[:n_init]
        init_labels = labels[:n_init]
        pool_texts = texts[n_init:]
        pool_labels = labels[n_init:]
        
        learner.initialize(pool_texts, init_texts, init_labels)
        
        # Simulate a few iterations
        learner.simulate_labeling(
            pool_labels,
            n_iterations=5,
            batch_size=3
        )
        
        print("\nLearning curve:")
        print(learner.get_learning_curve())
        
    except FileNotFoundError:
        print("Dataset not found. Please ensure train.csv exists.")
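
In scripts/active_learning.py, compare_strategies labels one run 'Random', but the learner for that run is built with query_strategy=None and therefore uses entropy sampling. A minimal sketch of what a true random baseline could look like is given below. The class name RandomSampling and its random_state parameter are assumptions for illustration and are not part of the original module; the sketch only mirrors the select_samples(X_pool, model, n_samples) signature used by the strategies above.

```python
import numpy as np


class RandomSampling:
    """Hypothetical random baseline (not part of the original module).

    Mirrors the select_samples signature of the query strategies so it
    could be swapped in wherever a strategy object is expected.
    """

    def __init__(self, random_state=None):
        # A private generator avoids touching NumPy's global random state
        self.rng = np.random.default_rng(random_state)

    def select_samples(self, X_pool, model=None, n_samples=1):
        """Return n_samples distinct pool indices chosen uniformly at random."""
        n_pool = X_pool.shape[0]
        n_samples = min(n_samples, n_pool)
        return self.rng.choice(n_pool, size=n_samples, replace=False)


# Usage on a dummy pool of 50 samples with 4 features each
X_pool = np.zeros((50, 4))
picked = RandomSampling(random_state=0).select_samples(X_pool, n_samples=5)
print(picked)
```

Because the model argument is ignored, the indices depend only on the pool size and the seed, which is what makes this a baseline for judging whether the uncertainty-based strategies actually help.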

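
The vote entropy used by QueryByCommittee in scripts/active_learning.py can be checked by hand on a tiny committee. The sketch below is a standalone illustration, not the module's code: the helper name vote_entropy and the toy prediction matrix are assumptions. It counts votes over the label values that actually occur, so labels such as 1, 3 and 5 that do not start at 0 are handled.

```python
from collections import Counter

import numpy as np


def vote_entropy(votes, classes):
    """Entropy of the committee's vote distribution for one sample."""
    counts = Counter(votes)
    fracs = np.array([counts.get(c, 0) / len(votes) for c in classes])
    nonzero = fracs[fracs > 0]  # skip empty classes to avoid log(0)
    return float(-np.sum(nonzero * np.log(nonzero)))


# Rows are committee members, columns are pool samples (toy data)
predictions = np.array([
    [1, 3, 5],
    [1, 3, 3],
    [1, 5, 3],
    [1, 3, 5],
])
classes = np.unique(predictions).tolist()

scores = [
    vote_entropy(predictions[:, i].tolist(), classes)
    for i in range(predictions.shape[1])
]
most_disputed = int(np.argmax(scores))
print(scores, most_disputed)
```

The first sample gets a score of zero because every member votes for the same class, while the third sample, where the votes split evenly between two classes, gets the highest score and would be queried first.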
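
The three scores computed by UncertaintySampling in scripts/active_learning.py (least confidence, margin and entropy) can be compared on a small probability matrix. This is a sketch under the assumption of a three-class problem with made-up probabilities; it reuses the same formulas as the class but does not call it.

```python
import numpy as np

# Toy predicted probabilities: one row per sample, one column per class
probs = np.array([
    [0.90, 0.05, 0.05],   # confident prediction
    [0.40, 0.35, 0.25],   # two classes close together
    [0.34, 0.33, 0.33],   # nearly uniform
])

# Least confidence: 1 minus the probability of the top class
least_confidence = 1 - probs.max(axis=1)

# Margin: 1 minus the gap between the two most probable classes
sorted_probs = np.sort(probs, axis=1)
margin = 1 - (sorted_probs[:, -1] - sorted_probs[:, -2])

# Entropy: the small constant guards against log(0)
entropy = -np.sum(probs * np.log(probs + 1e-10), axis=1)

for name, scores in [
    ("least_confidence", least_confidence),
    ("margin", margin),
    ("entropy", entropy),
]:
    ranking = np.argsort(scores)[::-1]  # most uncertain first
    print(name, np.round(scores, 3), ranking)
```

On this data all three scores rank the nearly uniform row as most uncertain and the confident row as least uncertain. The scores can disagree on less clear-cut data, which is why the module exposes them as separate strategies.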
scripts/hyperparameter_tuning.py
"""
================================================================================
Text Classification Dataset - Advanced Hyperparameter Tuning Module
================================================================================
Project: Text Classification Dataset
Category: Text Data / NLP

Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in | support@rskworld.in
Phone: +91 93305 39277

Copyright (c) 2026 RSK World - All Rights Reserved
Content used for educational purposes only.

Features:
- GridSearchCV for exhaustive search
- RandomizedSearchCV for efficient search
- Optuna Bayesian optimization
- Cross-validation with multiple metrics
- Early stopping support
- Hyperparameter importance analysis
- Best model export

Created: December 2026
================================================================================
"""

import os
import json
import time
import warnings
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.model_selection import (
    GridSearchCV, RandomizedSearchCV, 
    StratifiedKFold, cross_val_score
)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, f1_score, make_scorer
import joblib

warnings.filterwarnings('ignore')

# Project information
__author__ = "Molla Samser"
__website__ = "https://rskworld.in"
__email__ = "help@rskworld.in"

# Category mapping
CATEGORIES = {
    0: 'Technology', 1: 'Sports', 2: 'Politics',
    3: 'Entertainment', 4: 'Business', 5: 'Science'
}


class HyperparameterTuner:
    """
    Advanced hyperparameter tuning for text classification models.
    
    Supports multiple optimization strategies:
    - Grid Search (exhaustive)
    - Random Search (efficient)
    - Bayesian Optimization (Optuna)
    
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    
    # Predefined parameter grids
    PARAM_GRIDS = {
        'logistic_regression': {
            'classifier__C': [0.01, 0.1, 1, 10, 100],
            'classifier__penalty': ['l1', 'l2'],
            'classifier__solver': ['liblinear', 'saga'],
            'classifier__max_iter': [500, 1000, 2000],
            'classifier__class_weight': [None, 'balanced'],
            'vectorizer__max_features': [5000, 10000, 20000],
            'vectorizer__ngram_range': [(1, 1), (1, 2), (1, 3)],
            'vectorizer__min_df': [1, 2, 5],
            'vectorizer__max_df': [0.9, 0.95, 1.0]
        },
        'naive_bayes': {
            'classifier__alpha': [0.001, 0.01, 0.1, 0.5, 1.0, 2.0],
            'classifier__fit_prior': [True, False],
            'vectorizer__max_features': [5000, 10000, 20000],
            'vectorizer__ngram_range': [(1, 1), (1, 2), (1, 3)],
            'vectorizer__min_df': [1, 2, 5],
            'vectorizer__use_idf': [True, False]
        },
        'svm': {
            'classifier__C': [0.01, 0.1, 1, 10],
            'classifier__loss': ['hinge', 'squared_hinge'],
            'classifier__max_iter': [1000, 2000, 5000],
            'classifier__class_weight': [None, 'balanced'],
            'vectorizer__max_features': [5000, 10000, 20000],
            'vectorizer__ngram_range': [(1, 1), (1, 2)],
        },
        'random_forest': {
            'classifier__n_estimators': [50, 100, 200],
            'classifier__max_depth': [10, 20, 50, None],
            'classifier__min_samples_split': [2, 5, 10],
            'classifier__min_samples_leaf': [1, 2, 4],
            'classifier__class_weight': [None, 'balanced'],
            'vectorizer__max_features': [5000, 10000],
            'vectorizer__ngram_range': [(1, 1), (1, 2)],
        }
    }
    
    def __init__(
        self,
        model_type: str = 'logistic_regression',
        cv: int = 5,
        scoring: str = 'f1_macro',
        n_jobs: int = -1,
        verbose: int = 1,
        random_state: int = 42
    ):
        """
        Initialize the tuner.
        
        Args:
            model_type: Type of model ('logistic_regression', 'naive_bayes', 'svm', 'random_forest')
            cv: Number of cross-validation folds
            scoring: Scoring metric
            n_jobs: Number of parallel jobs
            verbose: Verbosity level
            random_state: Random seed
        """
        self.model_type = model_type
        self.cv = cv
        self.scoring = scoring
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.random_state = random_state
        
        self.best_params_ = None
        self.best_score_ = None
        self.best_pipeline_ = None
        self.cv_results_ = None
        self.tuning_history_ = []
    
    def _create_pipeline(self, model_type: str) -> Pipeline:
        """Create a sklearn pipeline for the specified model type."""
        vectorizer = TfidfVectorizer(stop_words='english')
        
        models = {
            'logistic_regression': LogisticRegression(random_state=self.random_state),
            'naive_bayes': MultinomialNB(),
            'svm': LinearSVC(random_state=self.random_state),
            'random_forest': RandomForestClassifier(random_state=self.random_state)
        }
        
        if model_type not in models:
            raise ValueError(f"Unknown model type: {model_type}")
        
        return Pipeline([
            ('vectorizer', vectorizer),
            ('classifier', models[model_type])
        ])
    
    def grid_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        param_grid: Optional[Dict] = None
    ) -> Dict:
        """
        Perform exhaustive grid search.
        
        Args:
            X: Training texts
            y: Training labels
            param_grid: Parameter grid (uses default if None)
            
        Returns:
            Dictionary with best parameters and score
        """
        if self.verbose:
            print(f"\n{'='*60}")
            print("Grid Search Hyperparameter Tuning")
            print(f"Author: {__author__} | Website: {__website__}")
            print(f"{'='*60}\n")
        
        pipeline = self._create_pipeline(self.model_type)
        param_grid = param_grid or self.PARAM_GRIDS.get(self.model_type, {})
        
        # Calculate total combinations
        total_combinations = 1
        for values in param_grid.values():
            total_combinations *= len(values)
        
        if self.verbose:
            print(f"Model Type: {self.model_type}")
            print(f"Total Combinations: {total_combinations}")
            print(f"CV Folds: {self.cv}")
            print(f"Scoring: {self.scoring}")
            print("-" * 40)
        
        start_time = time.time()
        
        grid_search = GridSearchCV(
            pipeline,
            param_grid,
            cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            return_train_score=True
        )
        
        grid_search.fit(X, y)
        
        elapsed_time = time.time() - start_time
        
        self.best_params_ = grid_search.best_params_
        self.best_score_ = grid_search.best_score_
        self.best_pipeline_ = grid_search.best_estimator_
        self.cv_results_ = grid_search.cv_results_
        
        result = {
            'method': 'grid_search',
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_),
            'total_combinations': total_combinations,
            'elapsed_time_seconds': round(elapsed_time, 2),
            'cv_folds': self.cv,
            'scoring': self.scoring
        }
        
        self.tuning_history_.append(result)
        
        if self.verbose:
            print(f"\nBest Score: {self.best_score_:.4f}")
            print(f"Best Parameters:")
            for param, value in self.best_params_.items():
                print(f"  {param}: {value}")
            print(f"Time: {elapsed_time:.2f}s")
        
        return result
    
    def random_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        param_distributions: Optional[Dict] = None,
        n_iter: int = 50
    ) -> Dict:
        """
        Perform randomized search.
        
        Args:
            X: Training texts
            y: Training labels
            param_distributions: Parameter distributions or lists to sample from
                (uses the default grid for the model type if None)
            n_iter: Number of parameter settings sampled
            
        Returns:
            Dictionary with best parameters and score
        """
        if self.verbose:
            print(f"\n{'='*60}")
            print("Randomized Search Hyperparameter Tuning")
            print(f"Author: {__author__} | Website: {__website__}")
            print(f"{'='*60}\n")
        
        pipeline = self._create_pipeline(self.model_type)
        param_distributions = param_distributions or self.PARAM_GRIDS.get(self.model_type, {})
        
        if self.verbose:
            print(f"Model Type: {self.model_type}")
            print(f"Iterations: {n_iter}")
            print(f"CV Folds: {self.cv}")
            print("-" * 40)
        
        start_time = time.time()
        
        random_search = RandomizedSearchCV(
            pipeline,
            param_distributions,
            n_iter=n_iter,
            cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            random_state=self.random_state,
            return_train_score=True
        )
        
        random_search.fit(X, y)
        
        elapsed_time = time.time() - start_time
        
        self.best_params_ = random_search.best_params_
        self.best_score_ = random_search.best_score_
        self.best_pipeline_ = random_search.best_estimator_
        self.cv_results_ = random_search.cv_results_
        
        result = {
            'method': 'random_search',
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_),
            'n_iterations': n_iter,
            'elapsed_time_seconds': round(elapsed_time, 2),
            'cv_folds': self.cv,
            'scoring': self.scoring
        }
        
        self.tuning_history_.append(result)
        
        if self.verbose:
            print(f"\nBest Score: {self.best_score_:.4f}")
            print(f"Best Parameters:")
            for param, value in self.best_params_.items():
                print(f"  {param}: {value}")
            print(f"Time: {elapsed_time:.2f}s")
        
        return result
    
    def optuna_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        n_trials: int = 100,
        timeout: Optional[int] = None
    ) -> Dict:
        """
        Perform Bayesian optimization using Optuna.
        
        Args:
            X: Training texts
            y: Training labels
            n_trials: Number of optimization trials
            timeout: Timeout in seconds
            
        Returns:
            Dictionary with best parameters and score
        """
        try:
            import optuna
            from optuna.samplers import TPESampler
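        except ImportError as exc:
            # Fail loudly: callers index into the result, so an empty dict
            # would only surface later as a KeyError
            raise ImportError(
                "Optuna not installed. Install with: pip install optuna"
            ) from exc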
        
        if self.verbose:
            print(f"\n{'='*60}")
            print("Optuna Bayesian Hyperparameter Optimization")
            print(f"Author: {__author__} | Website: {__website__}")
            print(f"{'='*60}\n")
            print(f"Model Type: {self.model_type}")
            print(f"Trials: {n_trials}")
            print("-" * 40)
        
        # Suppress Optuna logging
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        
        def objective(trial):
            # Define hyperparameter search space
            if self.model_type == 'logistic_regression':
                params = {
                    'vectorizer__max_features': trial.suggest_int('max_features', 5000, 30000, step=5000),
                    'vectorizer__ngram_range': (1, trial.suggest_int('ngram_max', 1, 3)),
                    'vectorizer__min_df': trial.suggest_int('min_df', 1, 10),
                    'classifier__C': trial.suggest_float('C', 0.001, 100, log=True),
                    'classifier__penalty': trial.suggest_categorical('penalty', ['l1', 'l2']),
                    'classifier__solver': 'liblinear',
                    'classifier__max_iter': trial.suggest_int('max_iter', 500, 3000, step=500),
                }
            elif self.model_type == 'naive_bayes':
                params = {
                    'vectorizer__max_features': trial.suggest_int('max_features', 5000, 30000, step=5000),
                    'vectorizer__ngram_range': (1, trial.suggest_int('ngram_max', 1, 3)),
                    'classifier__alpha': trial.suggest_float('alpha', 0.001, 2.0, log=True),
                }
            elif self.model_type == 'svm':
                params = {
                    'vectorizer__max_features': trial.suggest_int('max_features', 5000, 20000, step=5000),
                    'vectorizer__ngram_range': (1, trial.suggest_int('ngram_max', 1, 2)),
                    'classifier__C': trial.suggest_float('C', 0.01, 10, log=True),
                    'classifier__max_iter': trial.suggest_int('max_iter', 1000, 5000, step=1000),
                }
            else:
                params = {}
            
            pipeline = self._create_pipeline(self.model_type)
            pipeline.set_params(**params)
            
            scores = cross_val_score(
                pipeline, X, y,
                cv=StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state),
                scoring=self.scoring,
                n_jobs=self.n_jobs
            )
            
            return scores.mean()
        
        start_time = time.time()
        
        sampler = TPESampler(seed=self.random_state)
        study = optuna.create_study(direction='maximize', sampler=sampler)
        study.optimize(objective, n_trials=n_trials, timeout=timeout, show_progress_bar=self.verbose > 0)
        
        elapsed_time = time.time() - start_time
        
        # Build best pipeline
        best_trial = study.best_trial
        self.best_params_ = best_trial.params
        self.best_score_ = best_trial.value
        
        # Reconstruct best pipeline
        pipeline = self._create_pipeline(self.model_type)
        
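        # Apply the best trial's values for every model type that has a
        # search space; otherwise the refit would silently use defaults
        if self.model_type == 'logistic_regression':
            pipeline.set_params(
                vectorizer__max_features=best_trial.params['max_features'],
                vectorizer__ngram_range=(1, best_trial.params['ngram_max']),
                vectorizer__min_df=best_trial.params['min_df'],
                classifier__C=best_trial.params['C'],
                classifier__penalty=best_trial.params['penalty'],
                classifier__solver='liblinear',
                classifier__max_iter=best_trial.params['max_iter'],
            )
        elif self.model_type == 'naive_bayes':
            pipeline.set_params(
                vectorizer__max_features=best_trial.params['max_features'],
                vectorizer__ngram_range=(1, best_trial.params['ngram_max']),
                classifier__alpha=best_trial.params['alpha'],
            )
        elif self.model_type == 'svm':
            pipeline.set_params(
                vectorizer__max_features=best_trial.params['max_features'],
                vectorizer__ngram_range=(1, best_trial.params['ngram_max']),
                classifier__C=best_trial.params['C'],
                classifier__max_iter=best_trial.params['max_iter'],
            )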
        
        pipeline.fit(X, y)
        self.best_pipeline_ = pipeline
        
        result = {
            'method': 'optuna_bayesian',
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_),
            'n_trials': n_trials,
            'elapsed_time_seconds': round(elapsed_time, 2),
            'cv_folds': self.cv,
            'scoring': self.scoring,
            'optimization_history': [
                {'trial': t.number, 'score': t.value, 'params': t.params}
                for t in study.trials[:10]  # First 10 trials
            ]
        }
        
        self.tuning_history_.append(result)
        
        if self.verbose:
            print(f"\nBest Score: {self.best_score_:.4f}")
            print(f"Best Parameters:")
            for param, value in self.best_params_.items():
                print(f"  {param}: {value}")
            print(f"Time: {elapsed_time:.2f}s")
        
        return result
    
    def get_feature_importance(self, top_n: int = 20) -> Dict[str, List]:
        """
        Get feature importance from the best model.
        
        Args:
            top_n: Number of top features to return
            
        Returns:
            Dictionary with feature importance per class
        """
        if self.best_pipeline_ is None:
            raise ValueError("No model trained. Run tuning first.")
        
        vectorizer = self.best_pipeline_.named_steps['vectorizer']
        classifier = self.best_pipeline_.named_steps['classifier']
        feature_names = vectorizer.get_feature_names_out()
        
        importance = {}
        
        if hasattr(classifier, 'coef_'):
            coefs = classifier.coef_
            for i, category in CATEGORIES.items():
                if i < len(coefs):
                    top_indices = np.argsort(coefs[i])[-top_n:][::-1]
                    importance[category] = [
                        {'word': feature_names[idx], 'score': float(coefs[i][idx])}
                        for idx in top_indices
                    ]
        
        return importance
    
    def save_best_model(self, path: str):
        """Save the best model to disk."""
        if self.best_pipeline_ is None:
            raise ValueError("No model trained. Run tuning first.")
        
        model_data = {
            'pipeline': self.best_pipeline_,
            'best_params': self.best_params_,
            'best_score': self.best_score_,
            'model_type': self.model_type,
            'tuning_history': self.tuning_history_,
            'metadata': {
                'author': __author__,
                'website': __website__,
                'created_at': datetime.now().isoformat()
            }
        }
        
        joblib.dump(model_data, path)
        if self.verbose:
            print(f"Model saved to: {path}")
    
    def generate_report(self, output_path: str = 'tuning_report.json'):
        """Generate a comprehensive tuning report."""
        report = {
            'metadata': {
                'author': __author__,
                'website': __website__,
                'generated_at': datetime.now().isoformat()
            },
            'model_type': self.model_type,
            'best_params': self.best_params_,
            'best_score': float(self.best_score_) if self.best_score_ is not None else None,
            'tuning_history': self.tuning_history_,
            'feature_importance': self.get_feature_importance() if self.best_pipeline_ else None
        }
        
        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        
        if self.verbose:
            print(f"Report saved to: {output_path}")
        
        return report


def tune_all_models(
    X: np.ndarray,
    y: np.ndarray,
    method: str = 'random',
    n_iter: int = 30
) -> pd.DataFrame:
    """
    Tune multiple model types and compare results.
    
    Args:
        X: Training texts
        y: Training labels
        method: Tuning method ('grid', 'random', 'optuna'); 'grid' runs a
            randomized search over the default grid to keep the comparison fast
        n_iter: Number of iterations for random/optuna
        
    Returns:
        DataFrame with comparison results
    """
    print(f"\n{'='*60}")
    print("Multi-Model Hyperparameter Tuning Comparison")
    print(f"Author: {__author__} | Website: {__website__}")
    print(f"{'='*60}\n")
    
    model_types = ['logistic_regression', 'naive_bayes', 'svm']
    results = []
    
    for model_type in model_types:
        print(f"\n--- Tuning {model_type} ---")
        tuner = HyperparameterTuner(model_type=model_type, verbose=0)
        
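        if method == 'grid':
            # An exhaustive grid search is too slow for a comparison run, so
            # sample n_iter combinations from the default grid instead
            result = tuner.random_search(X, y, n_iter=n_iter)
        elif method == 'random':
            result = tuner.random_search(X, y, n_iter=n_iter)
        elif method == 'optuna':
            result = tuner.optuna_search(X, y, n_trials=n_iter)
        else:
            raise ValueError(f"Unknown tuning method: {method}")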
        
        results.append({
            'model': model_type,
            'best_score': result['best_score'],
            'time_seconds': result['elapsed_time_seconds'],
            'best_params': str(result['best_params'])[:100] + '...'
        })
        
        print(f"  Score: {result['best_score']:.4f}")
    
    df = pd.DataFrame(results).sort_values('best_score', ascending=False)
    
    print(f"\n{'='*60}")
    print("Results Summary:")
    print(df.to_string(index=False))
    
    return df


if __name__ == "__main__":
    import re
    import string
    
    print(f"\n{'='*60}")
    print("Hyperparameter Tuning Demo - RSK World")
    print(f"Author: {__author__} | Website: {__website__}")
    print(f"{'='*60}\n")
    
    # Load sample data
    try:
        train_df = pd.read_csv('../data/csv/train.csv', comment='#')
        
        # Preprocess
        def preprocess(text):
            text = text.lower()
            text = re.sub(r'https?://\S+|www\.\S+', '', text)
            text = text.translate(str.maketrans('', '', string.punctuation))
            return ' '.join(text.split())
        
        X = train_df['text'].apply(preprocess).values
        y = train_df['label'].values
        
        # Demo tuning
        tuner = HyperparameterTuner(
            model_type='logistic_regression',
            cv=3,
            verbose=1
        )
        
        # Quick random search
        result = tuner.random_search(X, y, n_iter=10)
        
        # Save model
        tuner.save_best_model('best_model.joblib')
        
        # Generate report
        tuner.generate_report('tuning_report.json')
        
        print(f"\n{'='*60}")
        print("Tuning Demo Complete!")
        print(f"Copyright (c) 2026 RSK World - All Rights Reserved")
        
    except FileNotFoundError:
        print("Dataset not found. Please ensure train.csv exists in ../data/csv/")
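
The default grids in `PARAM_GRIDS` multiply out quickly, which is presumably why the demo above runs a short randomized search rather than `grid_search`. The sketch below counts the combinations the same way `grid_search` does and multiplies by the fold count. `count_grid_fits` is a hypothetical helper, not part of the module, and it assumes a plain dict-of-lists grid.

```python
from math import prod
from typing import Dict, Sequence


def count_grid_fits(param_grid: Dict[str, Sequence], cv: int = 5) -> Dict[str, int]:
    """Return the number of parameter combinations and the total CV fits."""
    # Same product that grid_search computes as total_combinations
    combinations = prod(len(values) for values in param_grid.values())
    return {'combinations': combinations, 'fits': combinations * cv}


# Copy of the 'logistic_regression' entry from PARAM_GRIDS
logistic_grid = {
    'classifier__C': [0.01, 0.1, 1, 10, 100],
    'classifier__penalty': ['l1', 'l2'],
    'classifier__solver': ['liblinear', 'saga'],
    'classifier__max_iter': [500, 1000, 2000],
    'classifier__class_weight': [None, 'balanced'],
    'vectorizer__max_features': [5000, 10000, 20000],
    'vectorizer__ngram_range': [(1, 1), (1, 2), (1, 3)],
    'vectorizer__min_df': [1, 2, 5],
    'vectorizer__max_df': [0.9, 0.95, 1.0],
}

summary = count_grid_fits(logistic_grid, cv=5)
print(summary)  # {'combinations': 9720, 'fits': 48600}
```

With the default `cv=5`, an exhaustive search over this grid would fit the pipeline 48,600 times (refitting the best model adds one more), so passing a smaller `param_grid` or using `random_search` with a modest `n_iter` is likely the practical choice.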

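
`optuna_search` stores the short trial names (`max_features`, `ngram_max`, `C`, ...) in `best_params_`, while `grid_search` and `random_search` store pipeline-style names such as `vectorizer__max_features`. If the two need to be compared or passed to `Pipeline.set_params`, a small translation step can help. The function below is an illustrative sketch only: `trial_to_pipeline_params` and `VECTORIZER_KEYS` are hypothetical names, and the mapping assumes the trial names used in the `objective` function above.

```python
from typing import Any, Dict

# Trial names that belong to the TF-IDF vectorizer (assumed from objective())
VECTORIZER_KEYS = {'max_features', 'min_df'}


def trial_to_pipeline_params(trial_params: Dict[str, Any]) -> Dict[str, Any]:
    """Translate short Optuna trial names into pipeline parameter names."""
    pipeline_params: Dict[str, Any] = {}
    for name, value in trial_params.items():
        if name == 'ngram_max':
            # The trial only stores the upper bound of the n-gram range
            pipeline_params['vectorizer__ngram_range'] = (1, value)
        elif name in VECTORIZER_KEYS:
            pipeline_params[f'vectorizer__{name}'] = value
        else:
            # Everything else is treated as a classifier parameter
            pipeline_params[f'classifier__{name}'] = value
    return pipeline_params


example = trial_to_pipeline_params({'max_features': 10000, 'ngram_max': 2, 'alpha': 0.1})
print(example)
```

The result can be passed as `pipeline.set_params(**example)`. Fixed values that are not part of the trial, such as `classifier__solver='liblinear'` for logistic regression, would still have to be added separately.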
scripts/visualizations.py
"""
================================================================================
Text Classification Dataset - Advanced Visualization Module
================================================================================
Project: Text Classification Dataset
Category: Text Data / NLP

Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in | support@rskworld.in
Phone: +91 93305 39277

Copyright (c) 2026 RSK World - All Rights Reserved
Content used for educational purposes only.

Features:
- Word Cloud Generation
- Category Distribution Charts
- Text Length Analysis
- Confusion Matrix Heatmaps
- Training History Plots
- Feature Importance Visualization
- t-SNE Embeddings Visualization

Created: December 2026
================================================================================
"""

import os
import re
import string
from typing import List, Dict, Optional, Tuple
from collections import Counter

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

# Project information
__author__ = "Molla Samser"
__website__ = "https://rskworld.in"
__email__ = "help@rskworld.in"

# Category configuration
CATEGORIES = {
    0: 'Technology',
    1: 'Sports',
    2: 'Politics',
    3: 'Entertainment',
    4: 'Business',
    5: 'Science'
}

CATEGORY_COLORS = {
    'Technology': '#3b82f6',
    'Sports': '#22c55e',
    'Politics': '#8b5cf6',
    'Entertainment': '#ec4899',
    'Business': '#f59e0b',
    'Science': '#06b6d4'
}


def set_style():
    """Set consistent plotting style."""
    plt.style.use('seaborn-v0_8-darkgrid')
    plt.rcParams['figure.facecolor'] = '#0f0a1f'
    plt.rcParams['axes.facecolor'] = '#1a1333'
    plt.rcParams['axes.edgecolor'] = '#352d54'
    plt.rcParams['axes.labelcolor'] = '#f8fafc'
    plt.rcParams['text.color'] = '#f8fafc'
    plt.rcParams['xtick.color'] = '#a5a3b8'
    plt.rcParams['ytick.color'] = '#a5a3b8'
    plt.rcParams['grid.color'] = '#352d54'
    plt.rcParams['legend.facecolor'] = '#231d3a'
    plt.rcParams['legend.edgecolor'] = '#352d54'
    plt.rcParams['font.family'] = 'sans-serif'


def generate_wordcloud(
    texts: List[str],
    output_path: str = 'wordcloud.png',
    title: str = 'Word Cloud',
    width: int = 1200,
    height: int = 600,
    background_color: str = '#0f0a1f',
    colormap: str = 'Reds'
):
    """
    Generate word cloud from texts.
    
    Args:
        texts: List of text documents
        output_path: Path to save image
        title: Chart title
        width: Image width
        height: Image height
        background_color: Background color
        colormap: Matplotlib colormap
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    try:
        from wordcloud import WordCloud, STOPWORDS
    except ImportError:
        print("Please install wordcloud: pip install wordcloud")
        return
    
    # Combine all texts
    combined_text = ' '.join(texts)
    
    # Clean text
    combined_text = combined_text.lower()
    combined_text = re.sub(r'[^\w\s]', '', combined_text)
    
    # Generate word cloud
    wordcloud = WordCloud(
        width=width,
        height=height,
        background_color=background_color,
        colormap=colormap,
        stopwords=STOPWORDS,
        max_words=200,
        max_font_size=150,
        random_state=42
    ).generate(combined_text)
    
    # Plot
    set_style()
    fig, ax = plt.subplots(figsize=(15, 8))
    ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    ax.set_title(title, fontsize=20, fontweight='bold', color='#f8fafc', pad=20)
    
    # Add watermark
    fig.text(0.99, 0.01, 'RSK World | rskworld.in', fontsize=10, color='#6b6882',
             ha='right', va='bottom', style='italic')
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"Word cloud saved to: {output_path}")


def generate_wordclouds_by_category(
    df: pd.DataFrame,
    text_column: str = 'text',
    label_column: str = 'label',
    output_dir: str = 'wordclouds'
):
    """
    Generate separate word clouds for each category.
    
    Args:
        df: DataFrame with texts and labels
        text_column: Column name for text
        label_column: Column name for labels
        output_dir: Output directory
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    os.makedirs(output_dir, exist_ok=True)
    
    # One colormap per category; 'BuGn' is the fallback
    colormaps = {
        'Technology': 'Blues',
        'Sports': 'Greens',
        'Politics': 'Purples',
        'Entertainment': 'RdPu',
        'Business': 'YlOrBr',
        'Science': 'BuGn'
    }
    
    for label, category in CATEGORIES.items():
        texts = df[df[label_column] == label][text_column].tolist()
        if texts:
            output_path = os.path.join(output_dir, f'wordcloud_{category.lower()}.png')
            generate_wordcloud(
                texts,
                output_path=output_path,
                title=f'{category} - Word Cloud',
                colormap=colormaps.get(category, 'BuGn')
            )


def plot_category_distribution(
    df: pd.DataFrame,
    label_column: str = 'label',
    output_path: str = 'category_distribution.png'
):
    """
    Plot category distribution as pie and bar charts.
    
    Args:
        df: DataFrame with labels
        label_column: Column name for labels
        output_path: Path to save image
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    set_style()
    
    # Count categories
    counts = df[label_column].map(CATEGORIES).value_counts()
    colors = [CATEGORY_COLORS[cat] for cat in counts.index]
    
    fig, axes = plt.subplots(1, 2, figsize=(16, 7))
    
    # Bar chart
    bars = axes[0].bar(counts.index, counts.values, color=colors, edgecolor='white', linewidth=1.5)
    axes[0].set_xlabel('Category', fontsize=12)
    axes[0].set_ylabel('Number of Documents', fontsize=12)
    axes[0].set_title('Category Distribution', fontsize=16, fontweight='bold')
    axes[0].tick_params(axis='x', rotation=45)
    
    # Add value labels
    for bar, val in zip(bars, counts.values):
        axes[0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 2,
                    str(val), ha='center', va='bottom', fontsize=11, fontweight='bold')
    
    # Pie chart
    wedges, texts, autotexts = axes[1].pie(
        counts.values,
        labels=counts.index,
        colors=colors,
        autopct='%1.1f%%',
        startangle=90,
        explode=[0.02] * len(counts),
        shadow=True
    )
    axes[1].set_title('Category Proportions', fontsize=16, fontweight='bold')
    
    for autotext in autotexts:
        autotext.set_fontsize(10)
        autotext.set_fontweight('bold')
    
    plt.suptitle('Text Classification Dataset - Category Analysis\nRSK World | rskworld.in',
                 fontsize=18, fontweight='bold', y=1.02)
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"Category distribution saved to: {output_path}")


def plot_text_length_distribution(
    df: pd.DataFrame,
    text_column: str = 'text',
    label_column: str = 'label',
    output_path: str = 'text_length_distribution.png'
):
    """
    Plot text length distribution by category.
    
    Args:
        df: DataFrame with texts
        text_column: Column name for text
        label_column: Column name for labels
        output_path: Path to save image
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    set_style()
    
    # Calculate lengths
    df = df.copy()
    df['word_count'] = df[text_column].str.split().str.len()
    df['char_count'] = df[text_column].str.len()
    df['category'] = df[label_column].map(CATEGORIES)
    
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    
    # Word count histogram
    for cat in CATEGORIES.values():
        data = df[df['category'] == cat]['word_count']
        axes[0, 0].hist(data, bins=30, alpha=0.6, label=cat, color=CATEGORY_COLORS[cat])
    axes[0, 0].set_xlabel('Word Count', fontsize=12)
    axes[0, 0].set_ylabel('Frequency', fontsize=12)
    axes[0, 0].set_title('Word Count Distribution by Category', fontsize=14)
    axes[0, 0].legend(loc='upper right')
    
    # Character count histogram
    for cat in CATEGORIES.values():
        data = df[df['category'] == cat]['char_count']
        axes[0, 1].hist(data, bins=30, alpha=0.6, label=cat, color=CATEGORY_COLORS[cat])
    axes[0, 1].set_xlabel('Character Count', fontsize=12)
    axes[0, 1].set_ylabel('Frequency', fontsize=12)
    axes[0, 1].set_title('Character Count Distribution by Category', fontsize=14)
    axes[0, 1].legend(loc='upper right')
    
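    # Box plot - word count
    colors = [CATEGORY_COLORS[cat] for cat in CATEGORIES.values()]
    bp1 = df.boxplot(column='word_count', by='category', ax=axes[1, 0],
                     patch_artist=True, return_type='dict')
    # pandas draws the groups in sorted category order, so the box colors
    # must follow that order rather than the CATEGORIES order
    box_colors = [CATEGORY_COLORS[cat] for cat in sorted(df['category'].dropna().unique())]
    for patch, color in zip(bp1['word_count']['boxes'], box_colors):
        patch.set_facecolor(color)
        patch.set_alpha(0.7)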
    axes[1, 0].set_xlabel('Category', fontsize=12)
    axes[1, 0].set_ylabel('Word Count', fontsize=12)
    axes[1, 0].set_title('Word Count Box Plot', fontsize=14)
    plt.suptitle('')
    
    # Violin plot - character count
    violin_data = [df[df['category'] == cat]['char_count'].values for cat in CATEGORIES.values()]
    parts = axes[1, 1].violinplot(violin_data, positions=range(len(CATEGORIES)))
    for i, pc in enumerate(parts['bodies']):
        pc.set_facecolor(colors[i])
        pc.set_alpha(0.7)
    axes[1, 1].set_xticks(range(len(CATEGORIES)))
    axes[1, 1].set_xticklabels(CATEGORIES.values(), rotation=45)
    axes[1, 1].set_xlabel('Category', fontsize=12)
    axes[1, 1].set_ylabel('Character Count', fontsize=12)
    axes[1, 1].set_title('Character Count Violin Plot', fontsize=14)
    
    plt.suptitle('Text Length Analysis - RSK World | rskworld.in',
                 fontsize=18, fontweight='bold', y=1.02)
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"Text length distribution saved to: {output_path}")


def plot_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    output_path: str = 'confusion_matrix.png',
    title: str = 'Confusion Matrix',
    normalize: bool = True
):
    """
    Plot confusion matrix heatmap.
    
    Args:
        y_true: True labels
        y_pred: Predicted labels
        output_path: Path to save image
        title: Chart title
        normalize: Whether to normalize
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    from sklearn.metrics import confusion_matrix as cm_func
    
    set_style()
    
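    # Fix the label order so rows and columns always line up with CATEGORIES,
    # even when a class is missing from y_true or y_pred
    cm = cm_func(y_true, y_pred, labels=list(CATEGORIES.keys()))
    if normalize:
        row_sums = cm.sum(axis=1, keepdims=True)
        # Rows with no true samples stay at zero instead of becoming NaN
        cm = np.divide(cm, row_sums, out=np.zeros(cm.shape, dtype=float),
                       where=row_sums != 0)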
    
    fig, ax = plt.subplots(figsize=(12, 10))
    
    sns.heatmap(
        cm,
        annot=True,
        fmt='.2%' if normalize else 'd',
        cmap='Reds',
        xticklabels=CATEGORIES.values(),
        yticklabels=CATEGORIES.values(),
        ax=ax,
        linewidths=0.5,
        linecolor='#352d54',
        cbar_kws={'label': 'Proportion' if normalize else 'Count'}
    )
    
    ax.set_xlabel('Predicted Label', fontsize=14)
    ax.set_ylabel('True Label', fontsize=14)
    ax.set_title(f'{title}\nRSK World | rskworld.in', fontsize=16, fontweight='bold')
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"Confusion matrix saved to: {output_path}")


def plot_training_history(
    history: Dict[str, List[float]],
    output_path: str = 'training_history.png'
):
    """
    Plot training history (loss and accuracy).
    
    Args:
        history: Dictionary with 'loss', 'val_loss', 'accuracy', 'val_accuracy'
        output_path: Path to save image
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    set_style()
    
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    
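    # Size the x-axis from whichever plotted series is present, so a history
    # without 'loss' (e.g. accuracy only) still plots
    plotted_keys = ('loss', 'val_loss', 'accuracy', 'val_accuracy')
    n_epochs = max((len(history[k]) for k in plotted_keys if k in history), default=0)
    epochs = range(1, n_epochs + 1)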
    
    # Loss plot
    if 'loss' in history:
        axes[0].plot(epochs, history['loss'], 'o-', color='#dc2626', 
                    label='Training Loss', linewidth=2, markersize=6)
    if 'val_loss' in history:
        axes[0].plot(epochs, history['val_loss'], 's--', color='#f59e0b',
                    label='Validation Loss', linewidth=2, markersize=6)
    axes[0].set_xlabel('Epoch', fontsize=12)
    axes[0].set_ylabel('Loss', fontsize=12)
    axes[0].set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Accuracy plot
    if 'accuracy' in history:
        axes[1].plot(epochs, history['accuracy'], 'o-', color='#22c55e',
                    label='Training Accuracy', linewidth=2, markersize=6)
    if 'val_accuracy' in history:
        axes[1].plot(epochs, history['val_accuracy'], 's--', color='#3b82f6',
                    label='Validation Accuracy', linewidth=2, markersize=6)
    axes[1].set_xlabel('Epoch', fontsize=12)
    axes[1].set_ylabel('Accuracy', fontsize=12)
    axes[1].set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    axes[1].set_ylim(0, 1)
    
    plt.suptitle('Model Training History - RSK World | rskworld.in',
                 fontsize=18, fontweight='bold', y=1.02)
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"Training history saved to: {output_path}")


def plot_feature_importance(
    feature_names: List[str],
    importances: np.ndarray,
    top_n: int = 20,
    output_path: str = 'feature_importance.png',
    title: str = 'Top Features'
):
    """
    Plot feature importance bar chart.
    
    Args:
        feature_names: List of feature names
        importances: Feature importance scores
        top_n: Number of top features to show
        output_path: Path to save image
        title: Chart title
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    set_style()
    
    # Get top features (coerce to an array so list inputs can be fancy-indexed)
    importances = np.asarray(importances)
    indices = np.argsort(importances)[-top_n:][::-1]
    top_features = [feature_names[i] for i in indices]
    top_importances = importances[indices]
    
    fig, ax = plt.subplots(figsize=(12, 8))
    
    colors = plt.cm.Reds(np.linspace(0.4, 0.9, len(top_features)))
    bars = ax.barh(range(len(top_features)), top_importances[::-1], color=colors[::-1])
    
    ax.set_yticks(range(len(top_features)))
    ax.set_yticklabels(top_features[::-1])
    ax.set_xlabel('Importance Score', fontsize=12)
    ax.set_title(f'{title}\nRSK World | rskworld.in', fontsize=16, fontweight='bold')
    
    # Add value labels
    for bar, val in zip(bars, top_importances[::-1]):
        ax.text(bar.get_width() + 0.001, bar.get_y() + bar.get_height()/2,
               f'{val:.4f}', va='center', fontsize=9)
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"Feature importance saved to: {output_path}")


def plot_tsne_embeddings(
    embeddings: np.ndarray,
    labels: np.ndarray,
    output_path: str = 'tsne_embeddings.png',
    perplexity: int = 30
):
    """
    Plot t-SNE visualization of text embeddings.
    
    Args:
        embeddings: Document embeddings (n_samples, n_features)
        labels: Category labels
        output_path: Path to save image
        perplexity: t-SNE perplexity parameter
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    set_style()
    
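    # Boolean masks below need an array; a plain list would not broadcast.
    labels = np.asarray(labels)
    
    # Reduce dimensions with t-SNE
    print("Computing t-SNE embeddings...")
    # scikit-learn 1.5 renamed n_iter to max_iter. The default (1000) matches
    # the original setting, so the argument is omitted to work on both sides.
    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    embeddings_2d = tsne.fit_transform(embeddings)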
    
    fig, ax = plt.subplots(figsize=(14, 10))
    
    for label, category in CATEGORIES.items():
        mask = labels == label
        ax.scatter(
            embeddings_2d[mask, 0],
            embeddings_2d[mask, 1],
            c=CATEGORY_COLORS[category],
            label=category,
            alpha=0.7,
            s=50,
            edgecolors='white',
            linewidth=0.5
        )
    
    ax.set_xlabel('t-SNE Dimension 1', fontsize=12)
    ax.set_ylabel('t-SNE Dimension 2', fontsize=12)
    ax.set_title('t-SNE Visualization of Document Embeddings\nRSK World | rskworld.in',
                 fontsize=16, fontweight='bold')
    ax.legend(loc='best', framealpha=0.9)
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='#0f0a1f')
    plt.close()
    
    print(f"t-SNE visualization saved to: {output_path}")


def generate_all_visualizations(data_dir: str, output_dir: str = 'visualizations'):
    """
    Generate all visualizations from the dataset.
    
    Args:
        data_dir: Path to data directory
        output_dir: Output directory for visualizations
        
    Author: Molla Samser | RSK World (https://rskworld.in)
    """
    os.makedirs(output_dir, exist_ok=True)
    
    print(f"\n{'='*60}")
    print("Generating Visualizations - RSK World")
    print(f"Author: {__author__} | Website: {__website__}")
    print(f"{'='*60}\n")
    
    # Load data
    train_df = pd.read_csv(os.path.join(data_dir, 'csv', 'train.csv'), comment='#')
    
    # Generate visualizations
    print("1. Generating category distribution...")
    plot_category_distribution(
        train_df,
        output_path=os.path.join(output_dir, 'category_distribution.png')
    )
    
    print("2. Generating text length analysis...")
    plot_text_length_distribution(
        train_df,
        output_path=os.path.join(output_dir, 'text_length_distribution.png')
    )
    
    print("3. Generating word clouds...")
    generate_wordcloud(
        train_df['text'].tolist(),
        output_path=os.path.join(output_dir, 'wordcloud_all.png'),
        title='Text Classification Dataset - All Categories'
    )
    
    generate_wordclouds_by_category(
        train_df,
        output_dir=os.path.join(output_dir, 'wordclouds_by_category')
    )
    
    print(f"\n{'='*60}")
    print("All visualizations generated successfully!")
    print(f"Output directory: {output_dir}")


if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1:
        data_dir = sys.argv[1]
    else:
        data_dir = '../data'
    
    generate_all_visualizations(data_dir)
