help@rskworld.in +91 93305 39277
RSK World
  • Home
  • Development
    • Web Development
    • Mobile Apps
    • Software
    • Games
    • Project
  • Technologies
    • Data Science
    • AI Development
    • Cloud Development
    • Blockchain
    • Cyber Security
    • Dev Tools
    • Testing Tools
  • About
  • Contact

Theme Settings

Color Scheme
Display Options
Font Size
100%
Back to Project
RSK World
dask-parallel
/
scripts
RSK World
dask-parallel
Parallel and distributed computing with Dask
scripts
  • advanced_data_processing.py — 6.6 KB
  • create_basic_data.py — 4.9 KB
  • distributed_workflow.py — 4.3 KB
  • generate_advanced_data.py — 6.8 KB
  • memory_efficient_ops.py — 3.6 KB
  • parallel_processing.py — 2.2 KB
  • performance_profiling.py — 6 KB
distributed_workflow.py
scripts/distributed_workflow.py
Raw Download
Find: Go to:
#!/usr/bin/env python3
"""
Distributed Workflow with Dask
Author: Molla Samser
Designer & Tester: Rima Khatun
Website: https://rskworld.in
Email: help@rskworld.in, support@rskworld.in
Phone: +91 93305 39277
"""

from dask.distributed import Client, LocalCluster
from dask import delayed, compute
import dask.array as da
import numpy as np
import time


def setup_cluster():
    """Start a local Dask cluster and connect a client to it.

    Returns:
        tuple: ``(client, cluster)`` — the caller is responsible for
        closing both when done.
    """
    banner = "=" * 60
    print(banner)
    print("Setting Up Dask Cluster")
    print(banner)

    # Four worker processes with two threads each.
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    worker_count = len(client.scheduler_info()['workers'])
    print(f"Cluster Dashboard: {client.dashboard_link}")
    print(f"Number of workers: {worker_count}")
    print()

    return client, cluster


def distributed_array_processing(client):
    """Run a chunked array reduction across the cluster workers.

    Args:
        client: Connected ``dask.distributed`` Client. It is not called
            directly here — ``.compute()`` dispatches to the active client.
    """
    banner = "=" * 60
    print(banner)
    print("Distributed Array Processing")
    print(banner)

    print("\nCreating large array...")
    # 20000 x 20000 random values, partitioned into 2000 x 2000 chunks.
    large_array = da.random.random((20000, 20000), chunks=(2000, 2000))

    print(f"Array shape: {large_array.shape}")
    print(f"Number of chunks: {large_array.numblocks}")

    print("\nProcessing across workers...")
    started = time.time()
    # Element-wise transform followed by a full reduction to a scalar.
    result = (large_array ** 2 + 1).sum().compute()
    elapsed = time.time() - started

    print(f"Result: {result}")
    print(f"Computation time: {elapsed:.2f} seconds")
    print()


def distributed_task_processing(client):
    """Fan out independent delayed tasks and gather their results.

    Args:
        client: Connected ``dask.distributed`` Client used to submit the
            task graph and collect the resulting futures.
    """
    banner = "=" * 60
    print(banner)
    print("Distributed Task Processing")
    print(banner)

    @delayed
    def process_task(task_id, data_size):
        """Summarize a block of random data after a simulated delay."""
        time.sleep(0.1)  # Simulate work
        values = np.random.rand(data_size)
        return {
            'task_id': task_id,
            'sum': values.sum(),
            'mean': values.mean(),
        }

    print("\nCreating tasks...")
    # Twenty independent tasks of 10,000 samples each.
    pending = [process_task(task_id, 10000) for task_id in range(20)]

    print("Submitting tasks to cluster...")
    started = time.time()
    futures = client.compute(pending)
    results = client.gather(futures)
    elapsed = time.time() - started

    print(f"Processed {len(results)} tasks")
    print(f"Total time: {elapsed:.2f} seconds")
    print(f"First result: {results[0]}")
    print()


def workflow_example(client):
    """Build and run a small load -> process -> combine task graph.

    Args:
        client: Connected ``dask.distributed`` Client; ``.compute()``
            dispatches the graph to it implicitly.
    """
    banner = "=" * 60
    print(banner)
    print("Complex Workflow Example")
    print(banner)

    @delayed
    def load_data(source):
        """Simulate loading from *source* (the value itself is unused)."""
        time.sleep(0.1)
        return np.random.rand(1000)

    @delayed
    def process_data(data):
        """Double every element after a simulated processing delay."""
        time.sleep(0.2)
        return data * 2

    @delayed
    def combine_results(*results):
        """Element-wise sum of all processed arrays."""
        time.sleep(0.1)
        return sum(results)

    # Wire up three parallel branches that merge into one final node.
    print("\nCreating workflow...")
    branches = [load_data(name) for name in ("source1", "source2", "source3")]
    processed = [process_data(branch) for branch in branches]
    final = combine_results(*processed)

    print("Executing workflow...")
    started = time.time()
    result = final.compute()
    elapsed = time.time() - started

    print(f"Final result: {result}")
    print(f"Total time: {elapsed:.2f} seconds")
    print()


def main():
    """Run every distributed example against one local cluster."""
    print("\nDask Distributed Workflow Examples")
    print("=" * 60)

    client, cluster = setup_cluster()
    try:
        distributed_array_processing(client)
        distributed_task_processing(client)
        workflow_example(client)
    finally:
        # Always release cluster resources, even if an example raises.
        client.close()
        cluster.close()
        print("=" * 60)
        print("Cluster closed")
        print("All examples completed!")
        print("=" * 60)


# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()

168 lines•4.3 KB
python

About RSK World

Founded by Molla Samser, with Designer & Tester Rima Khatun, RSK World is your one-stop destination for free programming resources, source code, and development tools.

Founder: Molla Samser
Designer & Tester: Rima Khatun

Development

  • Game Development
  • Web Development
  • Mobile Development
  • AI Development
  • Development Tools

Legal

  • Terms & Conditions
  • Privacy Policy
  • Disclaimer

Contact Info

Nutanhat, Mongolkote
Purba Burdwan, West Bengal
India, 713147

+91 93305 39277

hello@rskworld.in
support@rskworld.in

© 2026 RSK World. All rights reserved.

Content used for educational purposes only. View Disclaimer