Refactor OpenLineage Integration to handle additive OpenLineage events and add official OpenLineage API endpoint
This pull request refactors the OpenLineage integration. Co-authored with @mgorsk1.
The main objectives are to enable support for additive OpenLineage events (by introducing state management), add an official endpoint for the OpenLineage client, and introduce a new type of service connection that consumes OpenLineage events posted to the official endpoint.
Key Changes:
- **Additive OpenLineage event handling:** redesigned the connector to handle additive OpenLineage events by introducing state management.
- **New endpoint:** added an official OpenLineage-compatible endpoint, `/api/v1/openlineage/lineage`, to receive lineage events from the official OpenLineage client. This enables official OpenLineage agents (e.g., from Trino, Spark, Flink) to deliver lineage information directly to OpenMetadata.
- **New OpenLineage service connection type:** introduced a new type of OpenLineage connection that processes lineage information delivered by the `/api/v1/openlineage/lineage` endpoint.
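For illustration only, here is a minimal sketch of posting an OpenLineage run event to the new endpoint with the `requests` library. The endpoint path comes from this PR; the host, authentication header, and event payload details are assumptions that depend on your deployment (official OpenLineage clients and agents can equally be pointed at this URL via their HTTP transport).

```python
# Sketch: POST a single OpenLineage RunEvent to the new endpoint.
# Host/port and the auth token below are placeholders for your deployment.
import uuid
from datetime import datetime, timezone

import requests

OM_HOST = "http://localhost:8585"  # assumed OpenMetadata server address
ENDPOINT = f"{OM_HOST}/api/v1/openlineage/lineage"

# A minimal OpenLineage RunEvent; dataset names are illustrative.
event = {
    "eventType": "COMPLETE",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "run": {"runId": str(uuid.uuid4())},
    "job": {"namespace": "spark-prod", "name": "daily_etl"},
    "inputs": [{"namespace": "postgres://db", "name": "public.table_a"}],
    "outputs": [{"namespace": "postgres://db", "name": "public.table_d"}],
    "producer": "https://github.com/OpenLineage/OpenLineage",
}

resp = requests.post(
    ENDPOINT,
    json=event,
    headers={"Authorization": "Bearer <jwt-token>"},  # auth scheme is an assumption
    timeout=10,
)
resp.raise_for_status()
```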
Type of change:
- [ ] Bug fix
- [x] Improvement
- [x] New feature
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation
Checklist:
- [x] I have read the CONTRIBUTING document.
- [ ] My PR title is `Fixes <issue-number>: <short explanation>`
- [ ] I have commented on my code, particularly in hard-to-understand areas.
- [ ] For JSON Schema changes: I updated the migration scripts or explained why it is not needed.
Hi there 👋 Thanks for your contribution!
The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.
Let us know if you need any help!
Compatibility of the public endpoint did not require any changes to the OpenLineage standard or its integrations; it is assumed to work out of the box with OpenLineage 1.12+.
@dechoma Thanks for the PR. I did a cursory look at the PR. A few things:
- Instead of adding an OpenLineage endpoint, this could be modeled as a sub-endpoint under /api/v1/lineage
- This endpoint can take the OpenLineage payload and convert it into an OpenMetadata Lineage request
- The OpenMetadata Lineage specification considers a much larger picture, and we do not want to create a whole separate API just for OpenLineage, which would confuse OpenMetadata users
Can you please let us know why we need another API endpoint instead of converting the OpenLineage payload into an OM Lineage request?
The Python checkstyle failed.
Please run `make py_format` and `make py_format_check` in the root of your repository and commit the changes to this PR.
You can also use pre-commit to automate the Python code formatting.
You can install the pre-commit hooks with `make install_test precommit_install`.
Jest test Coverage
UI tests summary
| Lines | Statements | Branches | Functions |
|---|---|---|---|
| 65.18% (35156/53940) | 41.97% (13897/33109) | 43.95% (4317/9823) |
Quality Gate passed for 'open-metadata-ui'
Issues: 0 new issues, 0 accepted issues
Measures: 0 security hotspots, no data about coverage, 0.0% duplication on new code
The reason for a dedicated endpoint is that OpenLineage events are additive, meaning that a single payload won't contain the complete lineage information; hence we need to store intermediate state somewhere. Consider a Spark application with the OpenLineage integration that emits 3 events for one ETL job:
- event 1 (type: START) runId=1, inputs=[table a, table b], outputs=[]
- event 2 (type: RUNNING) runId=1, inputs=[table c], outputs=[]
- event 3 (type: COMPLETE) runId=1, inputs=[], outputs=[table d]
So you have the same run (runId=1) with 3 separate events emitted. The actual lineage information here is: tables a, b, and c were used to create table d. This is a real scenario in OpenLineage, which by design does not have to maintain state across events (particularly tricky in Spark). To transform this into meaningful lineage information in OpenMetadata, we need to merge these 3 events once we receive the COMPLETE event. Otherwise it is not possible to connect column-level lineage information (table-level lineage would be possible with a pipeline object in the middle, but I think that is an inferior approach).
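As a rough illustration of the merge this implies (not the actual implementation in this PR), state keyed by runId could accumulate inputs and outputs until the COMPLETE event arrives:

```python
# Sketch only: accumulate inputs/outputs per runId and emit lineage edges once
# the COMPLETE event arrives. Names and structure are illustrative.
from collections import defaultdict

state = defaultdict(lambda: {"inputs": set(), "outputs": set()})

def on_event(event: dict):
    run_id = event["run"]["runId"]
    state[run_id]["inputs"].update(ds["name"] for ds in event.get("inputs", []))
    state[run_id]["outputs"].update(ds["name"] for ds in event.get("outputs", []))

    if event["eventType"] == "COMPLETE":
        merged = state.pop(run_id)
        # Every accumulated input feeds every output of the run.
        return [(src, dst) for src in merged["inputs"] for dst in merged["outputs"]]
    return []

# With the three events above, only the COMPLETE event yields edges:
# (table a -> table d), (table b -> table d), (table c -> table d).
```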
Hi @harshach, could you let me know if @mgorsk1's explanation is sufficient? Because OpenLineage events are generated in an additive way (described in the comment above), we decided to introduce state, and the main reason behind the new API was to manage that state in an organized way.
Hi @dechoma @mgorsk1, thanks for the explanation.
I am not sure that adding connector-specific endpoints to the API is the way to go. The goal of OM is to provide a generic API layer and schemas that we can then use to ingest any source.
Here the issue is not the current API structure or payload definition, but rather how OpenLineage emits the events. This looks too specific to OpenLineage to solve on the server rather than keeping the logic in the connector itself.
Maybe it's too naive an approach, but could we dump 24h of events to a file, iterate over it and aggregate the lineage info for each runId, and then clean the file (see the sketch below)? Since we're talking about ETL lineage, processes that will most likely be repeated over time, the results should eventually cover all the right lineage, even if in the first run the event dump is missing a few events for a few runIds.
I am not saying we need to follow this approach, just wondering whether you considered any other alternative that does not rely on the server itself?
Thanks
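Purely to make the naive idea above concrete, here is a rough sketch under the stated assumptions (a local JSON-lines dump of events, batch jobs only); the file path and event shape are assumptions, and this is not a proposal for the actual implementation:

```python
# Sketch of the "dump and aggregate" idea: read a file of OpenLineage events
# (one JSON object per line), group them by runId, and build table-level edges
# for runs that reached COMPLETE; the file can then be truncated.
import json
from collections import defaultdict

def aggregate_dump(path: str = "/tmp/openlineage_events.jsonl"):
    runs = defaultdict(lambda: {"inputs": set(), "outputs": set(), "complete": False})
    with open(path) as fh:
        for line in fh:
            event = json.loads(line)
            run = runs[event["run"]["runId"]]
            run["inputs"].update(ds["name"] for ds in event.get("inputs", []))
            run["outputs"].update(ds["name"] for ds in event.get("outputs", []))
            run["complete"] |= event["eventType"] == "COMPLETE"

    edges = []
    for run in runs.values():
        if run["complete"]:
            edges.extend((src, dst) for src in run["inputs"] for dst in run["outputs"])
    return edges
```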
Hey @pmbrull, thanks for replying. Indeed, adding new API endpoints was not our first choice, but we have not found another way to interact with OpenMetadata storage (the database or search engine), and that is where I think OpenLineage event state should be managed (there or in an Elasticsearch index). Everything that happens in the ingestion framework and needs persistence currently goes through the backend API.
Naively dumping events to files near OM is not a sound solution, especially with a lot of deployments relying on stateless Kubernetes objects, so it would pose a risk of inconsistency. If we can find a way to maintain state natively in OpenMetadata without the need for new API endpoints, then I am fine with it.
The benefit (to us) of using the OpenLineage integrations is that they provide a very mature and broad collection of out-of-the-box integrations with very popular processing engines such as Spark, Flink, and Trino. OpenMetadata is more focused on pulling lineage from database systems and audit logs (which in some cases is super beneficial, and we also use this approach for our BigQuery integration), while in certain architectures all you can or should rely on is lineage from the processing engine (for us, that would be Spark and Trino).
The other routes we considered were:
- use the native OpenMetadata Spark Agent, but we found it not to be a very mature and well-catered solution; it does not fully recognize the hardships of collecting lineage via Spark (and in its current form is not usable for the majority of ETLs we run in production). The OpenLineage integration with Spark is far superior and more reliable.
- deploy Marquez (which persists events) and write a connector between OM and Marquez, but it does not seem very efficient to deploy another service just for the sake of maintaining state if OM could do it. We would not use any other functionality of Marquez, and we thought it would be better to offer the same Marquez functionality in OM. This is also what the DataHub team did: (partially) implement the OL API.
- maintain a separate database instance (or database schema) and use our OpenLineage integration to persist the state there. We thought it would be better to keep it in the `om` schema of our `postgresql` database, but this could also work: the OpenLineage <> OpenMetadata connector would have an option to define a DB connection where the state is persisted, and we would interact with it via SQLAlchemy.
We'll keep this PR on hold while we discuss possible options. Some notes from our conversation:
- Option A: handle lineage aggregation in the server (lineage reconciliation)
  - This would improve our Spark Lineage Agent as well
- Option B: naive approach, aggregate state in the connector. That could yield partial info, we cannot reprocess, we might miss events, and what happens if it fails?
  - E.g., Spark Streaming may never emit a "complete" event, just RUNNING
  - That would work only for batch applications
  - When is the right moment to send the lineage information? (e.g., in Flink)
Hi, thanks both for sharing your thoughts here. We have integrated the handling of these "half" events directly in the Lineage Agent, so there is no need to rely on any changes in the server to get this working.
@mgorsk1 please feel free to take it for a spin. @ulixius9, please add any pending details or configs needed to get this set up.
For now, I'll close this PR.
Thank you
Can you point me towards the code you have in mind, @pmbrull? Thanks!
We are yet to publish the code for this jar, but we have built on top of the OpenLineage code and made sure the half lineages get captured.
You can test out this jar and let us know your feedback:
openlineage-spark_2.12-1.22.0.jar.zip
It works in the same way as described in this doc: https://docs.open-metadata.org/latest/connectors/ingestion/lineage/spark-lineage#4.-configure-spark
but instead of using the `spark.openmetadata.transport` prefix, you use `spark.openlineage.transport`.
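For reference, a rough sketch of what the Spark session setup might look like with the OpenLineage-style prefix. The listener class, config keys, and OpenMetadata host below are assumptions based on the standard OpenLineage Spark integration and the linked doc, not confirmed settings for this jar:

```python
# Illustrative PySpark setup only; keys and values are assumptions based on the
# standard OpenLineage Spark integration. The jar shared above must be on the
# driver classpath (e.g., via --jars).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("daily_etl")
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    # Note the spark.openlineage.transport prefix instead of spark.openmetadata.transport:
    .config("spark.openlineage.transport.type", "http")
    .config("spark.openlineage.transport.url", "http://localhost:8585")  # assumed OM host
    .config("spark.openlineage.transport.endpoint", "/api/v1/openlineage/lineage")
    .getOrCreate()
)
```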
Yeah, we are already using this connector and at one point we introduced state management within it, but the community was not happy about that :) Looking forward to the OL PR so we can test it with an official release.
Hey @ulixius9, any word on upstreaming the changes to the OL connector you mentioned?