dask-parallel
Parallel and distributed computing with Dask

scripts/
  • advanced_data_processing.py (6.6 KB)
  • create_basic_data.py (4.9 KB)
  • distributed_workflow.py (4.3 KB)
  • generate_advanced_data.py (6.8 KB)
  • memory_efficient_ops.py (3.6 KB)
  • parallel_processing.py (2.2 KB)
  • performance_profiling.py (6 KB)
scripts/advanced_data_processing.py
#!/usr/bin/env python3
"""
Advanced Data Processing with Dask
Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in, support@rskworld.in
Phone: +91 93305 39277
"""

import os
import time

import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask import delayed, compute

# Ensure the output directory exists; every function below writes CSVs to data/
os.makedirs('data', exist_ok=True)


def process_time_series_data():
    """Process large time series data"""
    print("=" * 60)
    print("Time Series Data Processing")
    print("=" * 60)
    
    # Generate time series data
    print("\nGenerating time series data...")
    n_points = 1000000
    dates = pd.date_range('2020-01-01', periods=n_points, freq='1h')
    
    data = {
        'timestamp': dates,
        'value': np.random.randn(n_points).cumsum(),
        'temperature': np.random.uniform(15, 35, n_points),
        'humidity': np.random.uniform(30, 90, n_points),
        'pressure': np.random.uniform(980, 1020, n_points)
    }
    
    df = pd.DataFrame(data)
    df.to_csv('data/timeseries_data.csv', index=False)
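    # Round-tripping through CSV demonstrates out-of-core reading;
    # dd.from_pandas(df, npartitions=...) would skip the disk I/O.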
    
    # Process with Dask
    print("Processing with Dask...")
    ddf = dd.read_csv('data/timeseries_data.csv', parse_dates=['timestamp'])
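    # set_index sorts by timestamp and records partition divisions,
    # which Dask needs for the time-based resample below.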
    ddf = ddf.set_index('timestamp')
    
    # Resample to daily
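    # agg with a dict computes several statistics per column in one pass;
    # compute() then materializes the result as an in-memory pandas DataFrame.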
    daily = ddf.resample('1D').agg({
        'value': 'mean',
        'temperature': ['mean', 'min', 'max'],
        'humidity': 'mean',
        'pressure': 'mean'
    }).compute()
    
    print(f"\nDaily aggregated data (first 10 days):")
    print(daily.head(10))
    print()


def process_multiple_files():
    """Process multiple CSV files in parallel"""
    print("=" * 60)
    print("Processing Multiple Files in Parallel")
    print("=" * 60)
    
    # Generate multiple data files
    print("\nGenerating multiple data files...")
    for i in range(5):
        data = {
            'id': range(i * 10000, (i + 1) * 10000),
            'value': np.random.randn(10000),
            'category': np.random.choice(['A', 'B', 'C'], 10000)
        }
        df = pd.DataFrame(data)
        df.to_csv(f'data/file_{i}.csv', index=False)
    
    # Read all files in parallel
    print("Reading files in parallel...")
    files = [f'data/file_{i}.csv' for i in range(5)]
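    # dd.read_csv accepts a list of paths (or a glob like 'data/file_*.csv');
    # each file becomes at least one partition that is read in parallel.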
    ddf = dd.read_csv(files)
    
    # Aggregate across all files with a single lazy groupby
    result = ddf.groupby('category')['value'].agg(['mean', 'std', 'count']).compute()
    
    print(f"\nAggregated results:")
    print(result)
    print()


def complex_data_transformations():
    """Perform complex data transformations"""
    print("=" * 60)
    print("Complex Data Transformations")
    print("=" * 60)
    
    # Create complex dataset
    n_rows = 500000
    data = {
        'id': range(n_rows),
        'date': pd.date_range('2020-01-01', periods=n_rows, freq='1h'),
        'amount': np.random.uniform(10, 1000, n_rows),
        'category': np.random.choice(['Electronics', 'Clothing', 'Food', 'Books'], n_rows),
        'region': np.random.choice(['US', 'EU', 'Asia', 'Other'], n_rows),
        'discount': np.random.uniform(0, 0.5, n_rows)
    }
    
    df = pd.DataFrame(data)
    df['final_amount'] = df['amount'] * (1 - df['discount'])
    df.to_csv('data/complex_data.csv', index=False)
    
    # Process with Dask
    print("\nProcessing complex transformations...")
    ddf = dd.read_csv('data/complex_data.csv', parse_dates=['date'])
    
    # Multiple transformations
    ddf['year'] = ddf['date'].dt.year
    ddf['month'] = ddf['date'].dt.month
    ddf['day_of_week'] = ddf['date'].dt.dayofweek
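    # These .dt accessors define lazy columns; they execute per partition
    # only when compute() runs below.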
    
    # Complex aggregations
    result = ddf.groupby(['year', 'month', 'category', 'region']).agg({
        'amount': 'sum',
        'final_amount': 'sum',
        'discount': 'mean',
        'id': 'count'
    }).compute()
    
    print(f"\nComplex aggregation result (first 20 rows):")
    print(result.head(20))
    print()


def process_nested_data():
    """Process nested/JSON-like data structures"""
    print("=" * 60)
    print("Processing Nested Data Structures")
    print("=" * 60)
    
    import json
    
    # Create nested data
    print("\nCreating nested data...")
    nested_data = []
    for i in range(10000):
        record = {
            'id': i,
            'name': f'User_{i}',
            'metadata': {
                # Cast NumPy scalars to built-in types; json.dumps cannot
                # serialize np.int64 values directly
                'age': int(np.random.randint(18, 80)),
                'city': str(np.random.choice(['NYC', 'LA', 'Chicago', 'Houston'])),
                'scores': [int(np.random.randint(0, 100)) for _ in range(5)]
            }
        }
        nested_data.append(json.dumps(record))
    
    # Process with Dask Bag
    import dask.bag as db
    bag = db.from_sequence(nested_data, npartitions=4)
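    # A Bag suits semi-structured records like JSON strings; npartitions
    # sets how many chunks can be processed in parallel.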
    
    def extract_info(json_str):
        data = json.loads(json_str)
        return {
            'id': data['id'],
            'name': data['name'],
            'age': data['metadata']['age'],
            'city': data['metadata']['city'],
            'avg_score': np.mean(data['metadata']['scores'])
        }
    
    processed = bag.map(extract_info)
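    # map() is lazy; compute() runs extract_info across every partition
    # and returns a plain Python list of dicts.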
    results = processed.compute()
    
    # Convert to DataFrame
    df = pd.DataFrame(results)
    print(f"\nProcessed nested data (first 10 rows):")
    print(df.head(10))
    print()


def streaming_data_processing():
    """Demonstrate streaming data processing"""
    print("=" * 60)
    print("Streaming Data Processing")
    print("=" * 60)
    
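    # @delayed turns each call to process_batch into a lazy task instead
    # of executing it immediately.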
    @delayed
    def process_batch(batch_num, batch_size=1000):
        """Process a batch of data"""
        data = {
            'id': range(batch_num * batch_size, (batch_num + 1) * batch_size),
            'value': np.random.randn(batch_size),
            'timestamp': pd.Timestamp.now()
        }
        df = pd.DataFrame(data)
        return df['value'].sum()
    
    # Process multiple batches in parallel
    print("\nProcessing batches in parallel...")
    batches = [process_batch(i) for i in range(10)]
    
    start_time = time.time()
    results = compute(*batches)
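    # compute(*batches) merges all Delayed objects into one task graph,
    # so the ten batches run in parallel (threaded scheduler by default).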
    end_time = time.time()
    
    print(f"Processed {len(results)} batches")
    print(f"Total sum: {sum(results)}")
    print(f"Processing time: {end_time - start_time:.2f} seconds")
    print()


def main():
    """Main function"""
    print("\nAdvanced Data Processing with Dask")
    print("=" * 60)
    
    process_time_series_data()
    process_multiple_files()
    complex_data_transformations()
    process_nested_data()
    streaming_data_processing()
    
    print("=" * 60)
    print("All advanced processing completed!")
    print("=" * 60)


if __name__ == "__main__":
    main()
