luigi
luigi copied to clipboard
tasks left pending because dependency run by other worker
Hi!
I'm using luigi in production.
The luigi version used on the worker side is 2.8.13
.
The luigi version used for the central scheduler is 3.0.2
.
I have some top level tasks sharing dependencies, something like:
RefreshClient1Data
|-> PullAdobeData(account="adobeaccountid")
|-> PullAdwordsData(account="adwordsaccountid")
|-> PullFileData(dir="client1")
RefreshClient2Data
|-> PullAdobeData(account="adobeaccountid")
|-> PullAdwordsData(account="adwordsaccountid")
|-> PullFileData(dir="client2")
But with dozens of dependencies generated.
Now, I run those tasks through cronjobs using luigi --workers=8 --module=... RefreshClient1Data
, luigi --workers=8 --module=... RefreshClient2Data
... and I have a central scheduler deployed configured as default scheduler in luigi.cfg
.
I expect the shared dependencies to be merged and deduplicated, so that each dependency runs only once, and workers coordinate to wait for shared dependencies to complete before running their next task.
However, I notice that sometimes the luigi
call just stops, "successfully", short of running all the scheduled tasks, because some shared dependency is pending in some other worker(presumably started from another luigi call):
Scheduled 21 tasks of which:
* 19 ran successfully:
- 1 MicroadobePopulateReportingData(unique_id=, account_id=[...], suite_id=[...], report_id=sem-dashboard, interval=2021-02-12-2021-02-26)
- 1 RefreshAizanCalls(unique_id=, interval=None)
- 1 RefreshClient1TelusCalls(unique_id=, interval=2021-02-12-2021-02-26)
- 15 RefreshSearchDataPerAccount(unique_id=, account=[...], account_id=[...], engine=adwords, interval=2021-02-12-2021-02-26) ...
- 1 RefreshTelusCalls(unique_id=, interval=2021-02-12-2021-02-26, client=ifc)
* 2 were left pending, among these:
* 1 were being run by another worker:
- 1 MicroadobePopulateReportingData(unique_id=, account_id=[...], suite_id=[...], report_id=sem-dashboard, interval=2021-02-12-2021-02-26)
* 1 had dependencies that were being run by other worker:
- 1 RefreshClient1SEM(unique_id=, interval=2021-02-12-2021-02-26, date=2021-02-26)
The other workers were:
- Worker(salt=549191704, workers=1, host=aa0d502ef07e, username=[...], pid=1252) ran 1 tasks
This progress looks :) because there were no failed tasks or missing dependencies
Do I understand correctly that luigi is just dropping the task RefreshClient1SEM
because one of its dependency was run on another worker?
Why doesn't it just wait for that other dependency and complete RefreshClient1SEM
?
Why does it show up as successful if the explicit instruction to run the task isn't followed through?
How could I ensure my task runs if there's a possibility its dependencies will be run concurrently by another worker?
Thanks!
Luigi will per default not wait for pending requirements to complete. You can change this with the configuration setting worker.keep_alive.
It is good practice to ensure that a task is only scheduled from a single deployment, with a consistent DAG. It is normal to have redundant deployment of a DAG, but they should all be identical.
If the deployments are identical, your task RefreshClient1SEM will get run by the worker that runs MicroadobePopulateReportingData once it is finished.
If your deployments are overlapping but not identical, you can end up with confusing situations where tasks might be run by inconsistent code, which is difficult to debug.
--
Lars Albertsson Data engineering entrepreneur www.scling.com, www.mapflat.com +46 70 7687109 https://twitter.com/lalleal, https://www.linkedin.com/in/larsalbertsson/
On Fri, Feb 26, 2021 at 5:47 PM Charles [email protected] wrote:
Hi!
I'm using luigi in production. The luigi version used on the worker side is 2.8.13. The luigi version used for the central scheduler is 3.0.2.
I have some top level tasks sharing dependencies, something like:
RefreshClient1Data |-> PullAdobeData(account="adobeaccountid") |-> PullAdwordsData(account="adwordsaccountid") |-> PullFileData(dir="client1")
RefreshClient2Data |-> PullAdobeData(account="adobeaccountid") |-> PullAdwordsData(account="adwordsaccountid") |-> PullFileData(dir="client2")
But with dozens of dependencies generated.
Now, I run those tasks through cronjobs using luigi --workers=8 --module=... RefreshClient1Data, luigi --workers=8 --module=... RefreshClient2Data ... and I have a central scheduler deployed configured as default scheduler in luigi.cfg.
I expect the shared dependencies to be merged and deduplicated, so that each dependency runs only once, and workers coordinate to wait for shared dependencies to complete before running their next task. However, I notice that sometimes the luigi call just stops, "successfully", short of running all the scheduled tasks, because some shared dependency is pending in some other worker(presumably started from another luigi call):
Scheduled 21 tasks of which:
- 19 ran successfully:
- 1 MicroadobePopulateReportingData(unique_id=, account_id=[...], suite_id=[...], report_id=sem-dashboard, interval=2021-02-12-2021-02-26)
- 1 RefreshAizanCalls(unique_id=, interval=None)
- 1 RefreshClient1TelusCalls(unique_id=, interval=2021-02-12-2021-02-26)
- 15 RefreshSearchDataPerAccount(unique_id=, account=[...], account_id=[...], engine=adwords, interval=2021-02-12-2021-02-26) ...
- 1 RefreshTelusCalls(unique_id=, interval=2021-02-12-2021-02-26, client=ifc)
- 2 were left pending, among these:
- 1 were being run by another worker:
- 1 MicroadobePopulateReportingData(unique_id=, account_id=[...], suite_id=[...], report_id=sem-dashboard, interval=2021-02-12-2021-02-26)
- 1 had dependencies that were being run by other worker:
- 1 RefreshClient1SEM(unique_id=, interval=2021-02-12-2021-02-26, date=2021-02-26)
The other workers were: - Worker(salt=549191704, workers=1, host=aa0d502ef07e, username=[...], pid=1252) ran 1 tasks
This progress looks :) because there were no failed tasks or missing dependencies
Do I understand correctly that luigi is just dropping the task RefreshClient1SEM because one of its dependency was run on another worker? Why doesn't it just wait for that other dependency and complete RefreshClient1SEM? Why does it show up as successful if the explicit instruction to run the task isn't followed through? How could I ensure my task runs if there's a possibility its dependencies will be run concurrently by another worker?
Thanks!
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/spotify/luigi/issues/3049, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAXFFGXJHMVM33NAC57TE73TA7GC7ANCNFSM4YIYGEGQ .
Thanks, I'll use keep_alive!
Some clarifications on our setup.
- We have multiple deployments running luigi(different codebases), sharing the same central scheduler
- in a single deployment of a codebase, we have multiple cronjobs running luigi tasks(potentially concurrently depending on overlapping cron schedules).
I understand the point about separate DAG sharing the same tasks in name, but not in code, we should be careful of that.
In this case though, I have multiple tasks DAG in the same codebase sharing the same dependencies. If those (top-level) tasks get executed concurrently, in separate workers, I expected the scheduler would coordinate the workers to decide who gets to run the shared tasks and who gets to wait.
The scheduler behaves as you expect, if you set the keep_alive option.
The trap that I am trying to highlight is that you cannot be sure which job will run the upstream dependencies, e.g. PullAdwordsData. As long as both deployments have identical definitions of PullAdwordsData, things will work fine. But when you deploy a new version, or the definitions drift apart by accident, you will have a hard time debugging the problem. Your deployment that you expect runs the task will look fine, but occasionally there is an unexpected result, depending on which deployment raced first to the central scheduler.
Here is an old Spotify war story where we were bitten by "workflow DAG inconsistency", i.e. when different workers had different ideas of how a task should be run. It is not the same scenario as yours, but tangential. It took a long time to figure out what was going on. https://twitter.com/lalleal/status/1205443036697780224
Since then, I have learnt to ensure that a particular task is only run from one single deployment artifact, i.e. container image. It is ok if it is triggered by multiple cron invocations, as long as the deployment is synchronised. If your tasks RefreshClient1Data and RefreshClient2Data are both packaged in the same image and deployed at the same time, you should be fine.
If you have common upstream dependencies that are not deployed
synchronously, each upstream task should be run by only one of the jobs.
The other downstream jobs should require
it as an external dependency,
i.e. an ExternalTask
. luigi.task.externalize
is your friend. The
central scheduler will connect them and realise they are the same task.
There will be no delay introduced if you use keep_alive
and the
downstream job waits for upstream long enough.
I have also learnt to automate deployment to ensure that no old job definitions are lingering.
Apologies if this was unsolicited advice that sounds patronising. I have helped many companies build data platforms, and observed that mastering workflow orchestration is a key to success, but that there are many patterns and practices that are known to veterans, but that are not documented or shared anywhere. So I strive to share and explain when something happens to trigger me. It would be better to put it in the Luigi docs, but that requires more of my energy, so this is a start. :-)
--
Lars Albertsson Data engineering entrepreneur www.scling.com, www.mapflat.com +46 70 7687109 https://twitter.com/lalleal, https://www.linkedin.com/in/larsalbertsson/
On Tue, Mar 2, 2021 at 2:46 AM Charles [email protected] wrote:
Thanks, I'll use keep_alive!
Some clarifications on our setup.
- We have multiple deployments running luigi(different codebases), sharing the same central scheduler
- in a single deployment of a codebase, we have multiple cronjobs running luigi tasks(potentially concurrently depending on overlapping cron schedules).
I understand the point about separate DAG sharing the same tasks in name, but not in code, we should be careful of that.
In this case though, I have multiple tasks DAG in the same codebase sharing the same dependencies. If those (top-level) tasks get executed concurrently, in separate workers, I expected the scheduler would coordinate the workers to decide who gets to run the shared tasks and who gets to wait.
— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/spotify/luigi/issues/3049#issuecomment-788509969, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAXFFGXVNFDJJBPF2PDEO3LTBQ7ONANCNFSM4YIYGEGQ .
Thanks, that all makes sense.
In our case, we don't have multiple deployments of the same codebase sharing a scheduler.
On Sat., Mar. 6, 2021, 5:16 a.m. Lars Albertsson, [email protected] wrote:
The scheduler behaves as you expect, if you set the keep_alive option.
The trap that I am trying to highlight is that you cannot be sure which job will run the upstream dependencies, e.g. PullAdwordsData. As long as both deployments have identical definitions of PullAdwordsData, things will work fine. But when you deploy a new version, or the definitions drift apart by accident, you will have a hard time debugging the problem. Your deployment that you expect runs the task will look fine, but occasionally there is an unexpected result, depending on which deployment raced first to the central scheduler.
Here is an old Spotify war story where we were bitten by "workflow DAG inconsistency", i.e. when different workers had different ideas of how a task should be run. It is not the same scenario as yours, but tangential. It took a long time to figure out what was going on. https://twitter.com/lalleal/status/1205443036697780224
Since then, I have learnt to ensure that a particular task is only run from one single deployment artifact, i.e. container image. It is ok if it is triggered by multiple cron invocations, as long as the deployment is synchronised. If your tasks RefreshClient1Data and RefreshClient2Data are both packaged in the same image and deployed at the same time, you should be fine.
If you have common upstream dependencies that are not deployed synchronously, each upstream task should be run by only one of the jobs. The other downstream jobs should
require
it as an external dependency, i.e. anExternalTask
.luigi.task.externalize
is your friend. The central scheduler will connect them and realise they are the same task. There will be no delay introduced if you usekeep_alive
and the downstream job waits for upstream long enough.I have also learnt to automate deployment to ensure that no old job definitions are lingering.
Apologies if this was unsolicited advice that sounds patronising. I have helped many companies build data platforms, and observed that mastering workflow orchestration is a key to success, but that there are many patterns and practices that are known to veterans, but that are not documented or shared anywhere. So I strive to share and explain when something happens to trigger me. It would be better to put it in the Luigi docs, but that requires more of my energy, so this is a start. :-)
--
Lars Albertsson Data engineering entrepreneur www.scling.com, www.mapflat.com +46 70 7687109 https://twitter.com/lalleal, https://www.linkedin.com/in/larsalbertsson/
On Tue, Mar 2, 2021 at 2:46 AM Charles [email protected] wrote:
Thanks, I'll use keep_alive!
Some clarifications on our setup.
- We have multiple deployments running luigi(different codebases), sharing the same central scheduler
- in a single deployment of a codebase, we have multiple cronjobs running luigi tasks(potentially concurrently depending on overlapping cron schedules).
I understand the point about separate DAG sharing the same tasks in name, but not in code, we should be careful of that.
In this case though, I have multiple tasks DAG in the same codebase sharing the same dependencies. If those (top-level) tasks get executed concurrently, in separate workers, I expected the scheduler would coordinate the workers to decide who gets to run the shared tasks and who gets to wait.
— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/spotify/luigi/issues/3049#issuecomment-788509969, or unsubscribe < https://github.com/notifications/unsubscribe-auth/AAXFFGXVNFDJJBPF2PDEO3LTBQ7ONANCNFSM4YIYGEGQ
.
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/spotify/luigi/issues/3049#issuecomment-791907414, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABOHKHSQWCT7TVALFGQJFEDTCH6G5ANCNFSM4YIYGEGQ .
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.