Speeding up pipeline processing with change detection

Something to look out for is whether a transformation node’s input and output datasets have changed.

One problem with the hashing method is that it becomes complex to deal with the myriad forms of data and custom IO. I think the best general approach is to abstract away to the one piece of information you actually want: whether or not a particular dataset has been written to.

You kick off a refresh when the raw data layer node has been run, and the data propagates accordingly.

2 Likes

Can you expand on this a bit?

  • Where is this ID stored? Locally? What about runtimes such as Docker? There, locally stored values would get lost after the container exits unless you mount a path into it.
  • If locally, what if a colleague touched it? Then we’d skip the node and just move on to the next one, but it may actually not work; we’d later discover a crash and wouldn’t be able to map it to the most recent actions.
  • If remotely, next to the DS, we’d have to hook into all our read/write IO tools. Probably a bad idea.
  • Terraform shares a remote state which is stored in databases, but that again introduces infrastructure overhead.

I assume we might need a combination of several heuristics and err on the side of caution. So for the input datasets something like:

  • check the monotonic counter. Higher? RERUN
  • check the last-edited timestamp. Newer? RERUN
  • local & can be hashed? Diff? RERUN

The monotonic counter is a super fast way to check your own history. Next, the last-edited timestamp covers remote shared DS (e.g. on S3) and avoids missing other people having tampered with the data. Local hashes are probably obsolete at that point, because the first check already covers them.
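
A rough sketch of that layered check (my own illustration, not from the post; how the observed counter, timestamp and hash are obtained, and the shape of the recorded state, are assumptions):

def needs_rerun(observed, recorded):
    """Layered 'err on the side of caution' check for one input dataset.

    observed/recorded are plain dicts with optional 'counter', 'timestamp'
    and 'hash' entries (hypothetical structure, for illustration only).
    """
    # 1. monotonic counter: higher than what we recorded? RERUN
    if observed.get("counter", 0) > recorded.get("counter", 0):
        return True
    # 2. last-edited timestamp: newer? RERUN (catches remote edits, e.g. on S3)
    if observed.get("timestamp", 0.0) > recorded.get("timestamp", 0.0):
        return True
    # 3. local data that can be hashed cheaply: hash differs? RERUN
    if "hash" in observed and observed["hash"] != recorded.get("hash"):
        return True
    return False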

3 Likes

This will depend on the use case, but definitely, if there’s going to be more than one individual running the pipeline, then this state must be shared somewhere. Even in the case of hashing the data, the “shared” state would be the data itself. :thinking:

Given the simplicity of the ID, it can really be stored in any medium: a touched file next to the target data in S3, a collection in a remote git repo, rows in a database or key-value store, or even a file on a shared drive, etc.
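
To illustrate the first option (my own sketch; the marker key name and its placement next to the data are made up):

import boto3

s3 = boto3.client("s3")

def write_run_id(bucket, prefix, run_id):
    # e.g. data at s3://bucket/data/my_dataset/ gets a sibling "_last_run_id" object
    s3.put_object(Bucket=bucket, Key=f"{prefix}/_last_run_id", Body=str(run_id).encode())

def read_run_id(bucket, prefix):
    try:
        obj = s3.get_object(Bucket=bucket, Key=f"{prefix}/_last_run_id")
        return obj["Body"].read().decode()
    except s3.exceptions.NoSuchKey:
        return None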

The difficulty of this then becomes synchronizing the update of the non-database storage mechanisms, but this is a problem for all shared state exercises and there are work-arounds.

Yep, that’s correct, but I still see this as a problem of sharing state rather than of the monotonic counter as an implementation.

DS == data source, right?

If it’s all in kedro, I don’t see a problem hooking into read/write IO; modifying dataset IO is a well-accepted practice :relieved:. In fact, with a bit of finagling, we wouldn’t need to change any IO at all, and I think this can be done using only the built-in kedro hooks before_node_run and after_node_run, a custom parameter, and node decorators.
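
For example, something along these lines (a minimal sketch with in-memory state just for illustration; a real version would persist and share the state, and hook signatures vary slightly between kedro versions):

from kedro.framework.hooks import hook_impl


class WriteTrackingHooks:
    """Sketch: record 'this dataset was just written' instead of hashing the data itself."""

    def __init__(self):
        self.counter = 0          # global monotonic write counter
        self.last_written = {}    # dataset name -> counter value at last write

    @hook_impl
    def after_node_run(self, node, outputs):
        # pluggy lets a hook implementation accept only the spec arguments it needs
        self.counter += 1
        for dataset_name in outputs:
            self.last_written[dataset_name] = self.counter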

If we share state correctly, in what cases do we need to check timestamp or hashing on the data? :relaxed:

  • Timestamp only changes if we save a new version of the data to the storage mechanism. If we’re going to rely on the storage mechanism’s metadata, what about just relying on our own metadata? It’s cleaner and we can guarantee its correctness (a timestamp is not guaranteed to be correct, depending on the medium).
  • Hashing is only done when a new version of the data has been saved. If we already know we’ve saved new data, why not just store that state as metadata? Hashing will eventually become untenable as data pipelines grow, data grows, and data formats multiply as well.

Hashing at the code level, though, I do think is necessary. That brings up other questions, though, on data versioning during individual development versus release, which is another bag of worms :laughing:.

How do you feel about these ideas?

1 Like

A similar discussion is emerging inside of a GH issue.

Here is Zain’s implementation of a skip-hook that skips a node if its inputs are older than its outputs.

4 Likes

Looking for code changes could be done by tracking the AST or bytecode of the underlying function and caching it, then adding a code-change check to Zain’s hook. This check would only trigger if the actual execution of the node would change, whereas looking at a full source diff would also trigger for other things such as docstring changes.

1 Like

No need to take a look at the bytecode, just use the source code!

import inspect
inspect.getsourcelines(node._func)

Looking at the AST or bytecode gets a little bit closer to what actually gets executed and would be a bit less noisy during refactoring. Tools such as black use the AST to ensure that your code executes exactly the same before and after autoformatting. Changing things like variable names, docstrings, spacing, or indentation would not trigger a rerun unless they cause an actual change to the execution.
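
A rough sketch of an AST-based fingerprint (my own illustration; note that a plain ast.dump still includes names and docstrings, so ignoring those would need extra normalisation, similar to what black does):

import ast
import hashlib
import inspect
import textwrap


def ast_fingerprint(func):
    """Hash a dump of the function's AST; comments and formatting disappear."""
    source = textwrap.dedent(inspect.getsource(func))
    tree = ast.parse(source)
    return hashlib.md5(ast.dump(tree).encode()).hexdigest()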

I did a little playing around here.


2 Likes

Ohhh, ok bro! That’s fair, but is it hashable and serializable to save state? :slight_smile:

Apparently I don’t quite know what I am doing?

1 Like

Haha; I don’t know what’s going on either. Please share the fruits of your labor when you figure it out. :pray:

I played with it for a little bit and I got something that seems to check out. I looked through black a little bit and it definitely gets complex and covers more than you would need here, so I didn’t really follow it. It is apparent that the AST changes more often than just between major versions (2-3).

It was fun to play with but not something that I would want to support without a further understanding of it, and an extensive test suite that covered a number of platforms/versions.

The dis module has a function that returns a generator over the instructions of a given function. It ignores docstrings, spacing, formatting, etc., but it does yield the line numbers of the instructions, so I just compared each of the other attributes.

Here was my quick and dirty test that utilized assert statements for simplicity.

import dis


def compare_instructions(func1, func2):
    """Compare the instructions of two functions."""
    func1_instructions = list(dis.get_instructions(func1))
    func2_instructions = list(dis.get_instructions(func2))

    # differing instruction counts means the functions differ
    assert len(func1_instructions) == len(func2_instructions)

    # compare every attribute of the instructions except for starts_line
    for line1, line2 in zip(func1_instructions, func2_instructions):
        assert line1.opname == line2.opname
        assert line1.opcode == line2.opcode
        assert line1.arg == line2.arg
        assert line1.argval == line2.argval
        assert line1.argrepr == line2.argrepr
        assert line1.offset == line2.offset

    return True
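
For instance (a made-up usage, not from the post), functions that differ only in formatting compare equal:

def add_one(x):
    return x + 1

def add_one_reformatted(x):
    # extra parentheses and spacing only; the instructions are identical
    return (x  +  1)

assert compare_instructions(add_one, add_one_reformatted)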

I think @DataEngineerOne’s simple solution would be a much better place to start.

I’m kinda surprised that there isn’t a simpler solution already on pypi.

5 Likes

Hi. I’m sorry for my long absence; first project with the new company, so I needed some time to wrap my head around that.

I started building a simple hook that keeps track of essentially two sets of data:

  • Datasets: a global, monotonically increasing counter
  • Nodes: either a hash of the byte code or what @waylonwalker described just above.

I first wanted to store the last known write time of the datasets, but then I realised this will become complicated with several systems sharing a common state file somewhere, since time is hard. A monotonic counter, however, is universal and independent of server time. So the “state file” (taking inspiration from terraform here) would look like this:

{
  "counter": 4,
  "nodes": {
    "some.node.name": "d3b07384d113edec49eaa6238ad5ff00",
    "some.other.node": "c157a79031e1c40f85931829bc5fc552"
  },
  "datasets": {
    "source.node": 1,
    "output.node": 2,
    "output.other": 3
  }
}

Just before a node run, it would then do (pseudo):

def should_run(node, inputs, outputs, state):
    """Return a no-op replacement if the node can be skipped, otherwise None (rerun)."""
    # has the node's code changed since the last recorded run?
    if calc_hash(node) != state["nodes"].get(node.name):
        return None

    # is any output older (lower counter) than the newest input?
    inputs_max = max(state["datasets"].get(ds, 0) for ds in inputs)
    outputs_min = min(state["datasets"].get(ds, 0) for ds in outputs)
    if inputs_max > outputs_min:
        return None

    # everything is up to date: we can skip, returning a mock node
    return lambda: None

I will continue coding this in the evenings and share something once it’s a prototype. Generally, I want to design it as such:

  • Hook implementation which contains the comparator logic and determines whether to skip
  • State file utility (abstract) + an implementation based on local FS storage
  • S3 + DynamoDB-based state file utility implementation (possibly left open to the community to implement)
  • GCS-based state file utility implementation (possibly left open to the community)

This way, a local execution can just store the state file locally. But distributed teams may want to share a state file for a pipeline, so they can do so using cloud tools. This is standard practice for terraform and it works well.
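
A possible shape for that state file utility (my own sketch; the class and method names are made up):

import abc
import json
from pathlib import Path


class AbstractStateStore(abc.ABC):
    """Loads and saves the pipeline state file described above."""

    @abc.abstractmethod
    def load(self) -> dict:
        ...

    @abc.abstractmethod
    def save(self, state: dict) -> None:
        ...


class LocalStateStore(AbstractStateStore):
    """Local-FS implementation; S3+DynamoDB or GCS variants would share the same base."""

    def __init__(self, path: str = "pipeline_state.json"):
        self.path = Path(path)

    def load(self) -> dict:
        if not self.path.exists():
            return {"counter": 0, "nodes": {}, "datasets": {}}
        return json.loads(self.path.read_text())

    def save(self, state: dict) -> None:
        self.path.write_text(json.dumps(state, indent=2))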

Thoughts? I’ll share my working branch hopefully soon. Again, forgive the delay, I got a new job to learn :grin:

2 Likes

Awesome, bro! This is looking fantastic! What are you using for the calc_hash implementation?

I’ll first try to get the __code__.co_code approach to work (looking at a function’s byte code).
It appears that fixed values don’t change the byte code even though the logic would change (e.g. the byte code for for i in range(10) and for i in range(4) is the same). I’ll try to understand that first, because that approach is technically very sexy. If that won’t fly, I’ll do some background research and validate what Waylonwalker built based on dis.
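
The observation can be reproduced quickly (my own check, not from the post):

def loop_ten():
    for i in range(10):
        print(i)

def loop_four():
    for i in range(4):
        print(i)

# the byte string is identical because the 10 and the 4 only live in co_consts
assert loop_ten.__code__.co_code == loop_four.__code__.co_code
assert loop_ten.__code__.co_consts != loop_four.__code__.co_consts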

For now, I’ll let the crowd help out with a SO question.

Another potential benefit of this work would be having a register of the last run nodes and their order. If this is kept in a central place, one can have some form of lineage, because when you are in a project where e.g. 5 engineers run individual nodes on the same dataset in the same environment, it’s hard to know which node run was based on which previous node’s run and state.

3 Likes

Aha! SO gave the hint to look at the __code__ object in more detail. There’s definitely more there that fixes the issue raised by @waylonwalker (e.g. the constants not being caught are in __code__.co_consts).

>>> dir(a.__code__)
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'co_argcount', 'co_cellvars', 'co_code', 'co_consts', 'co_filename', 'co_firstlineno', 'co_flags', 'co_freevars', 'co_kwonlyargcount', 'co_lnotab', 'co_name', 'co_names', 'co_nlocals', 'co_posonlyargcount', 'co_stacksize', 'co_varnames', 'replace']

I’ll write a number of tests to validate that the hashing works based on this, but I think the byte code approach will be the most minimalistic: evaluating all the __code__.co_* attributes for equality.
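
A sketch of what such a calc_hash could look like (hypothetical; the choice of attributes and the handling of nested code objects are my assumptions, and this is exactly what the planned tests would need to validate across Python versions):

import hashlib
import types


def calc_hash(func):
    return _hash_code(func.__code__)


def _hash_code(code):
    digest = hashlib.md5(code.co_code)
    # co_code alone misses constants (range(10) vs range(4)) and global names,
    # so mix in co_consts and co_names as well
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            # nested code objects (lambdas, comprehensions) repr with memory
            # addresses, so hash them recursively instead
            digest.update(_hash_code(const).encode())
        else:
            digest.update(repr(const).encode())
    for name in code.co_names:
        digest.update(name.encode())
    return digest.hexdigest()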

3 Likes

Alright, I took the Christmas break as a chance to pick this back up.
Unfortunately, I’m running into 2 issues which I’d be happy to get input on.

  1. There’s no way to prevent kedro from writing out outputs via hooks. The hooks have no control over whether the outputs should be written or not: _call_node_run returns the outputs, which are then iterated over and written.

  2. before_node_run is called in _collect_inputs_from_hook, which is itself called after the inputs have been loaded. This loading should also be avoided, since the inputs are not needed if the node is skipped.

I made a draft PR to share the work done so far.

1 Like

@DataEngineerOne @waylonwalker @limdauto

Check out this pull request with a nice figure by @pascalwhoop!

2 Likes

I think this is a very useful Hook. I tried my own implementation here by doing it in the register_pipelines Hook: before registering the pipeline, I look for the nodes whose outputs have already been generated and remove them from the pipeline. But this is even better, because someone might want to do it dynamically, depending on things that are only known at run time. And it also detects changes to inputs and code.

@Minyus thanks for noticing :slight_smile:
It appears that this isn’t really in line with the kedro design principles, though: they do not want hooks to change the execution flow of the DAG. Rather, they’d prefer this to live either at the Runner level (a separate runner implementation) or at the Session level. I spoke to Ivan separately on this, and he told me that the required changes to the codebase (allowing hooks to signal skipping) won’t be implemented.

So the hook, while “easy”, won’t happen. I understand the rationale: hooks aren’t supposed to change the flow, they are just supposed to be informed that something happened.

Looking at the runner, I could imagine a mixin style implementation where we allow a NodeSkippableMixin to be inherited from in a multiple-inheritance model for the runners.

So you’d have

class SkippingThreadRunner(AbstractRunner, NodeSkippableMixin, ThreadedRunnerMixin):
    ...

which could then be a multithreaded runner that can skip nodes: the ultimate speed boost.

Thoughts?

@pascalwhoop

I’m guessing that a Runner-level implementation would need quite a lot of effort for both development and sufficient testing code, but I wouldn’t stop you if you are willing.
If Runner-level is too low and Session-level is too high, Context-level might be appropriate.

I have exposed the run_only_missing option of the Runner (skip each node if the output data already exists) to parameters.yml using a customized Context in the PipelineX package (shown below), which could be a starting point for a Context-level implementation.