Speeding up pipeline processing with change detection

Hi,
I was wondering whether there is a way (or whether it has been discussed) to speed up kedro pipelines by borrowing from tools like e.g. docker or ansible and only rerunning a node if either its code or its input data has changed. In essence, the idea would be to wrap each node call with a caching wrapper that checks whether the node’s underlying code has changed or the input datasets have been updated.

This could be achieved quite easily with local data (hashing the files) and functions (hashing the function body), and it could be an opt-in flag such as --avoid-reruns or whatever. It would be harder if the data sits on S3 or another remote location, as we would have to look at the modification dates on the underlying file systems to make sure the file hasn’t been updated since it was last processed.
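
Roughly what I have in mind for the local case, as a minimal illustrative sketch (hashlib for the file contents, inspect for the function body; names are made up):

import hashlib
import inspect


def data_fingerprint(path: str) -> str:
    """Hash the raw bytes of a local data file."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def func_fingerprint(func) -> str:
    """Hash the source of a node's underlying function."""
    return hashlib.sha256(inspect.getsource(func).encode()).hexdigest()

A node would then only be rerun if either fingerprint differs from the one recorded on the previous run.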

I would however assume that the performance gains could be pretty impressive. Once the pipeline DAG has been constructed, it would skip all nodes until it hits one that has been updated. That would really aid development flow IMHO.

Any thoughts from the community?
Also hi everyone :wave:
Cheers
Pascalwhoop

5 Likes

Welcome @pascalwhoop! So glad to have you. Be sure to stop by the Hey! Introduce Yourself! topic to give us your intro; so happy you’re here :slight_smile:

This is such a great question, really. I actually worked on something similar earlier this year, where there was a built-in hashing function that would collect hashes of the dataset outputs as well as hash the node functions themselves. The idea was obviously similar to yours: to make kedro runs completely idempotent to save on compute time.

In fact, the exercise of data hashing was not so simple, even from the local side, and complexity ballooned before I realized it.

Revisiting this, I’m pretty sure this can be done rather trivially with some simple hooks. When kedro writes to a dataset, it can simply increment a monotonic ID associated with that dataset. Then, when it’s about to read the dataset, it compares against the monotonic ID it has stored, and if the current ID is greater than the stored one, that triggers a rerun. This could be combined with the node function hashing we talked about earlier, and you’d then get the result you want:

Pipelines only run when either the code or the data has changed. As an added bonus, this also helps when a team is sharing data: sharing the monotonic ID storage prevents needless re-runs across the team.
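
As a very rough sketch of the write side (assuming the after_dataset_saved hook spec and an illustrative local JSON file for the state; not production code):

import json
from pathlib import Path

from kedro.framework.hooks import hook_impl

STATE_FILE = Path(".kedro_state.json")  # illustrative location


class MonotonicIDHook:
    """Bump a per-dataset counter every time a dataset is written."""

    @hook_impl
    def after_dataset_saved(self, dataset_name: str, data):
        state = (
            json.loads(STATE_FILE.read_text())
            if STATE_FILE.exists()
            else {"counter": 0, "datasets": {}}
        )
        state["counter"] += 1
        state["datasets"][dataset_name] = state["counter"]
        STATE_FILE.write_text(json.dumps(state, indent=2))

The read side would then just compare the stored ID for each input against the one recorded at the node’s last run.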

What do you think?

3 Likes

To figure out whether a node’s execution can be skipped or not, we need to answer 3 questions at the very least, related to the [data, code] duality:

  • Whether the input dataset(s) have changed
  • Whether the node’s code has changed
  • Whether the output has been produced and is up-to-date (we’d need to define what “up-to-date” means)

For the datasets, we should look into whether the versioning system we already have in place can be leveraged, whether we can use some sort of hashing mechanism like you mentioned, or whether we should build an interface into the dataset and delegate the implementation to the dataset’s designer. We could also look into systems like DVC to see if we can learn anything from their approach.

For code, maybe we can use git to track changes.
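
For example, a minimal sketch using plain git and inspect (this only detects diffs against the last commit, so we’d still need to record which commit the last run used):

import inspect
import subprocess


def source_file_changed(func) -> bool:
    """Return True if the file defining ``func`` differs from HEAD."""
    path = inspect.getsourcefile(func)
    # `git diff --quiet HEAD -- <path>` exits non-zero when the file has changed
    result = subprocess.run(["git", "diff", "--quiet", "HEAD", "--", path])
    return result.returncode != 0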

I’m also wondering if this list is exhaustive. Is there anything else we need to consider tracking at a node-level to deduce whether it needs to be re-run?

4 Likes

Something to look out for is whether or not transformation nodes have had their input and output datasets changed.

One problem with the hashing method is that it becomes complex to deal with the myriad forms of data and custom IO. I think the best general approach is to abstract out the information you actually want, which is whether or not a particular dataset has been written to.

You kick off a refresh when the raw data layer node has been run, and the data propagates accordingly.
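
As a rough illustration of that propagation (plain Python, DAG given as (name, inputs, outputs) tuples, nothing Kedro-specific):

def stale_nodes(nodes, changed_datasets):
    """Return the names of all nodes downstream of a changed dataset."""
    dirty = set(changed_datasets)
    stale = set()
    changed = True
    # simple fixed-point loop: keep sweeping until no new node becomes stale
    while changed:
        changed = False
        for name, inputs, outputs in nodes:
            if name not in stale and dirty & set(inputs):
                stale.add(name)
                dirty |= set(outputs)
                changed = True
    return stale


# e.g. a change to "raw" forces both downstream nodes to rerun
stale_nodes([("clean", ["raw"], ["clean_data"]),
             ("features", ["clean_data"], ["model_input"])], {"raw"})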

2 Likes

Can you expand on this a bit?

  • Where is this ID stored? Locally? And what about runtimes such as docker, where locally stored values get lost after the container exits unless you mount a path into it?
  • If locally, what if a colleague touched the data? Then we’d skip the node and just move on to the next one, but it may actually not work, and we’d only discover a crash later without being able to map it to the most recent actions.
  • If stored remotely next to the DS, we’d have to hook into all our read/write IO tools. Probably a bad idea.
  • terraform shares a remote state which is stored in DBs, but that again introduces infrastructure overhead.

I assume we might need a combination of several heuristics and err on the side of caution. So for the input datasets something like:

  • check monotonic. Higher? RERUN
  • check last edited time-stamp. Is newer? RERUN
  • local & can be hashed? Diff? RERUN

The monotonic counter is a super fast way to check your own history. Next, the last-edited timestamp can handle remote shared datasets (e.g. on S3) and catch cases where other people have modified the data. Local hashes are probably redundant at that point, because the earlier checks already cover those cases.
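
In plain Python, the decision for a single input dataset might look roughly like this (callers would supply whatever counters / timestamps / hashes we end up tracking):

def needs_rerun(stored_counter, current_counter,
                stored_mtime, current_mtime,
                stored_hash=None, current_hash=None) -> bool:
    """Apply the heuristics in order, erring on the side of rerunning."""
    # 1. monotonic counter: cheapest check against our own history
    if current_counter > stored_counter:
        return True
    # 2. last-edited timestamp: catches edits made outside the pipeline
    if current_mtime > stored_mtime:
        return True
    # 3. optional content hash for local data (likely redundant by now)
    if stored_hash is not None and current_hash != stored_hash:
        return True
    return False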

3 Likes

This will depend on the use case, but if there’s going to be more than one individual running the pipeline, then this state definitely must be shared somewhere. Even in the case of hashing the data, the “shared” state would be the data itself. :thinking:

Given the simplicity of the ID, it can really be stored in any medium: a touched file next to the target data in S3, in a collection in a remote git repo, rows in a database or key-value store, or even as a file on a shared drive, etc etc.

The difficulty of this then becomes synchronizing the update of the non-database storage mechanisms, but this is a problem for all shared state exercises and there are work-arounds.

Yep, that’s correct, but I still see this as a problem of sharing state in general rather than a problem with the monotonic ID as an implementation.

DS == data source, right?

If it’s all in kedro, I don’t see a problem hooking into read/write IO; that’s a well accepted practice, to modify dataset IO :relieved:. In fact, with a bit of finagling, we wouldn’t need to change any IO at all, and I think this can be done only using the built in kedro hooks before_node_run and after_node_run, a custom parameter, and node decorators.

If we share state correctly, in what cases do we actually need to check timestamps or hashes on the data? :relaxed:

  • The timestamp only changes if we save a new version of the data to the storage mechanism. If we’re going to rely on the storage mechanism’s metadata, why not just rely on our own metadata? It’s cleaner and we can guarantee its correctness (a timestamp is not guaranteed to be correct, depending on the medium).
  • Hashing is only done when a new version of the data has been saved. If we already know we’ve saved new data, why not just store that state as metadata? Hashing will eventually become untenable as data pipelines grow, data grows, and data formats grow as well.

Hashing at the code level, though, I do think is necessary. That brings up other questions, though, on data versioning during individual development versus release, which is another bag of worms :laughing:.

How do you feel about these ideas?

1 Like

A similar discussion is emerging inside of a GH issue.

Here is Zain’s implementation of a skip-hook that skips a node when all of its inputs are older than its outputs.

4 Likes

Looking for code changes could be done by tracking the AST or byte code of the underlying function and caching it, then adding a code-change check to Zain’s hook. This would only trigger when the actual execution of the node would change; looking at a full source diff would also trigger for other things such as docstring changes.
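
For instance, a rough ast-based fingerprint (note that ast.dump output isn’t guaranteed to be stable across Python versions, so the cache would be per-interpreter):

import ast
import hashlib
import inspect
import textwrap


def ast_fingerprint(func) -> str:
    """Hash the parsed AST of a function's source.

    Comments and formatting don't affect the result; a docstring change
    still would, since the docstring is a node in the tree.
    """
    source = textwrap.dedent(inspect.getsource(func))
    return hashlib.sha256(ast.dump(ast.parse(source)).encode()).hexdigest()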

1 Like

No need to take a look at the bytecode, just use the source code!

import inspect
inspect.getsourcelines(node._func)

Looking at the ast or byte code gets a little bit closer to what actually gets executed and would be a bit less noisy during refactoring. Tools such as black utilize the ast to ensure that your code behaves exactly the same before and after autoformatting. Changes to things like spacing and indentation would not trigger a rerun unless they cause an actual change to the execution (the byte code also ignores docstrings).

I did a little playing around here.


2 Likes

Ohhh, ok bro! That’s fair, but is it hashable and serializable to save state? :slight_smile:

Apparently I don’t quite know what I am doing?

1 Like

Haha; I don’t quite know what’s going on either. Please share the fruits of your labor when you figure it out. :pray:

I played with it for a little bit and got something that seems to check out. I looked through black a little bit; it definitely gets complex and covers more than we’d need here, so I didn’t really follow it. It is apparent that the ast changes more often than just between major Python versions (2 → 3).

It was fun to play with but not something that I would want to support without a further understanding of it, and an extensive test suite that covered a number of platforms/versions.

The dis module has a function, dis.get_instructions, that yields the instructions for a given function. It ignores docstrings, spacing, formatting, etc., but it does include the starting line number of each instruction, so I just compared each of the other attributes.

Here was my quick and dirty test that utilized assert statements for simplicity.

  import dis


  def compare_instructions(func1, func2):
      """Compare the instructions of two functions."""
      func1_instructions = list(dis.get_instructions(func1))
      func2_instructions = list(dis.get_instructions(func2))

      # a differing number of instructions is already a mismatch
      assert len(func1_instructions) == len(func2_instructions)

      # compare every attribute of the instructions except for starts_line
      for line1, line2 in zip(func1_instructions, func2_instructions):
          assert line1.opname == line2.opname
          assert line1.opcode == line2.opcode
          assert line1.arg == line2.arg
          assert line1.argval == line2.argval
          assert line1.argrepr == line2.argrepr
          assert line1.offset == line2.offset

      return True
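
For example, these two variants pass the check even though the docstring and formatting differ (only starts_line changes, which the comparison skips):

def add(a, b):
    """Add two numbers."""
    return a + b


def add_reformatted(a, b):
    return (
        a +
        b
    )


compare_instructions(add, add_reformatted)  # True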

I think @DataEngineerOne’s simple solution would be a much better place to start.

I’m kinda surprised that there isn’t a simpler solution already on pypi.

5 Likes

Hi, I’m sorry for my long absence; it’s my first project with the new company, so I needed some time to wrap my head around that.

I started building a simple hook that keeps track of essentially two sets of data:

  • Datasets: a global, monotonically increasing counter
  • Nodes: either a hash of the byte code or what @waylonwalker described just above.

I first wanted to store the last known write time of the datasets, but then I realised this becomes complicated once several systems share a common state file somewhere, since time is hard. A monotonic counter, however, is universal and independent of server time. So the “state file” (taking inspiration from terraform here) would look like this:

{
  "counter": 4,
  "nodes": {
    "some.node.name": "d3b07384d113edec49eaa6238ad5ff00",
    "some.other.node": "c157a79031e1c40f85931829bc5fc552"
  },
  "datasets": {
    "source.node": 1,
    "output.node": 2,
    "output.other": 3
  }
}

Just before a node run, it would then do (pseudo):

def should_run(node, inputs, outputs, state):
    # has the node's code changed?
    if calc_hash(node) != state["nodes"][node.name]:
        return None  # run the node

    # is the newest input newer than the newest output?
    inputs_max = max(state["datasets"][name] for name in inputs)
    outputs_max = max(state["datasets"][name] for name in outputs)
    if inputs_max > outputs_max:
        return None  # run the node

    # up to date: we can skip the node, returning a mock node instead
    return lambda: None

I will continue coding this in the evenings and share something once it’s at prototype stage. Generally, I want to design it as such:

  • Hook implementation which contains the comparator logic and determines whether to skip
  • State file utility (abstract) + implementation based on local FS storage
  • S3 + DynamoDB based state file utility implementation (possibly left open to the community to implement)
  • GCS based state file utility implementation (possibly left open to the community)

This way, a local execution can just store the state file locally. But distributed teams may want to share a state file for a pipeline, so they can do so using cloud tools. This is standard practice for terraform and it works well.
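
As a rough sketch of what I mean by the state file utility (abstract base plus a local implementation; the S3/GCS variants would implement the same interface; the file name is just illustrative):

import abc
import json
from pathlib import Path


class StateStore(abc.ABC):
    """Abstract storage for the skip/rerun state."""

    @abc.abstractmethod
    def load(self) -> dict: ...

    @abc.abstractmethod
    def save(self, state: dict) -> None: ...


class LocalStateStore(StateStore):
    """Keeps the state file on the local filesystem."""

    def __init__(self, path: str = ".kedro_run_state.json"):
        self._path = Path(path)

    def load(self) -> dict:
        if not self._path.exists():
            return {"counter": 0, "nodes": {}, "datasets": {}}
        return json.loads(self._path.read_text())

    def save(self, state: dict) -> None:
        self._path.write_text(json.dumps(state, indent=2))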

Thoughts? I’ll share my working branch hopefully soon. Again, forgive the delay, I got a new job to learn :grin:

2 Likes

Awesome, bro! This is looking fantastic! What are you using for the calc_hash implementation?

I’ll first try to get the __code__.co_code approach to work (looking at a function’s byte code).
It appears that constant values don’t change the byte code even though the logic changes (e.g. the byte code for for i in range(10) and for i in range(4) is the same). I’ll try to understand that first, because that approach is technically very sexy. If that doesn’t fly, I’ll do some background research and validate what @waylonwalker built based on dis.

For now, I’ll let the crowd help out with a SO question.

Another potential benefit of this work is that one would have a register of the last-run nodes and their order. If this is kept in a central place, you get some form of lineage: when you are in a project where e.g. 5 engineers run individual nodes on the same dataset in the same environment, it’s hard to know which node run was based on which previous node’s run and state.

3 Likes

Aha! SO gave the hint to look at the __code__ object in more detail. There’s definitely more there that fixes the issue raised by @waylonwalker (e.g. the constants that weren’t being caught are in __code__.co_consts).

>>> dir(a.__code__)
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'co_argcount', 'co_cellvars', 'co_code', 'co_consts', 'co_filename', 'co_firstlineno', 'co_flags', 'co_freevars', 'co_kwonlyargcount', 'co_lnotab', 'co_name', 'co_names', 'co_nlocals', 'co_posonlyargcount', 'co_stacksize', 'co_varnames', 'replace']

I’ll write a number of tests to validate that the hashing works based on this, but I think the byte code approach will be the most minimalistic: evaluating all of the __code__.co_* attributes for equality.
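
Something along these lines (hashing a stable subset of the co_* attributes; treat the list as illustrative, since the available attributes vary a bit between Python versions):

import hashlib


def code_fingerprint(func) -> str:
    """Hash the parts of ``func.__code__`` that affect execution."""
    code = func.__code__
    parts = (
        code.co_code,      # raw byte code
        code.co_consts,    # constants, e.g. the 10 vs 4 in range(...)
        code.co_names,     # referenced globals / attributes
        code.co_varnames,  # local variable names
        code.co_argcount,  # signature shape
    )
    # caveats: co_consts includes the docstring (if any), so a docstring edit
    # changes the hash; nested functions appear as code objects whose repr
    # contains a memory address, so they would need recursive handling
    return hashlib.sha256(repr(parts).encode()).hexdigest()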

3 Likes

Alright, took the Christmas break as a chance to pick this back up.
Unfortunately, I’m running into 2 issues which I’d be happy to get input on.

  1. There’s no way for hooks to stop kedro from writing out the outputs. The hooks have no control over whether the outputs should be written or not: _call_node_run returns the outputs, which are then iterated over and written.

  2. before_node_run is called in _collect_inputs_from_hook, which runs after the inputs have been loaded. This should also be avoided, since loading the inputs is unnecessary if the node is going to be skipped.

I made a draft PR to share the work done so far

1 Like

@DataEngineerOne @waylonwalker @limdauto

Check out this pull request with a nice figure by @pascalwhoop!

2 Likes