How to dynamically adapt dataset names so they don't overwrite each other when parameters change

I don’t feel entirely comfortable placing this in “showcase” since I’m just a beginner, but this functionality is very important to me. I typically have a pipeline that depends on parameters I want to test and change (for example, applying one kind of preprocessing or another), and I want to persist all outputs so I can properly examine what happened in each case. With the standard way Kedro operates, different runs produce the same output names, so the files get overwritten.

I will explain this using a simple example. I have a pipeline with three nodes:

import pandas as pd


def node_A(df: pd.DataFrame, params: dict) -> pd.DataFrame:
    # Shift every column by this node's parameter.
    for c in df.columns:
        df[c] = df[c] + params["option_A"]
    return df


def node_B(df: pd.DataFrame, params: dict) -> pd.DataFrame:
    for c in df.columns:
        df[c] = df[c] + params["option_B"]
    return df


def node_C(df: pd.DataFrame, params: dict) -> pd.DataFrame:
    for c in df.columns:
        df[c] = df[c] + params["option_C"]
    return df

Then I specify the parameters for each node in parameters.yml in a dict-like fashion, so that each node has its own section:

node_A:
    option_A: 3
node_B:
    option_B: 5
node_C:
    option_C: 10

The catalog is the following:

raw_input:
    type: pandas.CSVDataSet
    filepath: data/01_raw/input.csv
    load_args:
        index_col: False

processed_A:
    type: pandas.CSVDataSet
    filepath: data/02_intermediate/processed_A.csv

processed_B:
    type: pandas.CSVDataSet
    filepath: data/02_intermediate/processed_B.csv

processed_C:
    type: pandas.CSVDataSet
    filepath: data/02_intermediate/processed_C.csv

And the pipeline is:

from kedro.pipeline import Pipeline, node

from .nodes import node_A, node_B, node_C  # wherever the node functions are defined


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=node_A,
                inputs=["raw_input", "params:node_A"],
                outputs="processed_A",
            ),
            node(
                func=node_B,
                inputs=["processed_A", "params:node_B"],
                outputs="processed_B",
            ),
            node(
                func=node_C,
                inputs=["processed_B", "params:node_C"],
                outputs="processed_C",
            ),
        ]
    )

OK, so I have achieved this by using the before_pipeline_run Hook. The idea is the following:

  • First of all, I expect the parameters for each node to be organised in the dict-like fashion shown above, with one entry per node.
  • Then, before the pipeline runs, I update the catalog by prepending a hash to each filename. The hash depends only on the parameters that can affect the node producing that dataset, so if those parameters change, the hash changes too. I have used the same hashing scheme that Luigi uses for its tasks.
  • The only tricky part is that, for a given node, you have to hash not only its own parameters but also the parameters of every node that can influence its behaviour. In other words, you have to take the topological order of the pipeline into account: following the example, if the parameters of node_A change, the hashes of all outputs should change; if the parameters of node_B change, node_A’s output keeps its name, but node_B’s and node_C’s get new ones. A small sketch of this is shown right after this list.
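
To make this concrete, here is a minimal sketch of the hashing idea, reusing the same hash_params helper that appears in the hook below (the printed values are just whatever MD5 returns, nothing special):

import hashlib
import json


def hash_params(params):
    # Canonical JSON representation, then MD5 -- same scheme as in the hook below.
    param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
    return hashlib.md5(param_str.encode('utf-8')).hexdigest()


# The hash for processed_B combines node_A's and node_B's parameters:
print(hash_params({"option_A": 3, "option_B": 5}))
# Changing option_A therefore renames processed_B (and processed_C) as well:
print(hash_params({"option_A": 4, "option_B": 5}))

The hash then ends up as a filename prefix, so processed_B is written to something like data/02_intermediate/<md5>_processed_B.csv instead of overwriting the previous run.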

The code for the hook is the following:

import hashlib
import json
from typing import Any, Dict

from kedro.framework.hooks import hook_impl
from kedro.io import AbstractDataSet, DataCatalog
from kedro.pipeline import Pipeline


# This method lives inside my project's hooks class (the ProjectHooks class shown further down).
@hook_impl
def before_pipeline_run(
        self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
) -> None:
    """Hook to be invoked before a pipeline runs.

    Args:
        run_params: The params used to run the pipeline.
            Should be identical to the data logged by Journal with the following schema::

               {
                 "run_id": str
                 "project_path": str,
                 "env": str,
                 "kedro_version": str,
                 "tags": Optional[List[str]],
                 "from_nodes": Optional[List[str]],
                 "to_nodes": Optional[List[str]],
                 "node_names": Optional[List[str]],
                 "from_inputs": Optional[List[str]],
                 "to_outputs": Optional[List[str]],
                 "load_versions": Optional[List[str]],
                 "pipeline_name": str,
                 "extra_params": Optional[Dict[str, Any]]
               }

        pipeline: The ``Pipeline`` that will be run.
        catalog: The ``DataCatalog`` to be used during the run.
    """
    def get_params(node, parameters):
        # Default node names look like "node_A([raw_input,params:node_A]) -> ...",
        # so everything before the first "(" is the function name, which is also
        # the key used in parameters.yml.
        node_name, *_ = node.name.split('(')
        return parameters[node_name]

    def get_all_dependencies(node, pipeline):
        # Recursively collect every ancestor of the node (parents, grandparents, ...).
        dependencies = pipeline.node_dependencies
        parents = dependencies[node]
        ancestors = parents.copy()
        for parent in parents:
            ancestors.update(get_all_dependencies(parent, pipeline))
        return ancestors

    def hash_params(params):
        # Canonical JSON representation, then MD5 (the same scheme Luigi uses).
        param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
        param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest()
        return param_hash


    # First we get the global parameters
    all_datasets = catalog.datasets.__dict__
    parameters = all_datasets['parameters']._data
    # Then we loop on all nodes
    for node in pipeline.nodes:
        dict_params = get_params(node, parameters)
        for ancestor_node in get_all_dependencies(node, pipeline):
            dict_params.update(get_params(ancestor_node, parameters))
        hash_node = hash_params(dict_params)
        datasets_to_update = {}
        for dataset in node.outputs:
            if dataset in all_datasets: # If not, it's a MemoryDataset
                dataset_type = all_datasets[dataset].__class__
                ds_config = {
                    'type': dataset_type,
                    'filepath': str(all_datasets[dataset]._filepath.with_name(
                        hash_node + '_' + all_datasets[dataset]._filepath.name)),
                    'load_args': all_datasets[dataset]._load_args,
                    'save_args': all_datasets[dataset]._save_args,
                    # Look for other keys if needed
                }
                ds_instance = AbstractDataSet.from_config(dataset, ds_config)
                datasets_to_update[dataset] = ds_instance
        catalog.add_all(datasets_to_update, replace=True)

I am very interested in knowing whether you find this useful and, if not, how you manage the problem of wanting to explore different parameter configurations… I ask in order to learn from you, pipeline experts! :smiley:

I manually name all of my nodes, but I have also wondered whether a hook would make it easier to name them consistently.

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=node_A,
                inputs=["raw_input", "params:node_A"],
                outputs="processed_A",
                name="create_processed_A",
            ),
            node(
                func=node_B,
                inputs=["processed_A", "params:node_B"],
                outputs="processed_B",
                name="create_processed_B",
            ),
            node(
                func=node_C,
                inputs=["processed_B", "params:node_C"],
                outputs="processed_C",
                name="create_processed_C",
            ),
        ]
    )

What are the names for? I tried to name my nodes too, but I got an error.

I am not sure it is possible with the before_pipeline_run Hook, because both the pipeline and node objects are very “static”: they don’t have methods for modifying them, only for creating new objects derived from the original ones.

I also find this somewhat limiting for another Hook that I have in mind. If only we had a “pipeline.add_all” like the catalog method I used… or a method on nodes for renaming them and modifying some of their properties. I have not found either of these.
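
For what it’s worth, the closest I have found to “renaming” is to build a brand-new node that mirrors the old one and assemble a new Pipeline from those copies. Below is a rough sketch of that idea; the rename_node / rename_pipeline helpers and the naming convention are mine, not Kedro API, and it assumes a Kedro version where Node exposes func, inputs, outputs and tags as properties, plus nodes with exactly one output declared as a string (as in the example pipeline above). Something like this could then be plugged into the register_pipelines Hook mentioned in the [EDIT] below.

from kedro.pipeline import Pipeline, node


def rename_node(old_node, new_name):
    # Nodes are immutable, so the only option is to create an equivalent new one.
    # Assumes every node has exactly one output declared as a string.
    return node(
        func=old_node.func,
        inputs=list(old_node.inputs),
        outputs=old_node.outputs[0],
        name=new_name,
        tags=old_node.tags,
    )


def rename_pipeline(old_pipeline):
    # Hypothetical convention: name each node after its output.
    return Pipeline(
        [rename_node(n, "create_" + n.outputs[0]) for n in old_pipeline.nodes]
    )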

[EDIT] I think what you want to do can be achieved with the register_pipelines Hook, where you return a dict of the Pipeline objects that will be used for the project.
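
A rough sketch of what I mean, reusing the hypothetical rename_pipeline helper from above (the import paths are placeholders; the real Hook I ended up with is shown further down):

from typing import Dict

from kedro.framework.hooks import hook_impl
from kedro.pipeline import Pipeline

# Hypothetical import paths: adjust to wherever create_pipeline() and the
# rename_pipeline sketch from the previous post actually live.
from my_project.pipeline import create_pipeline
from my_project.rename import rename_pipeline


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        # Build the pipeline as usual, then register the renamed copy as default.
        return {"__default__": rename_pipeline(create_pipeline())}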

I have added another Hook so that the registered pipeline only runs the nodes whose outputs have not been generated yet.

import hashlib
import json
from typing import Dict

from kedro.framework.hooks import hook_impl
from kedro.framework.session import get_current_session
from kedro.io import AbstractDataSet
from kedro.pipeline import Pipeline

# Plus the project's `pipeline` module, i.e. wherever create_pipeline() is defined.


def update_catalog(pipeline, catalog):
    def get_params(node, parameters):
        node_name, *_ = node.name.split('(')
        return parameters[node_name]

    def get_all_dependencies(node, pipeline):
        dependencies = pipeline.node_dependencies
        parents = dependencies[node]
        ancestors = parents.copy()
        for parent in parents:
            ancestors.update(get_all_dependencies(parent, pipeline))
        return ancestors

    def hash_params(params):
        param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
        param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest()
        return param_hash

    # First we get the global parameters
    all_datasets = catalog.datasets.__dict__
    parameters = all_datasets['parameters']._data
    # Then we loop on all nodes
    for node in pipeline.nodes:
        dict_params = get_params(node, parameters)
        for ancestor_node in get_all_dependencies(node, pipeline):
            dict_params.update(get_params(ancestor_node, parameters))
        hash_node = hash_params(dict_params)
        datasets_to_update = {}
        for dataset in node.outputs:
            if dataset in all_datasets:  # If not, it's a MemoryDataset
                ds_config = {
                    'type': all_datasets[dataset].__class__,
                    'filepath': str(all_datasets[dataset]._filepath.with_name(
                        hash_node + '_' + all_datasets[dataset]._filepath.name)),
                    'load_args': all_datasets[dataset]._load_args,
                    'save_args': all_datasets[dataset]._save_args,
                    # Look for other keys if needed
                }
                ds_instance = AbstractDataSet.from_config(dataset, ds_config)
                datasets_to_update[dataset] = ds_instance
        catalog.add_all(datasets_to_update, replace=True)
    return


def update_pipeline(pipeline, catalog):
    # Keep only the nodes that have at least one output which does not exist yet,
    # so already-generated (hashed) outputs are not recomputed.
    only_nodes = []
    for node in pipeline.nodes:
        include_node = False
        for output in node.outputs:
            if not catalog.exists(output):
                include_node = True
                break
        if include_node:
            only_nodes.append(node.name)
    return pipeline.only_nodes(*only_nodes)


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """
        session = get_current_session()
        context = session.load_context()
        catalog = context.catalog
        my_pipeline = pipeline.create_pipeline()
        update_catalog(my_pipeline, catalog)
        my_pipeline = update_pipeline(my_pipeline, catalog)
        return {"__default__": my_pipeline}

As I said before, I would appreciate it if someone more experienced than me could take a look and tell me whether these Hooks violate any of Kedro’s assumptions. They work, but working is not everything…