[Example] Partitioned Dataset fits distributed jobs well

I thought this would help others.

Kedro's partitioned dataset fits distributed jobs very well. You can convert a single-threaded job into a distributed one simply by using Dask.

Example:
Let's say you have partitioned data like this:

df_dict = {
    'patient_1': <pandas DataFrame for patient 1>,
    'patient_2': <pandas DataFrame for patient 2>,
    ...
}

Each DataFrame is a time series for one encounter; each row is one point in time.
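
For concreteness, a single partition might look something like this (the column names are just made up for illustration):

import pandas as pd

# a toy time-series partition for one patient; columns are illustrative
patient_1 = pd.DataFrame({
    'timestamp': pd.date_range('2021-01-01', periods=4, freq='H'),
    'heart_rate': [72.0, None, 80.0, 78.0],  # a gap to impute later
})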

When I want to do a transformation I usually do something like:

for df in df_dict.values():
    impute_or_do_something(df())   # partitions are callables, hence df()

The problem with that loop is that it is a single-threaded job, so for hundreds of thousands of DataFrames it takes forever.

Dask is, roughly speaking, a distributed counterpart to pandas, so the code above can simply be wrapped like this:

import dask.bag as db

def impute(df):
    ...  # apply the imputation / transformation to this partition
    return df

bag = db.from_sequence(list(df_dict.values()))
bag = bag.map(impute)
bag_output = bag.compute()
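
If you want to control the degree of parallelism, you can pass scheduler options at compute time; dask.bag uses the multiprocessing scheduler by default, and the worker count below is just an example for my machine:

bag_output = bag.compute(scheduler='processes', num_workers=24)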

You can convert it back to a partitioned dataset using a dictionary comprehension, keyed by whatever identifies each partition, e.g.:

return {f'patient_{i}': df for i, df in enumerate(bag_output)}
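
(In Kedro, returning a dictionary like this from a node whose output is declared as a PartitionedDataSet writes each entry back as its own partition.)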

This was roughly 10x faster on my 24-core machine than the plain for loop.


Interesting! Have you used Dask much with Kedro? I'd be interested to hear whether you have tried out-of-memory computations and whether that plays well with the Kedro workflow.

Definitely. The code above is implemented in Kedro using a partitioned dataset, as I noted.
I can share a more detailed example if that is of interest. It works remarkably well in my opinion.
I even have one chunk of code that triggers a hidden Markov model from TensorFlow Probability on the GPU, with each task running on one partition of the dataset. I was able to allocate not only CPU cores but also a GPU to each worker.
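
I don't have that exact setup to hand, but a minimal sketch of the idea uses dask.distributed worker resources (the worker count, the 'GPU' resource label, and run_hmm_on_partition are all illustrative assumptions, not my actual code):

from dask.distributed import Client, LocalCluster
import dask.bag as db

# declare one 'GPU' resource per worker so at most one GPU-hungry task
# runs on each worker at a time (counts here are illustrative)
cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                       resources={'GPU': 1})
client = Client(cluster)

bag = db.from_sequence(list(df_dict.values())).map(run_hmm_on_partition)

# only schedule each task on a worker that has a free 'GPU' resource
bag_output = bag.compute(resources={'GPU': 1})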

All that is needed is to:

1. Convert the partitioned dataset into a sequence using list(df_dict.values()).
2. Hand that sequence to Dask using dask.bag.from_sequence().
3. Apply the transformation with bag.map() instead of a for loop. Inside the mapped function you have to materialise each partition by calling it with (), matching the callable concept Kedro provides.
4. Return the output as a sequence and restructure it back into a dictionary (see the sketch after this list).

There is no comparison in performance for large datasets.
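
Putting those four steps together, a minimal generic sketch (the function and key names are illustrative, not from my actual pipeline):

import dask.bag as db

def run_over_partitions(partitions, func):
    # partitions: Kedro PartitionedDataSet load dict, partition id -> callable
    def materialise_and_apply(load_func):
        df = load_func()          # step 3: call the partition to load it
        return func(df)

    bag = db.from_sequence(list(partitions.values()))    # steps 1 and 2
    results = bag.map(materialise_and_apply).compute()   # step 3
    return {f'part_{i}': df for i, df in enumerate(results)}  # step 4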

I have not tried out-of-memory computations, but there is no reason why it wouldn't work.


Oh, very interesting to hear - thanks! Do you execute your pipeline on a local Dask cluster?

I modified an impute example I used.
I don't have a local cluster, but rather a single machine with 24 cores and 2 GPUs.

import pandas as pd
import dask.bag as db

from typing import Callable, Dict


def impute_missing(enc_dict: Dict[str, Callable]) -> Dict[str, pd.DataFrame]:
    """Impute missing values in every partition of a partitioned dataset."""

    def impute_partition(enc: Callable) -> pd.DataFrame:
        columns = ['col1', 'col2', 'col3']

        # each partition is a callable; call it to load the DataFrame
        enc = enc()

        for col in columns:
            enc[col] = enc[col].interpolate()

        return enc

    # map the imputation over all partitions in parallel
    bag = db.from_sequence(list(enc_dict.values()))
    bag = bag.map(impute_partition)
    bag_output = bag.compute()

    # rebuild the partition dictionary, keyed by each encounter id
    parts = {'enc_' + str(int(x['enc_id'].values[0])): x for x in bag_output}

    return parts
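
In a Kedro pipeline the function above is just an ordinary node; a sketch of the wiring, where the dataset names are my assumptions:

from kedro.pipeline import Pipeline, node

# 'encounters' and 'encounters_imputed' are illustrative catalog entries;
# 'encounters' would be declared as a PartitionedDataSet
pipeline = Pipeline([
    node(impute_missing, inputs='encounters', outputs='encounters_imputed',
         name='impute_missing'),
])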

Thank you!