flink-cdc icon indicating copy to clipboard operation
flink-cdc copied to clipboard

[Bug] drop column之后进行新增或删除数据报错

Open kongkongstar opened this issue 2 years ago • 0 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Flink version

1.18

Flink CDC version

3.0.0

Database and its version

starrocks 2.5.8

Minimal reproduce step

在删除列之后,立马新增或删除数据。 2024-01-22 17:36:22,556 INFO com.starrocks.connector.flink.catalog.StarRocksCatalog [] - Success to drop columns from user.tb_user, duration: 27261ms, sql: ALTER TABLE user.tb_user DROP COLUMN price PROPERTIES ("timeout" = "1800"); 2024-01-22 17:36:22,572 INFO com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier [] - Successful to apply drop column, event: DropColumnEvent{tableId=user.tb_user, droppedColumns=[price BIGINT]} 2024-01-22 17:36:44,638 INFO org.apache.flink.runtime.xexecutiongraph.ExecutionGraph [] - PostPartition -> Sink Writer: StarRocks Sink -> Sink Committer: StarRocks Sink (1/1) (9b0c5f9677364d623c9b394f4bb350f9_0deb1b26a3d9eb3c8f0c11f7110b2903_0_0) switched from RUNNING to FAILED on localhost:44944-2b3027 @ localhost (dataPort=41166). java.lang.IllegalArgumentException: null at com.ververica.cdc.common.utils.Preconditions.checkArgument(Preconditions.java:106) ~[?:?] at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.serializeRecord(EventRecordSerializationSchema.java:131) ~[?:?] at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.applyDataChangeEvent(EventRecordSerializationSchema.java:119) ~[?:?] at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.serialize(EventRecordSerializationSchema.java:78) ~[?:?] at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.serialize(EventRecordSerializationSchema.java:45) ~[?:?] at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.write(StarRocksWriter.java:139) ~[?:?] at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) ~[flink-dist-1.18.0.jar:1.18.0] at com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:154) ~[?:?] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.0.jar:1.18.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.0.jar:1.18.0] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]

checkpoint状态恢复上一状态后

2024-01-22 17:36:46,406 ERROR com.starrocks.connector.flink.catalog.StarRocksCatalog [] - Failed to drop columns from user.tb_user, sql: ALTER TABLE user.tb_user DROP COLUMN price PROPERTIES ("timeout" = "1800"); 2024-01-22 17:36:46,424 INFO com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier [] - Successful to apply drop column, event: DropColumnEvent{tableId=user.tb_user, droppedColumns=[price BIGINT]}, and ignore the alter exception com.starrocks.connector.flink.catalog.StarRocksCatalogException: Failed to drop columns from user.tb_user at com.starrocks.connector.flink.catalog.StarRocksCatalog.alterDropColumns(StarRocksCatalog.java:331) ~[?:?] at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applyDropColumn(StarRocksMetadataApplier.java:197) ~[?:?] at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:75) ~[?:?] at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:82) ~[?:?] at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:149) ~[?:?] at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:123) ~[?:?] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:204) ~[flink-dist-1.18.0.jar:1.18.0]

What did you expect to see?

不应该报错,程序应该正常运行

What did you see instead?

image

Anything else?

No response

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

kongkongstar avatar Jan 22 '24 09:01 kongkongstar