
Apply `Throttle` to all calls of `describe_jobs`

Open jaklinger opened this issue 4 years ago • 18 comments

[Note: currently in draft while a) I finish drafting the code and b) check whether this resolves our situation - but I would appreciate feedback in the meantime on whether the gist of this feature is desirable or not]

As originally described in (TooManyRequestsException) when calling the DescribeJobs operation, AWS has low concurrency limits on DescribeJobs. A fix was implemented in this PR to throttle requests from RunningJob; however, I have frequent cases where the exception is raised from BatchClient. On top of this, my polling needs aren't particularly urgent, so being able to reduce the polling frequency via config would be ideal.

Therefore my suggested changes in this PR are:

  • Factor out the two separate describe_jobs calls - i.e. in RunningJob (currently throttled) and in BatchClient (not throttled) - into a single function which can be throttled
  • Apply throttling to the refactored describe_jobs so that throttling is applied in all cases (see the sketch after this list)
  • As suggested in https://github.com/Netflix/metaflow/pull/160#issuecomment-794518543, make the throttle parameters configurable
  • Prior to throttling, apply a minimum time between consecutive polls, equal to delta_in_seconds. Do you think this is too harsh?
  • As a result of the change, throttling and polling add explicit waiting time between attempts, rather than looping until success.
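
For concreteness, below is a minimal sketch of the shape I have in mind - the Throttle class here is a stand-in for the existing decorator rather than its actual implementation, and all names/numbers are illustrative, not batch_client.py internals:

    import random
    import time


    class Throttle(object):
        """Illustrative retry-with-backoff decorator for AWS throttling errors."""

        def __init__(self, delta_in_secs=1, num_tries=20):
            self.delta_in_secs = delta_in_secs
            self.num_tries = num_tries

        def __call__(self, func):
            def wrapped(*args, **kwargs):
                delay = self.delta_in_secs
                for attempt in range(self.num_tries):
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        # the real code would only retry on TooManyRequestsException
                        if attempt == self.num_tries - 1:
                            raise
                        time.sleep(delay + random.uniform(0, delay))
                        delay *= 2
            return wrapped


    @Throttle(delta_in_secs=1)
    def describe_jobs(client, job_ids):
        # single shared entry point for DescribeJobs, used by both
        # RunningJob (polling) and BatchClient (list/kill)
        return client.describe_jobs(jobs=job_ids)["jobs"]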

jaklinger avatar Aug 03 '21 08:08 jaklinger

@jaklinger This method in BatchClient is used only in the list_jobs and kill_jobs methods, which are accessed by python flow.py batch list and python flow.py batch kill. Are you running into throttling issues with those CLI calls?

savingoyal avatar Aug 04 '21 16:08 savingoyal

Hi @savingoyal, thanks for taking a look. On reviewing my job failure I see that I've misread my error message: it actually refers to DescribeJobDefinitions, and so I guess it comes from this here:

An error occurred (TooManyRequestsException) when calling the DescribeJobDefinitions
operation (reached max retries: 4): Too Many Requests

There are only 96 batch jobs in the flow in question, but our production system is running several flows in parallel, most with multiple steps of ~100 batch jobs each. Since each batch job makes one call to _register_job_definition, it looks like I'm hitting a slightly different rate limit from the one fixed in https://github.com/Netflix/metaflow/pull/160

Would adding Throttle to BatchJob._register_job_definition make sense?

Regarding making the throttle parameters configurable, would this also make sense?

(The implication is that, if so, I would implement these in this PR instead. For our production system I will check that this remedies the situation, which now occurs on a regular basis - but if this isn't a feature that is desirable for you, we are happy to run from a fork.)

jaklinger avatar Aug 04 '21 16:08 jaklinger

@jaklinger Thanks for the update. I am surprised that we are running into issues with _register_job_definition call - can you confirm that the issue goes away with the proposed PR? Re: tweaking the parameters for the throttle decorator - yes, we can make it configurable, but given that AWS Batch is a managed service, I would expect almost all the users to rely on the default values that we would configure.

Also, if it's quicker with the back and forth, the dev team is available on the #dev-metaflow channel in slack.outerbounds.co.

savingoyal avatar Aug 09 '21 16:08 savingoyal

Another simpler way to resolve this issue would be to ensure that we register the job definition only once per step. Right now we check whether the job definition exists for every task, and a step can map to multiple tasks (in the case of a for-each).
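
Roughly, something along the lines of the sketch below (illustrative only, not the actual Metaflow code) - glossing over the fact that each task may run in its own process, so in practice the cache would need to live at the runtime/scheduler level:

    # Sketch: memoize job-definition registration per step so the repeated tasks
    # of a for-each don't each hit DescribeJobDefinitions/RegisterJobDefinition.
    _job_definition_cache = {}


    def register_job_definition_once(client, step_name, job_definition_kwargs):
        # one registration per step; subsequent tasks reuse the cached ARN
        if step_name not in _job_definition_cache:
            response = client.register_job_definition(**job_definition_kwargs)
            _job_definition_cache[step_name] = response["jobDefinitionArn"]
        return _job_definition_cache[step_name]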

savingoyal avatar Aug 09 '21 16:08 savingoyal

can you confirm that the issue goes away with the proposed PR?

Sorry for the delay - I can confirm that this PR has stabilised our own system. I've also done some testing, ramping up the number of concurrent batch tasks from distinct, concurrent flows while using a larger value of delta via the proposed configuration/env variable - and I have recorded no failures since implementing the change.

Re: tweaking the parameters for the throttle decorator - yes, we can make it configurable, but given that AWS Batch is a managed service, I would expect almost all the users to rely on the default values that we would configure.

Indeed - this is obviously totally your call to make. If it is at all preferable for this PR, I can split out the new configurable parameters into our own fork which we can keep in sync with master. I'll raise this as an issue for discussion on the slack workspace - but feel free to comment directly here!

Another simpler way to resolve this issue would be to ensure that we register the job definition only per step. Right now we check if the job definition exists or not for every task and a step could map to multiple tasks (in case of a for-each).

Ah I see - of course. On top of that, another way to resolve the issue would be to decorate the batch client itself with a throttler, which may guard against a large number of edge-case AWS limits like this one arising from other parts of the boto3 API. I'll raise this as a separate thread on slack.

In the meantime I'll mark this as "ready for review" since I can confirm that it resolves my initial problem - but I'm happy to receive feedback in the threads which I'll post on slack.

jaklinger avatar Aug 11 '21 14:08 jaklinger

Follow up conversation - https://outerbounds-community.slack.com/archives/C020U025QJK/p1628764122047900 at slack.outerbounds.co

savingoyal avatar Aug 12 '21 14:08 savingoyal

Left some comments, sorry it took so long to get to it. But we definitely need a fix, even our internal tests fail due to this once in a while.

Also you may want to take a look at this implementation for K8S and maybe borrow some ideas https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/aws/eks/kubernetes_client.py#L22-L55 . When it comes to the various exponential retry decorator implementations, from an operations perspective I like it when it is super clear how long this thing will take to fail completely, since that comes in handy when debugging visibly "stuck" processes/requests/etc.

(basically the same argument google folks use here)
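
For illustration only (a generic sketch, not the K8S code itself), the kind of shape I mean is one where the worst-case time to final failure is readable straight off the arguments:

    import time


    def retry_with_deadline(total_deadline_secs=300, max_backoff_secs=32):
        """Exponential backoff whose worst-case duration is obvious from the arguments."""

        def decorator(func):
            def wrapped(*args, **kwargs):
                start = time.time()
                delay = 1
                while True:
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        # a real implementation would inspect the exception first
                        if time.time() - start + delay > total_deadline_secs:
                            raise
                        time.sleep(delay)
                        delay = min(delay * 2, max_backoff_secs)
            return wrapped

        return decorator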

oavdeev avatar Dec 08 '21 20:12 oavdeev

Left some comments, sorry it took so long to get to it.

Not at all, thank you @oavdeev for taking the time to review! I've refactored and merged in the latest master. It should also be easier to review than before, since I see that you've implemented linting since my last merge.

Also you may want to take a look at this implementation for K8S

Thanks for the tip - I've implemented a new suggestion based on this: rather than duplicate the code, I've refactored your K8s retry deco into a retry decorator factory so that it can be applied to both K8s and batch. You basically need to provide a couple of handle methods to instantiate a concrete my_retry decorator, and the underlying exponential backoff is then dealt with in the factory.
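
Roughly, the factory looks like this (the names here are illustrative rather than the exact ones in the PR):

    import time


    def make_retry_decorator(is_retriable, on_retry, max_tries=7, base_delay_secs=1):
        """Build a concrete retry decorator from two plugin-specific handlers.

        is_retriable(exception) decides whether to retry; on_retry(exception, attempt)
        lets the caller log progress. The exponential backoff lives here, shared
        between the K8s and Batch clients.
        """

        def decorator(func):
            def wrapped(*args, **kwargs):
                for attempt in range(max_tries):
                    try:
                        return func(*args, **kwargs)
                    except Exception as ex:
                        if attempt == max_tries - 1 or not is_retriable(ex):
                            raise
                        on_retry(ex, attempt)
                        time.sleep(base_delay_secs * 2 ** attempt)
            return wrapped

        return decorator


    # e.g. a concrete decorator for AWS Batch throttling errors
    batch_retry = make_retry_decorator(
        is_retriable=lambda ex: "TooManyRequestsException" in str(ex),
        on_retry=lambda ex, attempt: print("Batch throttled, retrying (%d)" % attempt),
    )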

Regarding both

nit: i'd leave it to the caller to cast. For env vars, we usually do the casting in metaflow_config.py

and

When it comes to the various exponential retry decorator implementations, from an operations perspective I like it when it is super clear how long this thing will take to fail completely, since that comes in handy when debugging visibly "stuck" processes/requests/etc.

No problem - I've moved any casting to metaflow_config.py and am now using a google-style retry deco. On top of that, I've moved the override of defaults into the batch_retry decorator so that it is clear to the developer what the default is, rather than having to dig into the config. (Personally I think I would rather have this in metaflow_config as well, but I'll leave it as is for now and see what you think.)
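
For concreteness, the casting now lives in metaflow_config.py along these lines - the variable names below are placeholders rather than the final config keys:

    # metaflow_config.py-style sketch: env vars are cast once, here, not at call sites.
    import os

    BATCH_CLIENT_RETRY_DELTA_IN_SECS = int(
        os.environ.get("METAFLOW_BATCH_CLIENT_RETRY_DELTA_IN_SECS", "1")
    )
    BATCH_CLIENT_RETRY_MAX_TRIES = int(
        os.environ.get("METAFLOW_BATCH_CLIENT_RETRY_MAX_TRIES", "7")
    )
    # batch_retry reads these values but spells out its own fallback defaults, so
    # the effective numbers are visible at the decorator rather than buried in config.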

can it be something like @throttle without () ?

All the decos in the K8s and batch client modules were using parentheses, but I've implemented a method that allows for either to be used for full compatibility 🙂. I think it's a toss-up between having a more verbose (possibly less transparent) implementation of the decorator and being able to drop parentheses in this case - but I'm happy either way!
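
The trick is the usual optional-parentheses pattern, roughly (again a sketch, with the retry body elided):

    def throttle(func=None, **throttle_kwargs):
        """Usable both as @throttle and as @throttle(delta_in_secs=5)."""

        def decorator(inner):
            def wrapped(*args, **kwargs):
                # ...backoff/retry logic goes here, driven by throttle_kwargs...
                return inner(*args, **kwargs)
            return wrapped

        if func is not None:
            # bare @throttle: func is the function being decorated
            return decorator(func)
        # @throttle(...): return the actual decorator
        return decorator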

other notes

  • I've also added a test, test_aws_utils - I'm not sure whether this module has been placed appropriately, or whether it is desirable at all.
  • I'll deploy this now on our system to check that it works with batch. I won't get a result back for a little while (at least a few days) since I need to fix some other things on my end.
  • I'm not using the K8s plugins and so it is a little trickier to test on my end

Regarding the last two points, hopefully your integration tests will pick these up!

jaklinger avatar Dec 13 '21 19:12 jaklinger

@oavdeev all checks out on our end with batch! FYI this is my last week in my current job and so I won't have access to our AWS & pipeline setup anymore. If there are other changes that need prototyping I won't be able to test them myself in production, but I can pass them on to my colleagues if needs be!

jaklinger avatar Dec 22 '21 10:12 jaklinger

Hello! 👀 I just hit this exact issue. Is there a simple thing I can implement on my end to get around this?

IsaKiko avatar Mar 21 '22 09:03 IsaKiko

@IsaKiko - non-project dev here - if you are able to fork this branch and rebase it on master, then simply using this branch may solve your problem. If necessary, the throttle parameters are configurable as of this branch, so you can set more sympathetic values that give your processes a bit more grace. Other than that I defer to the project devs!

jaklinger avatar Mar 21 '22 09:03 jaklinger

@jaklinger I'm not sure that's really an option. Is it possible that reducing the number of jobs we fan out to would make a difference?

IsaKiko avatar Mar 21 '22 11:03 IsaKiko

If that's a viable option for you then yes that should circumvent the problem, but for our use case that would have meant vastly increasing total run time

jaklinger avatar Mar 21 '22 11:03 jaklinger

Thank you! Did this last night and woke up to a successful run this morning. :) ...for now I'll try get the run time down in another way.

IsaKiko avatar Mar 21 '22 22:03 IsaKiko

@savingoyal @oavdeev Is there any reason this still hasn't been merged? We are definitely seeing this issue at times, even when running one flow with 1000+ splits, and as we scale up this is going to become a much bigger issue.

DanCorvesor avatar Apr 07 '22 11:04 DanCorvesor

@DanCorvesor The correct fix for this issue is to ensure that the calls to describe_job_definitions are a function of the number of steps in a flow rather than the number of tasks (the current behavior). Given that there are global rate limits on this call, even with this PR merged in as is, the likelihood of hitting this issue doesn't go down substantially. I can work on a proper fix after we have shipped the Kubernetes release (so hope to see progress on this in a few weeks).

savingoyal avatar Apr 13 '22 15:04 savingoyal

@savingoyal I've recently encountered the same issue with metaflow v2.10.6 when running 1 flow with 100 splits. Is there some recommended way for dealing with this problem except reducing --max-workers?

shchur avatar Nov 09 '23 14:11 shchur

Might be interesting for others encountering the same problem:

A simple workaround is to add an extra step that introduces jitter before the step that is executed in Batch. For example, the following code

    @step
    def start(self):
        self.task_defs = [...]
        self.next(self.execute_task, foreach="task_defs")

    @batch
    @step
    def execute_task(self):
        execute_fn(self.input)

can be replaced with

    @step
    def start(self):
        self.task_defs = [...]
        self.next(self.add_jitter, foreach="task_defs")

    @step
    def add_jitter(self):
        import random
        import time

        # spread out the AWS Batch API calls made by the tasks of the next step
        time.sleep(random.uniform(0, 60))
        self.task_def = self.input
        self.next(self.execute_task)

    @batch
    @step
    def execute_task(self):
        execute_fn(self.task_def)

This is not elegant, but it avoids hitting the rate limit without modifying the Metaflow code.

shchur avatar Nov 24 '23 14:11 shchur