
flink:FlinkSink support dynamically changed schema

Open naisongwen opened this issue 3 years ago • 28 comments

Currently, FlinkSink requires developers to pass a schema parameter when building the sink for a DataStream, which means that once the schema is given, the TableSchema is determined and can never be changed. In practical scenarios, however, some data formats such as JSON do not have a fixed schema: fields may be added, deleted, or renamed over time. We therefore want to be able to change the mapped TableSchema while the DataStream job is running.
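
For context, a minimal sketch of how the sink is typically built today (the field names are made up for illustration); the TableSchema handed to the builder is captured when the job graph is created, so later table schema changes are not picked up by the running job:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class FixedSchemaSink {
  // The schema passed to the builder is fixed for the lifetime of the job;
  // the RowData layout of the input stream must always match it.
  static void appendToIceberg(DataStream<RowData> rows, TableLoader tableLoader) {
    TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("payload", DataTypes.STRING())
        .build();

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .tableSchema(schema) // determined at job-graph build time, cannot change afterwards
        .append();
  }
}
```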

naisongwen avatar Feb 22 '22 09:02 naisongwen

I also met this problem in CDC scenarios

Shane-Yu avatar Mar 02 '22 13:03 Shane-Yu

To summarize, your expectation is that the Iceberg table schema is dynamically updated based on the schema of the data flow whenever the two do not match?

hililiwei avatar Mar 25 '22 08:03 hililiwei

I am not sure this would be the universally desired behavior. If the data stream contains an incompatible schema change (like removing a required field), it will break the downstream consumers.

There is value in automatically syncing the input data schema to the Iceberg table schema (for compatible schema evolution). Personally, I would keep it in the control plane, which is more natural if there is a schema registry tracking input data schema changes. The control plane can then update the Iceberg table schema and restart the Flink job so the write path picks up the new table schema.

It is tricky to support automatic schema sync in the data plane. There can be many parallel Iceberg writers (hundreds) for a single sink table, and coordinating a metadata change (like a schema update) across them is very tricky.
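
For illustration, a minimal sketch of the control-plane side using Iceberg's UpdateSchema API (catalog type, warehouse path, table and column names are assumptions); after the commit, the Flink job is restarted to pick up the new schema:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class ControlPlaneSchemaSync {
  public static void main(String[] args) {
    // Assumed warehouse location and table name, purely for illustration.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://warehouse/path");
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    // Apply a compatible schema change detected from the input data / schema registry.
    table.updateSchema()
        .addColumn("new_optional_field", Types.StringType.get())
        .commit();

    // The control plane then restarts the Flink job so the write path uses the new schema.
  }
}
```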

stevenzwu avatar Mar 25 '22 19:03 stevenzwu

right

naisongwen avatar Mar 30 '22 06:03 naisongwen

We also hit this problem in our scenarios: we modify the schema of the sink Iceberg table frequently. Currently I try to fetch the table schema through the Iceberg table refresh API and dynamically construct a DataStream<RowData> that includes the new columns. Adding the new columns on the stream side is possible, but writing them through the Iceberg sink does not take effect, because the sink does not support dynamic schema updates. If the Iceberg Flink sink writer supported refreshing the table schema, it would be much more convenient to use; right now it takes a minute or two to restart Flink. @stevenzwu

lintingbin avatar May 20 '22 04:05 lintingbin

I think we should take this feature seriously. In Flink CDC scenarios, Hudi already supports dynamic table schema changes without restarting tasks: it captures schema changes to the table and applies them in real time.

When synchronizing data using Flink CDC, it is unacceptable to restart a task whenever the table schema changes.

hililiwei avatar Jun 21 '22 03:06 hililiwei

We have internally implemented modifying columns, adding columns after the last column, and deleting the last column without restarting the Flink program. Our processing pipeline is: DataStream<Map<String, String>> -> map -> DataStream<RowData> -> FlinkSink. In the map implementation, we refresh the table schema after each completed checkpoint to generate RowData with the latest layout. We have also modified the implementation of FlinkSink: every time newAppender is called, we refresh the table schema and use the latest table schema to write the data file. Is anyone interested in this feature? I can contribute our modifications to FlinkSink if needed.
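
Purely for illustration, a minimal sketch of what the map step in that pipeline could look like, assuming the Iceberg table is re-read after each completed checkpoint and all fields are strings; this is not the code from the branch:

```java
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Converts Map<String, String> records to RowData using the latest Iceberg table
// schema, refreshed after each completed checkpoint.
public class SchemaRefreshingMap extends RichMapFunction<Map<String, String>, RowData>
    implements CheckpointListener {

  private final TableLoader tableLoader;
  private transient Table table;
  private transient Schema currentSchema;

  public SchemaRefreshingMap(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  @Override
  public void open(Configuration parameters) {
    tableLoader.open();
    table = tableLoader.loadTable();
    currentSchema = table.schema();
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Re-read table metadata so newly added columns become visible to this operator.
    table.refresh();
    currentSchema = table.schema();
  }

  @Override
  public RowData map(Map<String, String> record) {
    // Build a row whose arity follows the latest known schema; every field is
    // treated as a string here purely to keep the sketch short.
    GenericRowData row = new GenericRowData(currentSchema.columns().size());
    for (int i = 0; i < currentSchema.columns().size(); i++) {
      String value = record.get(currentSchema.columns().get(i).name());
      row.setField(i, value == null ? null : StringData.fromString(value));
    }
    return row;
  }
}
```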

lintingbin avatar Aug 02 '22 09:08 lintingbin

I personally look forward to seeing your PR.

hililiwei avatar Aug 03 '22 01:08 hililiwei

@lintingbin2009 it might be helpful to describe the solution at high-level design in this issue or some doc.

Every time newAppender is called, we will refresh the table schema and use the latest table schema to write the data file

This sounds expensive and may not work well at scale. If every writer task needs to poll the table for every file, it can create a lot of load on the Iceberg metadata system. Ideally, the table schema polling and change should be done by the operator coordinator.

stevenzwu avatar Aug 03 '22 04:08 stevenzwu

@hililiwei @stevenzwu https://github.com/apache/iceberg/pull/5425 This is my PR; I hope to get some suggestions. We are now testing in an environment with a parallelism of about 40, and the checkpoint time is the same as it was before the dynamic schema refresh was added.

lintingbin avatar Aug 03 '22 06:08 lintingbin

We also hit this problem in our scenarios: we modify the schema of the sink Iceberg table frequently. Currently I try to fetch the table schema through the Iceberg table refresh API and dynamically construct a DataStream that includes the new columns. Adding the new columns on the stream side is possible, but writing them through the Iceberg sink does not take effect, because the sink does not support dynamic schema updates. If the Iceberg Flink sink writer supported refreshing the table schema, it would be much more convenient to use; right now it takes a minute or two to restart Flink. @stevenzwu

I am interested in this. How can I contact you about the dynamic schema support?

leichangqing avatar Mar 10 '23 01:03 leichangqing

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have run this code in our own production environment for half a year, and there has been no problem so far. (image: job topology diagram) Corresponding to 1, 2, and 3 in the diagram, you need to prepare:

  1. There needs to be a broadcast node that can subscribe to your schema changes (see the sketch after this list).
  2. The data processing node generates RowData from the incoming data according to the latest schema.
  3. With the code modification above, the Iceberg writer node can receive the latest schema and apply it.
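
For reference, a rough sketch of how step 1 could be wired with Flink's broadcast state (the schema representation, state descriptor, and conversion details are assumptions, not the branch's actual code):

```java
import java.util.Map;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

public class SchemaBroadcastWiring {

  // Broadcast state holding the latest schema as a JSON string (assumed representation).
  static final MapStateDescriptor<String, String> SCHEMA_STATE = new MapStateDescriptor<>(
      "latest-schema", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

  static DataStream<RowData> wire(DataStream<Map<String, String>> records,
                                  DataStream<String> schemaChanges) {
    BroadcastStream<String> broadcastSchemas = schemaChanges.broadcast(SCHEMA_STATE);

    return records
        .connect(broadcastSchemas)
        .process(new BroadcastProcessFunction<Map<String, String>, String, RowData>() {
          private transient String latestSchemaJson; // last schema seen on the broadcast side

          @Override
          public void processBroadcastElement(String schemaJson, Context ctx,
                                              Collector<RowData> out) throws Exception {
            ctx.getBroadcastState(SCHEMA_STATE).put("current", schemaJson);
            latestSchemaJson = schemaJson;
          }

          @Override
          public void processElement(Map<String, String> record, ReadOnlyContext ctx,
                                      Collector<RowData> out) {
            // Convert the record to RowData according to latestSchemaJson and collect it;
            // the conversion itself is omitted here (see the map sketch earlier in the thread).
          }
        });
  }
}
```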

lintingbin avatar Mar 10 '23 04:03 lintingbin

It would be a good starting point if someone would like to create a design doc on how to solve this problem in a general and scalable way.

stevenzwu avatar Mar 10 '23 04:03 stevenzwu

@stevenzwu This is a doc I wrote; please share your opinions and I will revise it.

lintingbin avatar Mar 15 '23 06:03 lintingbin

Is there any news on this?

FranMorilloAWS avatar Feb 28 '24 16:02 FranMorilloAWS

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have run this code in our own production environment for half a year, and there has been no problem so far. (image: job topology diagram) Corresponding to 1, 2, and 3 in the diagram, you need to prepare:

  1. There needs to be a broadcast node that can subscribe to your schema changes.
  2. The data processing node generates RowData from the incoming data according to the latest schema.
  3. With the code modification above, the Iceberg writer node can receive the latest schema and apply it.

Could you provide sample code to demonstrate how to use it?

Ruees avatar Apr 16 '24 03:04 Ruees

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have run this code in our own production environment for half a year, and there has been no problem so far. (image: job topology diagram) Corresponding to 1, 2, and 3 in the diagram, you need to prepare:

  1. There needs to be a broadcast node that can subscribe to your schema changes.
  2. The data processing node generates RowData from the incoming data according to the latest schema.
  3. With the code modification above, the Iceberg writer node can receive the latest schema and apply it.

I pulled the FlinkSink-related modification code from the first commit and added a column at the end using the Java API in the map operator, but it did not work. Even though the data was inserted successfully, the column at the end remained empty.

Ruees avatar Apr 16 '24 09:04 Ruees

Just commenting for visibility that this feature would be extremely useful for our use case too. It is similar to the CDC use case, but driven instead by the services emitting events. I would also be happy to lend a hand, but at the moment it is not clear what the state is. Is the proposed design agreed upon, or does it need another iteration?

lkokhreidze avatar Apr 17 '24 10:04 lkokhreidze

I think it is not trivial to implement this feature, as the schema of the RowData objects which are the input of the Sink is finalized when the job graph is created. To change the schema one needs to regenerate the job graph, essentially restarting the job (calling the main method). There might be ways to work around this, by changing the input to records where the schema is embedded in the records (performance loss), or by getting the schema from an outside source (an additional external dependency), but this would need some deeper changes in the Sink. Care should also be taken about how to synchronize the table schema refresh across the tasks when changes are detected...

As a workaround, we created our own schema check before converting the input to RowData, and throw a SuppressRestartsException when changes are detected. We use the Flink Kubernetes Operator to restart the job from the failed state, via kubernetes.operator.job.restart.failed. The main method refreshes the table, and the new job instance starts with the new schema.
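
A minimal sketch of how such a guard could look, assuming a throttled check against the schema JSON captured in main(); the comparison details are illustrative, not the actual implementation described above:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Fails the job (bypassing Flink's restart strategy) when the live Iceberg table schema
// no longer matches the schema the job was submitted with; an external operator such as
// the Flink Kubernetes Operator then resubmits the job from main().
public class SchemaGuard<T> extends RichMapFunction<T, T> {

  private final TableLoader tableLoader;
  private final String expectedSchemaJson; // SchemaParser.toJson(table.schema()) captured in main()

  private transient Table table;
  private transient long lastCheckMillis;

  public SchemaGuard(TableLoader tableLoader, String expectedSchemaJson) {
    this.tableLoader = tableLoader;
    this.expectedSchemaJson = expectedSchemaJson;
  }

  @Override
  public void open(Configuration parameters) {
    tableLoader.open();
    table = tableLoader.loadTable();
  }

  @Override
  public T map(T value) {
    long now = System.currentTimeMillis();
    if (now - lastCheckMillis > 60_000L) { // throttle the metadata check to once a minute
      lastCheckMillis = now;
      table.refresh();
      if (!SchemaParser.toJson(table.schema()).equals(expectedSchemaJson)) {
        // SuppressRestartsException moves the job straight to FAILED state, so
        // kubernetes.operator.job.restart.failed can trigger a full resubmission.
        throw new SuppressRestartsException(
            new IllegalStateException("Iceberg table schema changed, restarting from main()"));
      }
    }
    return value;
  }
}
```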

pvary avatar Apr 17 '24 17:04 pvary

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Oct 15 '24 00:10 github-actions[bot]

How does flink-cdc do it?

ottomata avatar Oct 15 '24 12:10 ottomata

It sends the schema along with every record. I'm playing around with a somewhat similar, but more performant solution, where we send only the schemaId instead of the full schema. The thing is only in infancy atm, but... 😄
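
Just to make the idea concrete, a hypothetical envelope type could look like this (names are made up; this is not the actual work in progress):

```java
import org.apache.flink.table.data.RowData;

// Hypothetical envelope: instead of shipping the full schema with every record,
// only the id of the Iceberg schema the row was produced against travels with it.
public class RowWithSchemaId {
  private final int schemaId; // matches a schema registered in the Iceberg table metadata
  private final RowData row;

  public RowWithSchemaId(int schemaId, RowData row) {
    this.schemaId = schemaId;
    this.row = row;
  }

  public int schemaId() {
    return schemaId;
  }

  public RowData row() {
    return row;
  }
}
```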

pvary avatar Oct 17 '24 20:10 pvary

Ah, thanks!

FWIW, I think schema evolution support is worth the tradeoff of extra bytes per record :)

ottomata avatar Oct 17 '24 21:10 ottomata

The current tradeoff is more like doubled CPU time (we need caching and an extra serialization/deserialization step, which is on an already well optimized hot path). We are still looking for ways to optimize this.

pvary avatar Oct 18 '24 05:10 pvary

How is Paimon doing it? The same as Flink CDC? How about integrating with a schema registry and using the schema version id?

FranMorilloAWS avatar Oct 18 '24 06:10 FranMorilloAWS

The Iceberg table could be used as a schema registry. I would be reluctant to add any new requirements if possible.

pvary avatar Oct 21 '24 04:10 pvary

How would that look? Normally we consume from Kafka or Kinesis and use the AWS Glue Schema Registry or the Confluent Schema Registry. As of now the sink has the option of using GenericRecord. Is there any roadmap for SpecificRecord?

FranMorilloAWS avatar Oct 21 '24 07:10 FranMorilloAWS

If schema registry support is considered, please make it pluggable! Wikimedia Foundation uses JSONSchema and $schema URIs. And we can do it with Flink.

ottomata avatar Oct 21 '24 11:10 ottomata

If the schema is changed, then the target Iceberg table needs to be updated to the new schema anyway. So we can use the Iceberg schemaId to send along with the records.

pvary avatar Oct 22 '24 12:10 pvary

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Apr 21 '25 00:04 github-actions[bot]