airflow
airflow copied to clipboard
Add dataset event timestamp to dataset dag run queue
This allows us to derive the dag run logical date from the event timestamp rather than the queue record created_at, which allows us to be slightly more precise when associating dag runs with dataset events.
Also, in the first commit, I change the signature a little bit, which reduces (a tiny bit) some duplication and makes it (again a tiny bit) more explicit what exactly these methods require, and therefore clearer what they actually do.
Overall looks okay. I do wonder if instead we should pass down the event itself instead. I could see custom manager classes wanting the dataset itself, so passing the event makes the api pretty clean?
Overall looks okay. I do wonder if instead we should pass down the event itself instead. I could see custom manager classes wanting the dataset itself, so passing the event makes the api pretty clean?
The thing about that, though, is that these methods are all private anyway, not part of the public API... So users should not implement them anyway right?
That's fair. I overlooked at they override register_dataset_change, not the others.
I wonder if the future annotations changes are going to cause heartache for #26290? Might be worth removing those here?
I wonder if the future annotations changes are going to cause heartache for #26290? Might be worth removing those here?
Oh yeah I thought everything had been merged.... I can undo that stuff tomorrow
It's merged now, so just resolve the conflicts and we should be good to go.
Just to color in my thoughts on the private methods in the dataset manager - I made them private only because I was trying to keep the dataset manager public interface as small as possible, and because I'm very cautious when it comes to API promises. I wouldn't fight if others want to make them public methods, and if we don't have any negative feedback about them by 2.5, I'd be fine with publicizing them (and probably making the private methods call their public equivalents), whatever is reasonable.
My personal preference would be to pass around the DatasetModel and DatasetEvent objects directly, since that just seems cleaner to me. If/when we publicize more/all of the dataset manager methods that might also make it easier on plugin authors to implement those methods.
At first glance it seems like it would be simpler to just add a foreign key to DDRQ that points to the dataset event row that caused it, but since we only really care about the timestamp on the DDRQ in the scheduler, we just store and use the dataset event timestamp (copied through to the DDRQ). Is that correct?
My personal preference would be to pass around the DatasetModel and DatasetEvent objects directly
ultimately, they are still private, so, if someone wants to change it back, well, there's no reason they couldn't. it's sort of a cosmetic decision.
but really, the main reason this came up was.... and i forgot to mention this earlier... was because i was performanrce testing the insert logic, and, (1) doing it this way meant not having to call the sqlalchemy relationship attr repeatedly, and (2) it also meant that i didn't have to set up the upstream-downstream relationships on the dataset ... i could just provide whatever IDs i wanted. so it made it marginally easier to test when structured this way, and that was the the main reason i went ahead and changed it. ultimately it's not of great consequence. it's a difference of one argument in the signature.
At first glance it seems like it would be simpler to just add a foreign key to DDRQ that points to the dataset event row that caused it, but since we only really care about the timestamp on the DDRQ in the scheduler, we just store and use the dataset event timestamp (copied through to the DDRQ). Is that correct?
right so... yeah ... keying DDRQ to the event that caused it... it does kindof make sense. but the thing is .... it could ultimately be multiple events that end up getting mapped to the same dag run. so ultimately what we do is.... so if dag 1 depends on datasets A and B, then if A is updated 5 times then finally B is updated once.... all of those A updates and the one B update are associated with the dag run, and the way that we figure that out is ... all the A events came before the B one... you know... it's an ephemeral table so there isn't a need really for a persistent ongoing association between the two (event and queue) and... i guess including just the value (not the key) saves a join in a query that needs to be fast.
ideally... i'd prefer it if we could avoid timestamp comparisons completely and have the event-dag-run association be determined even more precisely but when i was thinking about how to do it, it seemed a bit fraught due to race conditions but if you have ideas i'm happy to explore.
Ease of testing is a perfectly valid and good reason. You've convinced me. 😄
And yeah, I was thinking the dataset event would be the last of all of the dependencies that was updated but that would be a bit of an incomplete picture.
I'm afraid I don't have a solution that implements a race-free, complete, and accurate picture of the dependencies/events/dag run relationships.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.