
Resolve race condition re creation of APDR records

Open dstandish opened this issue 3 weeks ago • 10 comments

Body

When two tasks run at the same time and both contribute to the creation of the same dag run and partition key, they can create separate APDR records. This has to be fixed somehow, possibly with a different design. More notes on options in the comments below.

How to repro

Check out branch poc-asset-partitions.

Make two identical dags with a schedule such as `*/5 * * * *`, each updating a different asset. Make another dag that listens to those assets using the partitioned asset timetable.

Enable all the dags.

The two producer dags will run at essentially the same time. Two APDR records will be created instead of one, and as a result the downstream dag won't schedule.

More explanation

Right now, as soon as one asset fires an event that maps to target key X, we create an APDR record for target key X. Once that record is created, the scheduler will consider the APDR record for target key X in each loop. When the second asset fires an event that maps to target key X, it should see that there is an existing APDR record and should therefore just create the PAKL record which maps to that APDR record. But if the two events fire at exactly the same time, then due to this bug both writers will see that there is no APDR record, and they will both create one. So even though a run should be created, it isn't.
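The failure mode above is a classic check-then-insert race. The sketch below simulates it with plain Python threads in place of the database; the names (`apdr_table`, `get_or_create_apdr`) are hypothetical stand-ins, not Airflow's real API, and a barrier forces both writers past the "check" step at the same moment:

```python
import threading

apdr_table = []  # pretend table of APDR records, keyed by (dag_id, partition_key)
barrier = threading.Barrier(2)  # forces both writers past the "check" step together

def get_or_create_apdr(dag_id, partition_key):
    # Step 1: check for an existing record (both writers see none).
    exists = (dag_id, partition_key) in apdr_table
    barrier.wait()  # simulate both asset events firing at exactly the same time
    # Step 2: create the record if the check found nothing.
    if not exists:
        apdr_table.append((dag_id, partition_key))

t1 = threading.Thread(target=get_or_create_apdr, args=("downstream", "2025-12-02"))
t2 = threading.Thread(target=get_or_create_apdr, args=("downstream", "2025-12-02"))
t1.start(); t2.start(); t1.join(); t2.join()

print(len(apdr_table))  # 2 -- duplicate APDR records for the same target key
```

Both threads observe an empty table before either inserts, so two records for the same `(dag_id, partition_key)` end up in the table.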

Committer

  • [x] I acknowledge that I am a maintainer/committer of the Apache Airflow project.

dstandish avatar Dec 02 '25 04:12 dstandish

possible solutions

  1. Use locking when running get-or-create-apdr. Somehow we would need to use locking so that the two writers do not both see no APDR record and thus both create one at the same time. The most extreme option (probably not a good one) would be to take a lock on the dag table, but this could interfere with performance and with other things that try to lock the dag table. There are other ideas, e.g. create an "apdr mutex" table (or perhaps a partition state table?) with the grain of dag_id and partition_key (i.e. a uniqueness constraint on that pair), which could be locked during the get-or-create operation to ensure sequentiality. Maybe the "apdr mutex" table is just a parent table with grain dag_id, partition_key, and APDR keys to it.

  2. We could get rid of the APDR table and change the querying and logic to just sift through the pile of records in the key log table to figure out which runs need to be created, etc. The querying would be more expensive, but it would not be subject to the race condition.

  3. Still use locking, but avoid an external mutex table by locking within the APDR table itself, e.g. by implementing an is_latest flag with a partial unique constraint, or something similar.
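The effect of option 1 can be sketched in plain Python: a single lock, standing in for a row lock on the hypothetical "apdr mutex" table, makes the check-then-insert atomic. The names are illustrative, not Airflow's real API:

```python
import threading

apdr_table = []
apdr_mutex = threading.Lock()  # stands in for SELECT ... FOR UPDATE on a mutex row

def get_or_create_apdr(dag_id, partition_key):
    # Holding the lock across check *and* insert removes the race window:
    # the second writer cannot run its check until the first has inserted.
    with apdr_mutex:
        if (dag_id, partition_key) not in apdr_table:
            apdr_table.append((dag_id, partition_key))
        # else: only create the PAKL record pointing at the existing APDR

threads = [
    threading.Thread(target=get_or_create_apdr, args=("downstream", "2025-12-02"))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(apdr_table))  # 1 -- exactly one APDR record for the target key
```

Even with both writers racing, only one APDR record survives, because the check and the insert happen inside a single critical section.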

dstandish avatar Dec 02 '25 14:12 dstandish

> use locking when running get-or-create-apdr. Somehow we would need to use locking so that the two writers would not both see no APDR record and thus both create one at the same time.

This seems to make the most sense to me, but I will continue looking into the possibility of creating a new table to handle it.

Lee-W avatar Dec 04 '25 11:12 Lee-W

~~Another idea is tracking source_dag_id? WDYT?~~

Lee-W avatar Dec 04 '25 13:12 Lee-W

I think the best option is to lock the referenced AssetModel row for update. This has far less of a performance impact: it only prevents multiple entities from modifying this particular asset at the same time.

potiuk avatar Dec 07 '25 15:12 potiuk

Yep, will also give it a try!

Lee-W avatar Dec 08 '25 01:12 Lee-W

On second thought, would locking the APDR itself be enough in this case?

A draft PR has been created: https://github.com/apache/airflow/pull/59183

Lee-W avatar Dec 08 '25 09:12 Lee-W

As commented in #59183, I do not think it will work. Locks are only created when a row is selected, so when no row is found, no lock is taken.

With the database locks there are two approaches:

  1. You run SELECT FOR UPDATE (with_row_lock) on a row (or rows) that exists before the lock attempt.
  2. If you do not have a row, you can create a dedicated database lock (an advisory lock); we use one for database migrations, for example. But this kind of lock serializes all similar transactions and would not be performant.
  3. You could also lock the whole table, but that's even worse.

Like @dstandish mentioned, locking the dag table is too "heavy". There is no other direct table with a foreign key that we could lock (target_dag_id is the only one). However, APDR is in fact linked to Asset via PartitionedAssetKeyLog, and we know the asset we are operating on (asset_id), so we could lock the asset row instead. This is fine-grained: you only block this particular asset from being updated in parallel. I think that is the best approach.
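On Postgres, the asset-row approach might look roughly like the following. Table and column names here are illustrative guesses, not Airflow's actual schema:

```sql
BEGIN;

-- Lock only the asset row we are operating on; a concurrent transaction
-- doing the same get-or-create for this asset blocks here until we commit.
SELECT id FROM asset WHERE id = :asset_id FOR UPDATE;

-- Check-then-insert is now safe: no other writer for this asset can
-- interleave between the check and the insert.
-- (An advisory lock, e.g. pg_advisory_xact_lock(:key), is the heavier
-- fallback when there is no existing row to lock, as noted above.)
INSERT INTO asset_partition_dag_run (target_dag_id, partition_key)
SELECT :target_dag_id, :partition_key
WHERE NOT EXISTS (
    SELECT 1 FROM asset_partition_dag_run
    WHERE target_dag_id = :target_dag_id
      AND partition_key = :partition_key
);

COMMIT;
```

The point is that the lock is taken on a row that always exists (the asset), so SELECT FOR UPDATE works, and its scope is limited to writers touching that one asset.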

potiuk avatar Dec 08 '25 23:12 potiuk

Got it. I've just made the original PR a draft. Let me give it another try.

Lee-W avatar Dec 09 '25 01:12 Lee-W

@potiuk another option is to make another dedicated lock table, like an "apdr_mutex" table, which would control only the creation of new APDR records. This would have the same effect as locking the asset, but it would not interfere with other things updating that asset.

But maybe we just keep this in our back pocket and go there only if we see issues with locking the asset.

dstandish avatar Dec 10 '25 17:12 dstandish

> @potiuk another option is to make another dedicated lock table, like an "apdr_mutex" table, which would control only the creation of new APDR records. This would have the same effect as locking the asset, but it would not interfere with other things updating that asset.
>
> But maybe we just keep this in our back pocket and go there only if we see issues with locking the asset.

This is what @Lee-W has done in his PR for SQLite, where row locks are not supported: https://github.com/apache/airflow/pull/59183/changes#r2608538775 . Locking is based on inserting a row with a primary key into the log (temporarily, without committing it) and deleting it before the transaction is committed. The problem is that this needs an active loop plus sleep() rather than a SELECT FOR UPDATE that simply waits for the lock to be released. So I am afraid it might be less efficient than a pure FOR UPDATE lock on an existing row.
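The insert-a-row-as-lock pattern with an active retry loop can be simulated in plain Python. Here a set guarded by a lock stands in for the lock table's primary-key constraint; all names are hypothetical, and the sleep-and-retry loop is the inefficiency being discussed:

```python
import threading
import time

lock_rows = set()               # stands in for the lock table's primary key
table_guard = threading.Lock()  # mimics the DB's atomic primary-key check
apdr_table = []

def acquire(key):
    # "INSERT" the lock row; on a primary-key conflict, sleep and retry.
    # This is the active loop, in contrast to FOR UPDATE, which just blocks.
    while True:
        with table_guard:
            if key not in lock_rows:
                lock_rows.add(key)
                return
        time.sleep(0.01)

def release(key):
    with table_guard:
        lock_rows.discard(key)  # "DELETE" the lock row before commit

def get_or_create_apdr(dag_id, partition_key):
    key = (dag_id, partition_key)
    acquire(key)
    try:
        if key not in apdr_table:
            apdr_table.append(key)
    finally:
        release(key)

threads = [
    threading.Thread(target=get_or_create_apdr, args=("downstream", "2025-12-02"))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(apdr_table))  # 1 -- the race is gone, at the cost of busy-waiting
```

It produces the correct result, but the loser of the race spins in `acquire` instead of sleeping on a database lock, which is the efficiency concern raised above.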

I also think we do not need it in SQLite at all: in SQLite's WAL mode only one writer is allowed at a time. But that's a different story.

potiuk avatar Dec 10 '25 23:12 potiuk