[Bug] Apache Paimon schema evolution
Search before asking
- [x] I searched in the issues and found nothing similar.
Paimon version
Not sure that this is a bug, but I wanted to clarify a question about schema evolution.
Currently we are onboarding Paimon in the following way:
- There is a Flink application which ingests data into Paimon tables
- There are readers which consume the data later
While experimenting with schema evolution, I noticed that when a schema change was made externally, the Flink application did not pick up the changed properties even after it was restarted. For the application to pick up the change, I had to restart the job without its state. This raises the question of how to properly manage schema evolution: should it be embedded in the application as pre-job-execution steps, or should it be managed in some other way?
Compute Engine
Flink
Minimal reproduce step
- Write to a Paimon table from a streaming application like this:
// Build a changelog stream from a SQL query and sink it into a Paimon table.
// The "..." arguments (catalog options, database and table names) are elided here.
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.flink.FlinkCatalogFactory
import org.apache.paimon.flink.sink.FlinkSinkBuilder

val table = env.sqlQuery(query)
val data = env.toChangelogStream(table)
val catalog = FlinkCatalogFactory.createPaimonCatalog(...)
val pTable = catalog.getTable(Identifier.create(..., ...))
new FlinkSinkBuilder(pTable)
  .forRow(data, table.getSchema.toRowDataType)
  .parallelism(1)
  .build()
Usage of HiveCatalog didn't change the behaviour.
- Change a Paimon table property with an external Java application, based on the Catalog API section of the documentation (see the sketch below)
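For context, a minimal sketch of the kind of external change involved, assuming the Paimon Catalog API; the warehouse path, database and table names, and the option value are placeholders:

```scala
import java.util.Collections

import org.apache.paimon.catalog.Identifier
import org.apache.paimon.flink.FlinkCatalogFactory
import org.apache.paimon.options.Options
import org.apache.paimon.schema.SchemaChange

// Open the Paimon catalog; the warehouse path is a placeholder.
val catalogOptions = new Options()
catalogOptions.set("warehouse", "/path/to/warehouse")
val catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions)

// Change a table option externally, e.g. snapshot.time-retained.
catalog.alterTable(
  Identifier.create("my_db", "my_table"),
  Collections.singletonList(SchemaChange.setOption("snapshot.time-retained", "2 h")),
  false) // ignoreIfNotExists
```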
What doesn't meet your expectations?
After consulting the documentation, we still lack an understanding of how to safely evolve a table's schema. In our case it was an attempt to change snapshot.time-retained, preferably without losing job state.
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
In our scenarios, we use ALTER TABLE ... SET SQL to change consumer.expiration-time without restarting jobs, and it takes effect.
Notably, Paimon version: 0.9.0
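For illustration, a minimal sketch of issuing such a change from the StreamTableEnvironment used in the reproduce step; the table name and value are placeholders:

```scala
// Change a dynamic table option via Flink SQL; `env` is the StreamTableEnvironment
// from the reproduce step above, and the table name is a placeholder.
env.executeSql(
  "ALTER TABLE my_paimon_table SET ('consumer.expiration-time' = '1 d')")
```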
@liyubin117 thanks for your reply. Do you do anything extra for the change to take effect? Or how much time needs to pass for the job to notice the change?
@klyashko We have a table whose downstream consumer task fell behind, resulting in 500,000 files. After using ALTER TABLE ... SET without any extra changes, some files expired after each checkpoint, eventually dropping to 5,000 files.
table.getSchema.toRowDataType runs on the JobManager, so you need to restart Flink. Try this instead:
RichCdcSinkBuilder
see: https://paimon.apache.org/docs/master/program-api/flink-api/#cdc-ingestion-table
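For reference, a rough sketch of what the RichCdcSinkBuilder approach from that page could look like; the class and method names follow the linked CDC ingestion example and may differ between Paimon versions, and all identifiers here are placeholders:

```scala
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.paimon.catalog.{CatalogLoader, Identifier}
import org.apache.paimon.flink.sink.cdc.{RichCdcRecord, RichCdcSinkBuilder}
import org.apache.paimon.table.Table

// Rough sketch only: names are assumptions taken from the linked docs page.
val identifier = Identifier.create("my_db", "my_table")      // placeholder names
val catalogLoader: CatalogLoader = ???                       // serializable loader that creates the Paimon catalog
val pTable: Table = catalogLoader.load().getTable(identifier)
val cdcStream: DataStream[RichCdcRecord] = ???               // CDC records produced upstream

new RichCdcSinkBuilder(pTable)
  .forRichCdcRecord(cdcStream)
  .identifier(identifier)
  .catalogLoader(catalogLoader)
  .build()
```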