help@rskworld.in +91 93305 39277
RSK World
  • Home
  • Development
    • Web Development
    • Mobile Apps
    • Software
    • Games
    • Project
  • Technologies
    • Data Science
    • AI Development
    • Cloud Development
    • Blockchain
    • Cyber Security
    • Dev Tools
    • Testing Tools
  • About
  • Contact

Theme Settings

Color Scheme
Display Options
Font Size
100%
Back to Project
RSK World
dask-parallel
/
scripts
RSK World
dask-parallel
Parallel and distributed computing with Dask
scripts
  • advanced_data_processing.py6.6 KB
  • create_basic_data.py4.9 KB
  • distributed_workflow.py4.3 KB
  • generate_advanced_data.py6.8 KB
  • memory_efficient_ops.py3.6 KB
  • parallel_processing.py2.2 KB
  • performance_profiling.py6 KB
performance_profiling.py
scripts/performance_profiling.py
Raw Download
Find: Go to:
#!/usr/bin/env python3
"""
Performance Profiling and Optimization with Dask
Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in, support@rskworld.in
Phone: +91 93305 39277
"""

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import time
from dask.distributed import Client, LocalCluster, performance_report
from dask import delayed, compute


def profile_array_operations():
    """Profile array operations with Dask.

    Spins up a dedicated local 4-worker cluster, evaluates an elementwise
    expression over a large random array, and writes a Dask performance
    report to ``array_performance.html``. The client and cluster are always
    torn down, even when the computation fails.
    """
    banner = "=" * 60
    print(banner)
    print("Profiling Array Operations")
    print(banner)

    # Fresh cluster so the report reflects only this workload.
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    try:
        # Everything executed inside this context is captured in the report.
        with performance_report(filename="array_performance.html"):
            print("\nCreating large array...")
            arr = da.random.random((20000, 20000), chunks=(2000, 2000))

            print("Performing operations...")
            total = (arr ** 2 + arr * 2).sum().compute()

            print(f"Result: {total}")
            print("Performance report saved to array_performance.html")
    finally:
        client.close()
        cluster.close()
    print()


def profile_dataframe_operations():
    """Profile DataFrame operations with Dask.

    Generates a 500k-row sample CSV under ``data/``, then runs a grouped
    aggregation on it inside a local cluster while recording a Dask
    performance report to ``dataframe_performance.html``. The client and
    cluster are always closed, even on failure.
    """
    print("=" * 60)
    print("Profiling DataFrame Operations")
    print("=" * 60)

    # Build the sample dataset to profile against.
    n_rows = 500000
    data = {
        'id': range(n_rows),
        'value': np.random.randn(n_rows),
        'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'amount': np.random.uniform(100, 10000, n_rows)
    }
    df = pd.DataFrame(data)
    # Fix: to_csv() raises FileNotFoundError when the target directory is
    # missing, so create it first (no-op if it already exists).
    os.makedirs('data', exist_ok=True)
    df.to_csv('data/profile_data.csv', index=False)

    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    try:
        # Everything inside this context is captured in the HTML report.
        with performance_report(filename="dataframe_performance.html"):
            print("\nLoading DataFrame...")
            ddf = dd.read_csv('data/profile_data.csv')

            print("Performing complex operations...")
            result = ddf.groupby('category').agg({
                'value': ['mean', 'std'],
                'amount': 'sum'
            }).compute()

            print(f"\nResult:\n{result}")
            print("Performance report saved to dataframe_performance.html")
    finally:
        client.close()
        cluster.close()
    print()


def benchmark_chunk_sizes():
    """Benchmark different chunk sizes.

    Times the same squared-sum reduction over a 10000x10000 random array for
    several chunk layouts, then prints a summary of the elapsed wall-clock
    time for each layout.
    """
    banner = "=" * 60
    print(banner)
    print("Benchmarking Chunk Sizes")
    print(banner)

    side = 10000
    candidate_chunks = [(1000, 1000), (2000, 2000), (5000, 5000)]

    timings = []
    for chunks in candidate_chunks:
        print(f"\nTesting chunk size: {chunks}")
        started = time.time()

        data = da.random.random((side, side), chunks=chunks)
        (data ** 2).sum().compute()

        elapsed = time.time() - started
        timings.append((chunks, elapsed))
        print(f"Time: {elapsed:.2f} seconds")

    # Summary table of all measured layouts.
    print("\n" + banner)
    print("Benchmark Results:")
    print(banner)
    for chunks, elapsed in timings:
        print(f"Chunk size {chunks}: {elapsed:.2f} seconds")
    print()


def profile_memory_usage():
    """Profile memory usage of operations.

    Runs several array reductions on a small local cluster, then queries the
    scheduler for per-worker memory information and prints it. The client
    and cluster are always closed, even on failure.
    """
    print("=" * 60)
    print("Memory Usage Profiling")
    print("=" * 60)

    # Fix: removed an unused local import of dask.distributed.get_client.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    try:
        print("\nCreating arrays and monitoring memory...")

        arrays = [da.random.random((5000, 5000), chunks=(1000, 1000))
                  for _ in range(10)]

        # Trigger the work; the summed values themselves are not needed.
        compute(*[arr.sum() for arr in arrays])

        # Snapshot of scheduler-side worker state.
        info = client.scheduler_info()
        print("\nWorker Memory Information:")
        for worker_id, worker_info in info['workers'].items():
            # 'memory' may be absent depending on dask version; default to {}.
            memory = worker_info.get('memory', {})
            print(f"Worker {worker_id}:")
            print(f"  Limit: {memory.get('limit', 'N/A')}")
            print(f"  Managed: {memory.get('managed', 'N/A')}")
    finally:
        client.close()
        cluster.close()
    print()


def optimize_computation_graph():
    """Demonstrate computation graph optimization.

    Builds a small delayed task graph with a shared intermediate result,
    optionally renders the optimized graph with graphviz, and reports the
    wall-clock time of the final compute() call.
    """
    banner = "=" * 60
    print(banner)
    print("Computation Graph Optimization")
    print(banner)

    @delayed
    def expensive_op(x):
        # Simulated expensive work.
        time.sleep(0.1)
        return x * 2

    @delayed
    def combine(a, b):
        return a + b

    print("\nCreating computation graph...")
    first = expensive_op(1)
    second = expensive_op(2)
    third = expensive_op(3)

    # The intermediate sum is shared rather than recomputed.
    partial = combine(first, second)
    total = combine(partial, third)

    # Rendering needs graphviz; degrade gracefully when it is missing.
    try:
        total.visualize(filename="computation_graph.png", optimize_graph=True)
        print("Computation graph saved to computation_graph.png")
    except Exception as e:
        print(f"Graphviz not available: {e}")

    started = time.time()
    result = total.compute()
    elapsed = time.time() - started

    print(f"Result: {result}")
    print(f"Computation time: {elapsed:.2f} seconds")
    print()


def main():
    """Run every profiling demonstration in sequence."""
    print("\nDask Performance Profiling and Optimization")
    print("=" * 60)

    # Each demo manages its own cluster lifecycle.
    for demo in (
        profile_array_operations,
        profile_dataframe_operations,
        benchmark_chunk_sizes,
        profile_memory_usage,
        optimize_computation_graph,
    ):
        demo()

    print("=" * 60)
    print("All profiling completed!")
    print("=" * 60)


# Run the full profiling suite only when executed as a script,
# not when imported as a module.
if __name__ == "__main__":
    main()

215 lines•6 KB
python

About RSK World

Founded by Molla Samser, with Designer & Tester Rima Khatun, RSK World is your one-stop destination for free programming resources, source code, and development tools.

Founder: Molla Samser
Designer & Tester: Rima Khatun

Development

  • Game Development
  • Web Development
  • Mobile Development
  • AI Development
  • Development Tools

Legal

  • Terms & Conditions
  • Privacy Policy
  • Disclaimer

Contact Info

Nutanhat, Mongolkote
Purba Burdwan, West Bengal
India, 713147

+91 93305 39277

hello@rskworld.in
support@rskworld.in

© 2026 RSK World. All rights reserved.

Content used for educational purposes only. View Disclaimer