[Bug] Apache Paimon schema evolution

Open klyashko opened this issue 11 months ago • 4 comments

Search before asking

  • [x] I searched in the issues and found nothing similar.

Paimon version

Not sure that it's a bug, but I wanted to clarify a question about schema evolution.

Currently we are onboarding Paimon in the following way:

  1. There is a Flink application which ingests data into Paimon tables
  2. There are readers which consume the data later

While experimenting with schema evolution, I noticed that when a schema change was made externally, the Flink application didn't pick up the changed properties even after it was restarted. For the application to pick up the change, I had to restart the job without its state. This raises the question of how to properly manage schema evolution: should it be embedded in the application as a pre-job execution step, or should it be managed some other way?

Compute Engine

Flink

Minimal reproduce step

  1. Write to a Paimon table from a streaming application in the following way:
    val table = env.sqlQuery(query)
    val data = env.toChangelogStream(table)

    val catalog = FlinkCatalogFactory.createPaimonCatalog(...)
    val pTable = catalog.getTable(Identifier.create(..., ...))

    new FlinkSinkBuilder(pTable)
      .forRow(data, table.getSchema.toRowDataType)
      .parallelism(1)
      .build()

Usage of HiveCatalog didn't change the behaviour.

  2. Change a Paimon table property with an external Java application, based on the Catalog API documentation section

What doesn't meet your expectations?

After consulting the documentation, it is still unclear how to safely evolve a table schema. In our case it was an attempt to change snapshot.time-retained, preferably without losing job state.

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

klyashko avatar Jan 30 '25 15:01 klyashko

In our scenarios, we use ALTER TABLE ... SET SQL to change consumer.expiration-time, and it takes effect without restarting jobs. Notably, our Paimon version is 0.9.0.
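
For illustration, a minimal sketch of how such a statement might be built from Scala. The object name `AlterTableSet`, the table name `my_db.my_table`, and the option values are hypothetical; the sketch only assembles the SQL string and assumes it is then passed to a Flink `TableEnvironment` whose current catalog is the Paimon catalog.

```scala
object AlterTableSet {
  // Escape single quotes the way SQL string literals expect ('' inside '...').
  private def quote(s: String): String = "'" + s.replace("'", "''") + "'"

  // Builds a Flink SQL `ALTER TABLE ... SET (...)` statement.
  // `table` is inserted verbatim, so pass an already-qualified, trusted name.
  def statement(table: String, options: Seq[(String, String)]): String = {
    require(options.nonEmpty, "at least one option is required")
    val body = options
      .map { case (key, value) => s"${quote(key)} = ${quote(value)}" }
      .mkString(", ")
    s"ALTER TABLE $table SET ($body)"
  }
}

// Hypothetical usage, assuming `tableEnv` is a TableEnvironment that already
// uses the Paimon catalog (not defined in this sketch):
//   tableEnv.executeSql(
//     AlterTableSet.statement("my_db.my_table", Seq("snapshot.time-retained" -> "2 h")))
```

Whether a running job picks up a given option without a restart may depend on the option and the Paimon version, so this is a starting point rather than a guarantee.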

liyubin117 avatar Feb 20 '25 12:02 liyubin117

@liyubin117 thanks for your reply. Do you do anything extra for the change to take effect? And how much time needs to pass for the job to notice the change?

klyashko avatar Feb 21 '25 07:02 klyashko

@klyashko We have a table that was delayed by the downstream task consumer, resulting in 500,000 files. After using alter table set without any extra changes, some files expired after each checkpoint, and the count eventually dropped to 5000 files.

liyubin117 avatar Feb 23 '25 13:02 liyubin117

table.getSchema.toRowDataType runs on the JobManager, so you need to restart Flink. Try this:

RichCdcSinkBuilder

See: https://paimon.apache.org/docs/master/program-api/flink-api/#cdc-ingestion-table

dahai1996 avatar Mar 04 '25 04:03 dahai1996