
[Bug] java.lang.IllegalStateException: Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.

Open · 2018yinjian opened this issue 8 months ago · 1 comment

Search before asking

  • [x] I searched in the issues and found nothing similar.

Paimon version

1.0.0

Compute Engine

Flink 1.19.1

Minimal reproduce step

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: order-sync-paimon2          # Flink cluster name
spec:
  image: maven.xiujiadian.com/streampark/flink-kafka-paimon-application:1.19.1
  flinkVersion: v1_19               # Flink version
  imagePullPolicy: IfNotPresent     # pull the image from the registry only when it is not present locally
  ingress:                          # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"  # slots provided by each TaskManager
    #high-availability.type: kubernetes
    #high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory  # JobManager HA
    #high-availability.storageDir: file:///opt/flink/flink_recovery  # JobManager HA data storage path
    #job.autoscaler.enabled: "true"  # enable autoscaling
    #job.autoscaler.stabilization.interval: 1m  # stabilization window: no metrics are collected and no scaling actions are taken during this period
    #job.autoscaler.metrics.window: 2m  # metrics collection window
    #job.autoscaler.target.utilization: "0.6"  # target utilization: after scaling, keep the job or chained operator groups below this value
    #job.autoscaler.target.utilization.boundary: "0.1"  # utilization boundary: avoids immediate scaling on load fluctuations by allowing a 10% deviation from the target
    #job.autoscaler.restart.time: 1m  # time needed to restart the application
    #job.autoscaler.catch-up.duration: 2m  # time the job is expected to need to catch up after scaling up
    #pipeline.max-parallelism: "6"  # maximum parallelism to scale up to
    jobmanager.archive.fs.dir: file:///opt/flink/flink_history     # JobManager archive path
    historyserver.archive.fs.dir: file:///opt/flink/flink_history  # HistoryServer archive path
    historyserver.archive.fs.refresh-interval: "10000"             # HistoryServer archive refresh interval
    taskmanager.memory.managed.size: 128mb
    execution.checkpointing.interval: "300000"
    execution.checkpointing.mode: "EXACTLY_ONCE"
    execution.checkpointing.externalized-checkpoint-retention: "RETAIN_ON_CANCELLATION"
    execution.checkpointing.max-concurrent-checkpoints: "2"
    execution.checkpointing.timeout: "600000"
    state.backend.type: "filesystem"
    state.checkpoint-storage: "filesystem"
    state.checkpoints.dir: "file:///opt/flink/checkpoints/paimon/zmn_bigdata_order_paimon/big-data-flink-consumer-orderSyncPaimonApp2"
    kubernetes.hadoop.conf.config-map.name: apache-hadoop-conf
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
  podTemplate:
    spec:
      imagePullSecrets:
        - name: image-pull-secret
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar            # mount the jars stored on NFS
              mountPath: /opt/flink/jar
            - name: flink-log            # log PVC
              mountPath: /opt/flink/log
            - name: flink-checkpoints    # checkpoint mount
              mountPath: /opt/flink/checkpoints
            - name: flink-ha             # HA PVC
              mountPath: /opt/flink/flink_recovery
            - name: flink-historyserver
              mountPath: /opt/flink/flink_history
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-log
          persistentVolumeClaim:
            claimName: flink-log-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-pvc
        - name: flink-ha
          persistentVolumeClaim:
            claimName: flink-ha-pvc
        - name: flink-historyserver
          persistentVolumeClaim:
            claimName: flink-historyserver-pvc
  job:
    jarURI: local:///opt/flink/jar/paimon-flink-action-1.0.0.jar
    entryClass: org.apache.paimon.flink.action.FlinkActions
    args:
      - "kafka_sync_database"
      - "--warehouse"
      - "hdfs:///user/hive/warehouse"
      - "--database"
      - "ods3"
      - "--table_prefix"
      - "ods_"
      # per-database table prefix mapping
      - "--table_prefix_db"
      - "biz_serv_work_after_delivery=ods_biz_serv_work_after_delivery_"
      - "--table_prefix_db"
      - "biz_serv_work_delivery=ods_biz_serv_work_delivery_"
      - "--table_prefix_db"
      - "biz_serv_work_distribute=ods_biz_serv_work_distribute_"
      - "--table_prefix_db"
      - "biz_serv_work_general=ods_biz_serv_work_general_"
      - "--table_prefix_db"
      - "biz_serv_work_general_agg=ods_biz_serv_work_general_agg_"
      - "--table_prefix_db"
      - "biz_serv_work_pre_delivery=ods_biz_serv_work_pre_delivery_"
      - "--table_prefix_db"
      - "biz_serv_order_pay=ods_biz_serv_order_pay_"
      - "--partition_keys"
      - "region"
      - "--type_mapping"
      - "tinyint1-not-bool,bigint-unsigned-to-bigint"
      # Kafka security configuration
      - "--kafka_conf"
      - "properties.bootstrap.servers=xxx:9092,xxx:9092,xxx:9092"
      - "--kafka_conf"
      - "properties.security.protocol=SASL_PLAINTEXT"
      - "--kafka_conf"
      - "properties.sasl.mechanism=PLAIN"
      - "--kafka_conf"
      - 'properties.sasl.jaas.config=org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-admin" password="xxx&cxxxTv$d";'
      - "--kafka_conf"
      - "topic=zmn_bigdata_order_format"
      - "--kafka_conf"
      - "properties.group.id=big-data-flink-consumer-orderSyncPaimonApp2"
      - "--kafka_conf"
      - "scan.startup.mode=latest-offset"
      - "--kafka_conf"
      - "value.format=canal-json"
      # Hive catalog configuration
      - "--catalog_conf"
      - "metastore=hive"
      - "--catalog_conf"
      - "uri=thrift://xxxx:9083"
      # table-level configuration
      - "--table_conf"
      - "bucket=4"
      - "--table_conf"
      - "changelog-producer=input"
      - "--table_conf"
      - "sink.parallelism=4"
      - "--table_conf"
      - "write-buffer-size=256mb"
      - "--table_conf"
      - "write-buffer-spillable=true"
      # databases to include
      - "--including_dbs"
      - "biz_serv_work_after_delivery|biz_serv_work_delivery|biz_serv_work_distribute|biz_serv_work_general|biz_serv_work_general_agg|biz_serv_work_pre_delivery|biz_serv_order_pay"
    parallelism: 1
    upgradeMode: last-state
```

What doesn't meet your expectations?

```
2025-04-24 15:29:51
java.io.IOException: java.lang.IllegalStateException: Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.
    at org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator.processElement(CdcRecordStoreMultiWriteOperator.java:189)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.
    at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182)
    at org.apache.paimon.mergetree.Levels.<init>(Levels.java:83)
    at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:198)
    at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:95)
    at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:445)
    at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$5(AbstractFileStoreWrite.java:407)
    at java.base/java.util.HashMap.computeIfAbsent(Unknown Source)
    at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:406)
    at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:157)
    at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:175)
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:189)
    at org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator.processElement(CdcRecordStoreMultiWriteOperator.java:187)
    ... 13 more
```
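For context on where this fails: the trace shows `Preconditions.checkState` throwing inside the `Levels` constructor (`Levels.java:83`) while `KeyValueFileStoreWrite.createWriter` restores the writer for a bucket, i.e. while the bucket's restored data files are being distributed into LSM levels. Below is a minimal, hypothetical sketch of that invariant, not the actual Paimon source: `FileMeta`, `checkLevels`, and the comparator are illustrative stand-ins. It shows one way the counts can diverge, namely a duplicate file record collapsing inside an ordered level-0 set.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class LevelsInvariantSketch {

    // Hypothetical, simplified stand-in for Paimon's DataFileMeta.
    record FileMeta(String name, int level, long maxSequence) {}

    // Sketch of the consistency check that Levels.<init> performs: every
    // restored file must land in exactly one level, so the per-level counts
    // must sum to inputFiles.size(). Level 0 is modeled as an ordered set;
    // if two restored entries compare as equal (e.g. the same file recorded
    // twice), one of them silently collapses and the check fails with the
    // IllegalStateException seen in the report.
    static void checkLevels(List<FileMeta> inputFiles, int numLevels) {
        TreeSet<FileMeta> level0 =
                new TreeSet<>(
                        Comparator.comparingLong(FileMeta::maxSequence)
                                .thenComparing(FileMeta::name));
        List<List<FileMeta>> higherLevels = new ArrayList<>();
        for (int i = 1; i < numLevels; i++) {
            higherLevels.add(new ArrayList<>());
        }
        for (FileMeta file : inputFiles) {
            if (file.level() == 0) {
                level0.add(file); // duplicates by comparator are dropped here
            } else {
                higherLevels.get(file.level() - 1).add(file);
            }
        }
        int stored = level0.size() + higherLevels.stream().mapToInt(List::size).sum();
        if (stored != inputFiles.size()) {
            throw new IllegalStateException(
                    "Number of files stored in Levels does not equal to the size"
                            + " of inputFiles. This is unexpected.");
        }
    }

    public static void main(String[] args) {
        // Two level-0 entries with identical comparator keys (a duplicate
        // record in this toy model) reproduce the mismatch: stored == 1
        // while inputFiles.size() == 2.
        List<FileMeta> files =
                List.of(new FileMeta("data-0.orc", 0, 42L), new FileMeta("data-0.orc", 0, 42L));
        checkLevels(files, 4); // throws IllegalStateException
    }
}
```

In this toy model the duplicate makes `stored` come out at 1 against an input size of 2; for the real check to fail, the file metadata restored for the bucket would have to be analogously inconsistent (for example duplicated or conflicting entries).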

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

2018yinjian · Apr 24 '25 07:04