feat: Support for async and potentially parallel REST calls

Open MeltyBot opened this issue 4 years ago • 7 comments

Migrated from GitLab: https://gitlab.com/meltano/sdk/-/issues/184

Originally created by @aaronsteers on 2021-08-09 18:55:48


1. Async at the Stream level

For APIs with a small page size, such as Klaviyo (100 records per call), the throughput of the stream is limited by the need to make serial, synchronous roundtrips to the remote server. One user in Slack (missing thread link) proposed using an asyncio approach to retrieve records, which could significantly increase throughput.

For taps where next_page_token is deterministic:

If the next page token is predictable, such as an integer page number, multiple calls could be made simultaneously - for instance requesting pages 1 through 5 immediately and asynchronously, and then processing each returned result as it is received.
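
A minimal sketch of the deterministic case, assuming integer page numbers. `fetch_page` is a hypothetical stand-in for the REST call (it only sleeps and returns fake records) and is not part of the SDK; a real tap would use an async HTTP client there.

```python
import asyncio


async def fetch_page(page: int) -> list[dict]:
    # Hypothetical stand-in for one REST call; later pages return sooner
    # so that responses arrive out of request order.
    await asyncio.sleep(0.01 * (6 - page))
    return [{"page": page, "id": page * 100 + i} for i in range(3)]


async def fetch_pages(first: int, last: int) -> list[dict]:
    # Send every request up front instead of one at a time.
    tasks = [asyncio.create_task(fetch_page(p)) for p in range(first, last + 1)]
    records: list[dict] = []
    # as_completed hands back each response as soon as it arrives.
    for finished in asyncio.as_completed(tasks):
        records.extend(await finished)
    return records


records = asyncio.run(fetch_pages(1, 5))
```

Because results are consumed as they arrive, the records in this sketch are not guaranteed to be in page order.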

For taps where next_page_token is not deterministic:

If the next page token is not predictable (such as an alphanumeric token), the REST calls still need to be made serially, but we can at least issue the next call asynchronously as soon as the first result set is received, rather than waiting until its records are fully processed. In this way, the tap waits for the next response from the remote service while it processes the records from the prior request.
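
A rough sketch of that overlap, assuming an opaque token returned with each page. `PAGES`, `fetch`, and `process` are all hypothetical; none of them are SDK names.

```python
import asyncio

# Hypothetical API: each response is (records, token for the next page).
PAGES = {
    None: (["a", "b"], "tok1"),
    "tok1": (["c", "d"], "tok2"),
    "tok2": (["e"], None),
}


async def fetch(token):
    await asyncio.sleep(0.01)  # simulated network latency
    return PAGES[token]


async def process(record, out):
    # The await here yields to the event loop, which lets the
    # in-flight request make progress while records are processed.
    await asyncio.sleep(0.005)
    out.append(record)


async def sync_stream():
    out = []
    pending = asyncio.create_task(fetch(None))
    while pending is not None:
        records, next_token = await pending
        # Start the next request before processing the current page.
        if next_token is not None:
            pending = asyncio.create_task(fetch(next_token))
        else:
            pending = None
        for record in records:
            await process(record, out)
    return out


result = asyncio.run(sync_stream())
```

Requests stay strictly serial here, so record order is unchanged. Note that if record processing were fully synchronous, the event loop would not get a chance to send the next request until processing finished, so some yield point (or a thread) would be needed.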

Best reason not to build

Introducing an async implementation could make it harder to manage or debug new taps. We could mitigate this by trying to keep the development experience as simple as possible, and if the developer experience would need to change, we could make parts of this an opt-in experience for taps which require it. (Ideally the async methods would all be internal, and the dev experience would not need to change at all.)

Other considerations

Ideally we would still process each call in the original pagination order, even if results are returned from the API in a different order. This would keep "sorted stream" behavior the same as it is today. If not, meaning results could be emitted out of order, then we want to make sure we have is_sorted=False, which triggers Signpost behavior and other treatments for unsorted streams.
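
One possible way to keep pagination order, sketched under the same assumption of a hypothetical `fetch_page` coroutine: `asyncio.gather` returns results in the order the awaitables were passed in, regardless of which response finishes first.

```python
import asyncio

completion_order = []


async def fetch_page(page: int) -> list[int]:
    # Hypothetical call; higher pages respond faster here to force
    # out-of-order completion.
    await asyncio.sleep(0.01 * (4 - page))
    completion_order.append(page)
    return [page * 10 + i for i in range(2)]


async def fetch_in_order(pages):
    # gather preserves argument order in its result list.
    results = await asyncio.gather(*(fetch_page(p) for p in pages))
    return [record for page_records in results for record in page_records]


ordered = asyncio.run(fetch_in_order([1, 2, 3]))
```

The trade-off is that nothing is emitted until every page in the group has returned, so memory use grows with the group size.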

2. Async at the tap level

The above (stream-level async) assumes we want to run REST calls in parallel. We could also/alternatively run multiple stream sync operations in parallel, for instance, invoking up to a max number of stream sync operations side-by-side with each other.

This approach does not solve for a single stream being slow due to small batch sizes.

This approach also runs a higher risk of overloading a target and/or overloading the memory buffers and/or the maximum PIPE limits between STDOUT and STDIN, but this should be manageable.
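
A sketch of capping the number of streams syncing side by side, using an `asyncio.Semaphore`. `MAX_PARALLEL_STREAMS` and this `sync_stream` coroutine are hypothetical and only illustrate the idea; they are not existing SDK settings or methods.

```python
import asyncio

MAX_PARALLEL_STREAMS = 2  # hypothetical setting, not an SDK option


async def sync_stream(name, limiter, stats):
    async with limiter:  # wait for a free slot
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stands in for the stream's sync work
        stats["active"] -= 1
    return name


async def sync_all(names):
    limiter = asyncio.Semaphore(MAX_PARALLEL_STREAMS)
    stats = {"active": 0, "peak": 0}
    done = await asyncio.gather(
        *(sync_stream(n, limiter, stats) for n in names)
    )
    return done, stats["peak"]


done, peak = asyncio.run(sync_all(["users", "orders", "events", "lists"]))
```

The `peak` counter is only there to show that no more than the configured number of streams run at once.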

MeltyBot avatar Aug 09 '21 18:08 MeltyBot

@aaronsteers @edgarrmondragon want to pull this one from the depths of the backlog. This came up in a customer interview as a useful feature, particularly around the BATCH use case: when a service generates the batch itself, say the bulk SFDC API, we'd want to be able to call an endpoint and wait for a URL or key to fetch the file before sending it to the target.
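
For illustration, the "call an endpoint and wait for a URL" flow could look roughly like the polling loop below. `FakeBulkJob` is a made-up stand-in for a service-side export job and does not reflect the SFDC bulk API or any SDK interface; the URL is a placeholder.

```python
import asyncio


class FakeBulkJob:
    """Hypothetical export job that finishes after a few status polls."""

    def __init__(self, ready_after: int):
        self.polls = 0
        self.ready_after = ready_after

    async def status(self) -> dict:
        self.polls += 1
        if self.polls >= self.ready_after:
            return {"state": "done", "url": "https://example.com/export.csv"}
        return {"state": "running"}


async def wait_for_file(job, interval=0.01, max_polls=10) -> str:
    for _ in range(max_polls):
        status = await job.status()
        if status["state"] == "done":
            return status["url"]
        # Sleeping asynchronously leaves the loop free for other work.
        await asyncio.sleep(interval)
    raise TimeoutError("bulk job did not finish in time")


job = FakeBulkJob(ready_after=3)
file_url = asyncio.run(wait_for_file(job))
```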

tayloramurphy avatar Aug 30 '22 15:08 tayloramurphy

This has come up again in Slack: https://meltano.slack.com/archives/C01PKLU5D1R/p1667839697099139.

From the issue main description, I think we can do 1 long before we need or want 2.

Also, one requirement for the solution from me would be to hide any asyncio implementation details from the developer as much as possible. Similar to Textual's approach:

Textual knows to await your event handlers if they are coroutines (i.e. prefixed with the async keyword). Regular functions are generally fine unless you plan on integrating other async libraries (such as httpx for reading data from the internet).
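
A small sketch of that pattern (not Textual's actual implementation): call the developer's handler, and await the result only if it turns out to be awaitable. The handler names are hypothetical.

```python
import asyncio
import inspect


def get_records_sync():
    return [1, 2]


async def get_records_async():
    await asyncio.sleep(0)
    return [3, 4]


async def call_handler(handler):
    result = handler()
    # Coroutine functions return an awaitable; plain functions do not.
    if inspect.isawaitable(result):
        result = await result
    return result


async def main():
    return (
        await call_handler(get_records_sync),
        await call_handler(get_records_async),
    )


sync_result, async_result = asyncio.run(main())
```

With something like this inside the framework, a tap developer could write either kind of method without touching asyncio directly.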

edgarrmondragon avatar Nov 07 '22 20:11 edgarrmondragon

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

stale[bot] avatar Jul 18 '23 05:07 stale[bot]

Still relevant

edgarrmondragon avatar Jul 20 '23 16:07 edgarrmondragon

Took a quick peek at this today, as we have an extremely long-running job (>3 days) that we'd like to speed up. The setup is Parent Stream (~1k) -> Child Stream (~10k entities) -> Final Child Stream (1 call per combo), so lots of API calls. Our API in this case allows ~3 req/sec, but we don't think we're anywhere close to that because the final API calls are very slow.

In combination with https://github.com/meltano/sdk/issues/317 this might be easier, but https://github.com/meltano/sdk/blob/7f2df99148e28a65a5dd53f915041fe8f652b03f/singer_sdk/streams/core.py#L1197-L1200 and https://github.com/meltano/sdk/blob/7f2df99148e28a65a5dd53f915041fe8f652b03f/singer_sdk/tap_base.py#L358-L368 kind of explain the issue: we have one instance of a stream class, but we need to call sync 10 million plus times. We'd need the ability to create unique child streams for each API call (or at least N of them, where N is the number we want to run in parallel). The challenge with this approach is that, since state management is coupled to each stream, you can't nicely run multiple unique stream instances at the same time.

Couldn't think of any easy hacks as the tap is really tied to state for the individual stream runs (in both Auth and the stream run if that makes sense). Just wanted to share what I looked into quickly!
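
To make the "N at a time" idea concrete, here is a sketch of a fixed pool of workers pulling child contexts from a queue. `N_WORKERS`, `fetch_child`, and the context shape are hypothetical, and the sketch deliberately ignores the state-management and auth coupling described above, which is the hard part.

```python
import asyncio

N_WORKERS = 3  # hypothetical concurrency limit


async def fetch_child(context: dict) -> dict:
    await asyncio.sleep(0.01)  # stands in for one slow API call
    return {"parent_id": context["parent_id"], "value": context["parent_id"] * 2}


async def worker(queue, results):
    while True:
        context = await queue.get()
        try:
            results.append(await fetch_child(context))
        finally:
            queue.task_done()


async def sync_children(contexts):
    queue = asyncio.Queue()
    results = []
    for context in contexts:
        queue.put_nowait(context)
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(N_WORKERS)
    ]
    await queue.join()  # wait until every context has been handled
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


child_records = asyncio.run(sync_children([{"parent_id": i} for i in range(7)]))
```

Results come back in completion order, so a stream built this way would need to be treated as unsorted.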

visch avatar Aug 08 '23 14:08 visch

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

stale[bot] avatar Aug 07 '24 15:08 stale[bot]