incubator-devlake
[Feature][Framework][POC] Streamed data collection using Singer Spec
Search before asking
- [X] I had searched in the issues and found no similar feature requirement.
Description
Do a POC on adding support for streamed, incremental data collection using the Singer spec. The existing incremental-update support does not protect against a halfway failure or error in the collection process, forcing the user to start over from the beginning. Specifically, this POC will focus on creating a new plugin, based on an existing one, that supports the Singer spec.
Use case
The goal would be to add resilience to the collection phase of DevLake plugins, and to do so using a standard. Currently, if there's an interruption in the middle of collection, we have no means of recovering or resuming from the interrupted point. This becomes a problem when pulling data from large data sources. We can alleviate it by checkpointing incremental states as the collection takes place. If, say, we have to make 10 API calls to perform 10 DB writes for a given collector, and a failure happens on the 5th invocation, our next attempt should pick the collection back up from that step.
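For context, this is exactly what the Singer spec's STATE messages are for: a tap interleaves STATE checkpoints with RECORD messages on stdout, and the consumer persists the latest state so a restart can hand it back to the tap. Below is a minimal Go sketch of that loop, assuming a `tap-github`-style CLI (exact flag names vary by tap, so treat this as illustrative):

```go
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"os"
	"os/exec"
)

// singerMessage covers the three message types defined by the Singer spec.
type singerMessage struct {
	Type   string          `json:"type"`             // "SCHEMA", "RECORD", or "STATE"
	Stream string          `json:"stream,omitempty"` // set on SCHEMA/RECORD
	Record json.RawMessage `json:"record,omitempty"` // set on RECORD
	Value  json.RawMessage `json:"value,omitempty"`  // set on STATE
}

func main() {
	// Resume from the last checkpoint if one exists.
	args := []string{"-c", "config.json", "--catalog", "catalog.json"}
	if _, err := os.Stat("state.json"); err == nil {
		args = append(args, "--state", "state.json")
	}
	cmd := exec.Command("tap-github", args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		var msg singerMessage
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			log.Fatal(err)
		}
		switch msg.Type {
		case "RECORD":
			// ...write msg.Record for msg.Stream to the DB here...
		case "STATE":
			// Persist the checkpoint: a crash after this point loses at
			// most the records emitted since the previous STATE message.
			if err := os.WriteFile("state.json", msg.Value, 0o644); err != nil {
				log.Fatal(err)
			}
		}
	}
	if err := cmd.Wait(); err != nil {
		log.Fatal(err)
	}
}
```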
Related issues
No response
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
cc @hezyin
Using the tap discovery mode (`tap-<something> -c config.json -d > catalog.json`) gives us an output containing all streams (supported API calls), their response schemas, and metadata fields. During testing I found that relying solely on these schemas is not sufficient. For example, the "github issues" schema does not have the "milestone" field on it, so the tap's stream will never include that field in its response. But if I manually edit that schema to include the field (and adjust the metadata accordingly), we get the field in the response as well.
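For reference, that edit happens in the catalog. An abridged, illustrative entry for the issues stream, with "milestone" added to the schema and selected in the metadata, looks like this:

```json
{
  "streams": [
    {
      "tap_stream_id": "issues",
      "schema": {
        "type": "object",
        "properties": {
          "id": { "type": "integer" },
          "title": { "type": "string" },
          "milestone": { "type": ["object", "null"] }
        }
      },
      "metadata": [
        { "breadcrumb": [], "metadata": { "selected": true } },
        { "breadcrumb": ["properties", "milestone"], "metadata": { "inclusion": "available", "selected": true } }
      ]
    }
  ]
}
```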
The point is, we can't rely blindly on these taps as they might be incomplete, buggy, etc. They'll need to be rigorously tested and the underlying API endpoints will have to be studied. That kind of defeats their purpose of making our lives simpler, unless I simply encountered a one-off scenario.
Summary of impressions:
Pros:
- API calls are completely abstracted away, with no code needed from our side. We just have to study the streams defined in the catalog.json of the tap to see what's available to us, and that's straightforward.
- Responses are captured in JSON schemas (in catalog.json), so, using code generation, we can get compile-time type safety and eliminate the boilerplate of manually writing models for them (a sketch of a generated model follows this list).
- Built-in support for states to be able to resume Streams (where supported). Fully abstracted away from the developer.
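To illustrate the code-generation point, here is a hypothetical Go model generated from the (fixed) issues schema; the real generator output would mirror whatever properties catalog.json declares:

```go
package models

import "time"

// Hypothetical code-generated models for the "issues" stream; field names
// and types are derived from the stream's JSON schema in catalog.json.
type Milestone struct {
	Id    int64  `json:"id"`
	Title string `json:"title"`
}

type GithubIssue struct {
	Id        int64      `json:"id"`
	Title     string     `json:"title"`
	State     string     `json:"state"`
	Milestone *Milestone `json:"milestone"` // only present after the schema fix noted earlier
	UpdatedAt time.Time  `json:"updated_at"`
}
```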
Cons:
- Gives us less control over the API calls. Things like managing parallelism and rate-limiting are not within our control. In general, we don't have access to the query-params/headers that get sent. The "config.json" is the most we can control.
- Quality of the Tap implementations can be questionable. See my comment above. Some are not well-maintained, or necessarily in sync with the most current API contracts.
- We still need to handle the plugin REST API logic manually, including TestConnection. Taps don't expose any way of testing the validity of our config until we actually proceed to data collection, so this part still has to be done the way we always have.
- (minor) - requires some setup, such as installing the tap and creating a config.json (usually containing auth + endpoint) in order to discover all the streams (APIs). A sample config is sketched after this list.
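For reference, the config.json setup is small; for a tap like tap-github it is roughly an auth token plus scope and start date (key names vary per tap, so treat this as illustrative):

```json
{
  "access_token": "<personal access token>",
  "repository": "apache/incubator-devlake",
  "start_date": "2022-01-01T00:00:00Z"
}
```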
Misc:
- E2E testing will be different if we bypass the Collector layer. We won't have database records to convert to .csv format, so we'll have to rethink how the DataflowTester should work. However, Taps are completely abstracted away behind an interface, and Records use generics based on the auto-generated code mentioned above, so mocking any data in code will be very easy (see the sketch below).
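To make the mocking point concrete, the abstraction could be shaped roughly like this (hypothetical names; the actual interface and types in the POC diff may differ):

```go
package singer

// Tap hides how Singer messages are produced (real subprocess, mock, etc.),
// so a DataflowTester-style helper can swap in an in-memory implementation.
type Tap interface {
	Run() (<-chan []byte, error) // raw Singer messages, one JSON line each
}

// Record pairs a stream name with a typed payload from the generated models,
// so test fixtures can be written directly in code:
//
//	fixtures := []Record[GithubIssue]{{Stream: "issues", Record: GithubIssue{Id: 1, Title: "demo"}}}
type Record[R any] struct {
	Stream string `json:"stream"`
	Record R      `json:"record"`
}
```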
My conclusion: I think Singer plugins can be useful as long as the plugin doesn't need the Collector layer. Plugins such as Jira do need the raw layer for things like custom fields and modifying transformation rules without re-collection (see this). My current adaptation of the Singer framework bypasses the Collector stage and goes straight to Extract. If Collectors are absolutely necessary for all plugins, I don't think we're gaining anything with Singer - in fact, we lose, since we no longer have granular access to the API calls. But if there are more trivial plugins that can get away without Collectors, and they come with well-maintained Singer taps, I think the framework is useful for them.
Here's the diff that shows what sample (very limited) github and jira plugin implementations using this framework would look like.
@klesh @Startrekzky @hezyin please share your thoughts.
Next steps for me, in order to get some more concrete data points:
- Study the streams provided by Github and PagerDuty (issue #3498) and determine if they are comprehensive enough for us.
- Study the rate limiting logic of Github and Pagerduty taps to see if they honor any sort of API rate limits.
@keon94 Thanks for the summary, Keon.
Regarding the current Singer-spec framework implementation: it's worth considering bringing back the raw data layer, for two reasons: easier e2e testing and improved dev efficiency when modifying extraction/conversion logic. It's also not too much work, as a Singer tap naturally spits out JSON messages and we just need to dump that JSON into our raw layer (see the sketch below). Airbyte is taking a similar approach, I believe.
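To make that concrete: since every RECORD message is already a self-contained JSON line, the "dump" could be a thin loop over the tap's stdout, assuming a raw table with (params, data) columns like DevLake's existing _raw_* tables (the table name here is hypothetical):

```go
package singer

import (
	"bufio"
	"database/sql"
	"encoding/json"
	"io"
)

// collectToRawLayer stores every RECORD message verbatim so that
// extraction/conversion can be re-run later without re-collecting.
func collectToRawLayer(db *sql.DB, tapStdout io.Reader) error {
	scanner := bufio.NewScanner(tapStdout)
	for scanner.Scan() {
		var msg struct {
			Type   string          `json:"type"`
			Stream string          `json:"stream"`
			Record json.RawMessage `json:"record"`
		}
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			return err
		}
		if msg.Type != "RECORD" {
			continue // SCHEMA and STATE messages are handled elsewhere
		}
		if _, err := db.Exec(
			"INSERT INTO _raw_singer_github (params, data) VALUES (?, ?)",
			msg.Stream, []byte(msg.Record),
		); err != nil {
			return err
		}
	}
	return scanner.Err()
}
```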
On closer look, the Github tap does provide rate-limiting support. The logic is here, called from here. There's an undocumented config key called "max_sleep_seconds" which defines the upper bound on the "X-RateLimit-Reset" wait time (the time to wait before the next API call). We can add it to the config JSON to override its default of 600s. Simply put, if we exceed the rate limit, the tap sleeps for up to this many seconds before issuing the next API call, and fails if it would have to sleep longer. cc @hezyin
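For example, the override would look something like this in config.json (only max_sleep_seconds is the key described above; the other fields follow the earlier sample config):

```json
{
  "access_token": "<personal access token>",
  "repository": "apache/incubator-devlake",
  "start_date": "2022-01-01T00:00:00Z",
  "max_sleep_seconds": 1800
}
```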
The PagerDuty tap also supports rate-limiting, but it's not as sophisticated. The logic is here. It sleeps only once, for a constant 60s interval, when it hits a rate limit; there are no additional retries, and it's not configurable either. cc @hezyin
@keon94 @hezyin I hope you don't mind me popping in here but I'm a data engineer on the Meltano team and I noticed some of your activity in the Singer community recently and wanted to chime in 😄 .
Have you all considered using Meltano for managing your Singer connectors? I saw in this comment that you're running Singer directly; Meltano was created to help simplify some of those steps. It helps manage settings, config files, discovery, catalog creation and management, state management, etc.
Also, if you haven't seen MeltanoHub for finding taps/targets, you should check it out! The Meltano/Singer community does its best to select the top variant of each connector as the default and make them available for installation from the Meltano CLI. In your case, the defaults listed right now are meltanolabs for tap-github and goodeggs for tap-pagerduty. Also, since the pagerduty tap options are a bit dated, you could optionally consider using the Meltano SDK to rewrite it so it's fresh and easier to maintain moving forward.