Using CachedDataSet and ParallelRunner together fails

Can anyone help me get CachedDataSet and ParallelRunner working together in Kedro 0.17.0?

I’m wondering how to solve this issue:

What is the goal you are trying to achieve?
Run the pipeline faster while saving intermediate files.

What have you tried, in order to accomplish the goal?
I tried using CachedDataSet and ParallelRunner together as follows:

  1. Run kedro new --starter=pandas-iris to generate the Kedro project.
  2. Add the following to conf/base/catalog.yml (a Python-API equivalent of this entry is sketched after these steps):

     example_model:
       type: CachedDataSet
       dataset:
         type: pickle.PickleDataSet
         filepath: /tmp/model.pkl

  3. Run kedro run -p
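
For reference, this is roughly the same dataset definition expressed with the Python API instead of YAML. Class paths are the ones I believe ship with 0.17.0; treat it as a sketch rather than a verified reproduction:

from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io import CachedDataSet, DataCatalog

# Roughly equivalent to the catalog.yml entry above: an in-memory cache
# wrapping a pickle dataset persisted to disk.
example_model = CachedDataSet(
    dataset=PickleDataSet(filepath="/tmp/model.pkl")
)
catalog = DataCatalog({"example_model": example_model})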

What version of Kedro are you using? (Use kedro -V)
Kedro 0.17.0

Do you have any custom plugins?
No

What is the full stack trace of the error (if applicable)

2021-01-18 05:33:15,557 - kedro.pipeline.node - ERROR - Node `predict: predict([example_model,example_test_x]) -> [example_predictions]` failed with error: 
unsupported operand type(s) for *: 'float' and 'object'
2021-01-18 05:33:15,566 - kedro.runner.parallel_runner - WARNING - There are 2 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
  --from-nodes "predict,report"
2021-01-18 05:33:15,587 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/parallel_runner.py", line 145, in _run_node_synchronization
    return run_node(node, catalog, is_async, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 212, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 297, in _run_node_sequential
    outputs = _call_node_run(node, catalog, inputs, is_async, run_id=run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 265, in _call_node_run
    raise exc
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 255, in _call_node_run
    outputs = node.run(inputs)
  File "/opt/conda/lib/python3.7/site-packages/kedro/pipeline/node.py", line 466, in run
    raise exc
  File "/opt/conda/lib/python3.7/site-packages/kedro/pipeline/node.py", line 459, in run
    outputs = self._run_with_dict(inputs, self._inputs)
  File "/opt/conda/lib/python3.7/site-packages/kedro/pipeline/node.py", line 520, in _run_with_dict
    return self._decorated_func(**kwargs)
  File "/data/kedro_proj/pi/src/pi/pipelines/data_science/nodes.py", line 88, in predict
    result = _sigmoid(np.dot(X, model))
  File "<__array_function__ internals>", line 6, in dot
TypeError: unsupported operand type(s) for *: 'float' and 'object'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.7/site-packages/kedro/framework/cli/cli.py", line 696, in main
    cli_collection(**cli_context)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/data/kedro_proj/pi/src/pi/cli.py", line 236, in run
    pipeline_name=pipeline,
  File "/opt/conda/lib/python3.7/site-packages/kedro/framework/session/session.py", line 414, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 100, in run
    self._run(pipeline, catalog, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/parallel_runner.py", line 350, in _run
    node = future.result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
TypeError: unsupported operand type(s) for *: 'float' and 'object'

Instead of ParallelRunner, have you tried ThreadRunner, its thread-based equivalent?

It looks like your node is trying to multiply a float by an object, so one of its inputs is arriving with an object dtype. It's possible that ParallelRunner or CachedDataSet is wrapping either the X or model variable in an unexpected way.
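
If it helps narrow that down, a quick check dropped into the predict node before the np.dot call would show what the node actually receives. The helper below is made up for illustration, not part of the starter:

import numpy as np

def _debug_dtypes(X, model):
    # Hypothetical debugging helper: print what the node actually receives,
    # so we can tell whether CachedDataSet/ParallelRunner handed back an
    # object-dtype array instead of floats.
    X = np.asarray(X)
    model = np.asarray(model)
    print("X dtype:", X.dtype, "shape:", X.shape)
    print("model dtype:", model.dtype, "shape:", model.shape)
    if model.dtype == object:
        # Coercing hides the TypeError, but the real question is why the
        # dtype differs between the sequential and parallel runs.
        model = model.astype(float)
    return X, model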

No, I just tried kedro run -p (same as kedro run --parallel).
OK, I’ll try kedro run --runner ThreadRunner.
Thanks!
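
In case it is useful to anyone else, the programmatic equivalent of that command (assuming the standard 0.17.0 session API and this project's package name, pi) would be roughly:

from kedro.framework.session import KedroSession
from kedro.runner import ThreadRunner

# Run the default pipeline with the thread-based runner instead of
# ParallelRunner; assumes this is executed from the project root and
# that "pi" is the project's package name.
with KedroSession.create(package_name="pi") as session:
    session.run(runner=ThreadRunner())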
