
UDF checkpoints

ilongin opened this issue 2 months ago • 2 comments

Checkpoints for UDFs are needed to save and reuse partial UDF data between job runs when a UDF fails. Important: the user can change the UDF code until the UDF has finished successfully and still reuse the partial results. This solves the most common problem: a UDF fails with a user error, the user figures out there is a small bug in the code and wants to rerun, but still wants to reuse the partial UDF results from the previous run.

The idea is to create two checkpoints per UDF:

  1. Checkpoint before starting the UDF calculation - its hash covers the chain up to, but excluding, the UDF itself (the UDF function code), i.e. only the UDF inputs go into the hash calculation. This checkpoint is special: it stays active until the UDF finally completes successfully, and we connect the partial UDF tables to it. We can also call it a partial checkpoint, as it holds partial results. Once the UDF finishes it is no longer needed and can be removed, since it is replaced by the second checkpoint.
  2. Checkpoint after the UDF is done - its hash covers the chain part before the UDF (the UDF inputs) plus the UDF itself (the function code), and it is connected to the final UDF result tables (we will create a special "checkpoint" dataset out of it later on). If something fails in the chain after this UDF, the next run starts from here without recalculating the UDF. Note that unlike the first ("partial") checkpoint, which is not invalidated when the user changes the UDF code, this one is invalidated, since it includes the UDF function itself in its hash calculation. (A minimal hash sketch follows after this list.)
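
A minimal sketch of how the two hashes could differ. The helpers here (digest, the hard-coded step strings) are hypothetical, not the actual datachain hashing code:

import hashlib
import inspect

def digest(*parts: str) -> str:
    # Combine string parts into one stable hash.
    h = hashlib.sha256()
    for part in parts:
        h.update(part.encode())
    return h.hexdigest()

def process_even(num: int) -> int:
    return num % 2

# Hash of the chain prefix feeding the UDF (its inputs only).
inputs_hash = digest("read_values", "filter(num < 50)")

# Checkpoint 1 ("partial"): the UDF code is excluded, so editing
# process_even does not invalidate it and partial results survive.
checkpoint_before = inputs_hash

# Checkpoint 2 ("final"): the UDF source is included, so any code
# change invalidates it and forces the UDF to be recalculated.
checkpoint_after = digest(inputs_hash, inspect.getsource(process_even))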

For example, these are the checkpoints that would be created:

chain = (
    dc
    .read_values(num=list(range(100)))
    .filter(C("num") < 50)
     <----- Checkpoint 1, hash = hash(read_values) + hash(filter), data = UDF results (partial) ----->
    .map(even=process_even)
     <----- Checkpoint 2, hash = hash(Checkpoint 1) + hash(map/process_even), data = UDF results (final) ----->
    .filter(C("num") < 20)
     <----- Checkpoint 3, hash = hash(Checkpoint 2) + hash(filter), data = UDF results (partial) ----->
    .gen(res=gen_double, output={"double": int, "even": int})
     <----- Checkpoint 4, hash = hash(Checkpoint 3) + hash(gen/gen_double), data = UDF results (final) ----->
    .save("numbers")
     <----- Checkpoint 5, hash = hash(whole chain), data = Created dataset (this is final checkpoint in this chain) ------>
)

An alternative to the two checkpoint types is to create just a single checkpoint and leave the UDF code out of the hash calculation entirely. This means we would never detect that the user changed the UDF function code: the chain/UDF checkpoints would always be reused, and to make sure code changes take effect the user would have to rerun the whole job from scratch.
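
In terms of the sketch above, the single-checkpoint alternative just drops the UDF source from the hash (again hypothetical, not the actual implementation):

import hashlib

# Single checkpoint: the UDF source never enters the hash, so the
# checkpoint (and its cached results) are reused even after the user
# edits the UDF; only a full from-scratch rerun picks up the changes.
single_checkpoint = hashlib.sha256(
    "read_values|filter(num < 50)".encode()  # chain inputs only, no UDF code
).hexdigest()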

ilongin · Oct 09 '25 14:10

A single checkpoint approach seems enough. Changing code and continuing from the previous checkpoint is the default behavior (until full rerun).

Questions:

  1. We already create a temp table before the UDF. How is that different from checkpoints? Are we reusing those tables or creating a new abstraction?

  2. Similar question about the output. A temp table is always created. Are we changing it or keeping two abstractions?

  3. Which of the checkpoints are visible to users? (It can be based on the code example)

  4. Can a user use an incomplete checkpoint from another script? How is that supposed to be addressed?

dmpetrov · Oct 10 '25 21:10

A single checkpoint approach seems enough. Changing code and continuing from the previous checkpoint is the default behavior (until full rerun).

Should the UDF function be included in that checkpoint hash or not?

Questions:

  1. We already create a temp table before the UDF. How is that different from checkpoints? Are we reusing those tables or creating a new abstraction?

Yes, we have a UDF input table and a UDF output table (in a mapper they are merged at the end; in a generator only the output table is used). We will reuse those tables; the checkpoint is just a "marker" that tells us whether we can use them. We would need to change the cleanup logic so that it does not remove those tables on job failure.
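
A rough sketch of that resume flow; every name here (checkpoint_exists, reuse_output_table, and the rest of the job API) is invented for illustration, not an existing datachain interface:

def run_udf_with_checkpoint(job, udf, inputs_hash):
    # Hypothetical flow: the checkpoint is only a marker; the partial
    # data itself lives in the preserved UDF input/output tables.
    if job.checkpoint_exists(inputs_hash):
        out = job.reuse_output_table(inputs_hash)  # preserved on failure
    else:
        out = job.create_output_table(inputs_hash)
        job.save_checkpoint(inputs_hash)  # marker: partial results exist
    for row in job.unprocessed_rows(out):  # skip rows done in a prior run
        out.append(udf(row))
    job.finalize_checkpoint(inputs_hash)  # replace partial with final checkpoint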

  2. Similar question about the output. A temp table is always created. Are we changing it or keeping two abstractions?

Same as above, we will use that temp table. We would need to add an additional table for generators, though: just a list of the input rows that were already processed (only the sys__id column is needed), because in a generator there is no connection between output and input (no shared sys__id as there is in a mapper), so we need a way to know which rows to skip on the next run.
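
A small sketch of that bookkeeping table, using in-memory SQLite and made-up table names (udf_input, udf_processed):

import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE udf_input (sys__id INTEGER PRIMARY KEY, num INTEGER);
    -- Generator bookkeeping: input rows already fully processed
    -- (only sys__id is needed).
    CREATE TABLE udf_processed (sys__id INTEGER PRIMARY KEY);
""")
con.executemany("INSERT INTO udf_input VALUES (?, ?)", [(1, 10), (2, 11), (3, 12)])
con.execute("INSERT INTO udf_processed VALUES (1)")  # done in a previous run

# On rerun, feed the generator only the rows not yet processed.
todo = con.execute("""
    SELECT sys__id, num FROM udf_input
    WHERE sys__id NOT IN (SELECT sys__id FROM udf_processed)
""").fetchall()
print(todo)  # [(2, 11), (3, 12)]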

  3. Which of the checkpoints are visible to users? (It can be based on the code example)

What do you mean by visible? We will document for users that checkpoints are created when a dataset is saved and for UDFs.

  4. Can a user use an incomplete checkpoint from another script? How is that supposed to be addressed?

I'm not sure I understand this one either. Checkpoints are used when we re-run a job (we create a "child" job); the user can still change the query code. There is no plan to reuse checkpoints between different jobs at the moment, if that's what you were referring to.

ilongin · Oct 20 '25 12:10