[Bug] java.lang.IllegalStateException: Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.
Search before asking
- [x] I searched in the issues and found nothing similar.
Paimon version
Paimon version: 1.0.0
Compute Engine
Flink version: 1.19.1
Minimal reproduce step
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: order-sync-paimon2 # Flink cluster name
spec:
  image: maven.xiujiadian.com/streampark/flink-kafka-paimon-application:1.19.1
  flinkVersion: v1_19 # Flink version
  imagePullPolicy: IfNotPresent # image pull policy: pull from the registry only if not present locally
  ingress: # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4" # number of slots provided by each TaskManager
    #high-availability.type: kubernetes
    #high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory # JobManager HA
    #high-availability.storageDir: file:///opt/flink/flink_recovery # JobManager HA storage path
    #job.autoscaler.enabled: "true" # enable autoscaling
    #job.autoscaler.stabilization.interval: 1m # stabilization window: no metrics are collected and no scaling actions are taken during this period
    #job.autoscaler.metrics.window: 2m # metrics collection window
    #job.autoscaler.target.utilization: "0.6" # target utilization: after scaling, keep the utilization of the job or chained operator groups below this value
    #job.autoscaler.target.utilization.boundary: "0.1" # utilization boundary: avoids immediate scaling on load fluctuations by allowing a 10% deviation from the target utilization
    #job.autoscaler.restart.time: 1m # time needed to restart the application
    #job.autoscaler.catch-up.duration: 2m # time within which the job is expected to catch up after scaling
    #pipeline.max-parallelism: "6" # maximum parallelism to scale up to
    jobmanager.archive.fs.dir: file:///opt/flink/flink_history # JobManager archive path
    historyserver.archive.fs.dir: file:///opt/flink/flink_history # HistoryServer archive path
    historyserver.archive.fs.refresh-interval: "10000" # HistoryServer refresh interval
    taskmanager.memory.managed.size: 128mb
    execution.checkpointing.interval: "300000"
    execution.checkpointing.mode: "EXACTLY_ONCE"
    execution.checkpointing.externalized-checkpoint-retention: "RETAIN_ON_CANCELLATION"
    execution.checkpointing.max-concurrent-checkpoints: "2"
    execution.checkpointing.timeout: "600000"
    state.backend.type: "filesystem"
    state.checkpoint-storage: "filesystem"
    state.checkpoints.dir: "file:///opt/flink/checkpoints/paimon/zmn_bigdata_order_paimon/big-data-flink-consumer-orderSyncPaimonApp2"
    kubernetes.hadoop.conf.config-map.name: apache-hadoop-conf
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
  podTemplate:
    spec:
      imagePullSecrets:
        - name: image-pull-secret
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar # mount the job JARs from NFS
              mountPath: /opt/flink/jar
            - name: flink-log # mount the log PVC
              mountPath: /opt/flink/log
            - name: flink-checkpoints # mount checkpoints
              mountPath: /opt/flink/checkpoints
            - name: flink-ha # HA PVC
              mountPath: /opt/flink/flink_recovery
            - name: flink-historyserver
              mountPath: /opt/flink/flink_history
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-log
          persistentVolumeClaim:
            claimName: flink-log-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-pvc
        - name: flink-ha
          persistentVolumeClaim:
            claimName: flink-ha-pvc
        - name: flink-historyserver
          persistentVolumeClaim:
            claimName: flink-historyserver-pvc
  job:
    jarURI: local:///opt/flink/jar/paimon-flink-action-1.0.0.jar
    entryClass: org.apache.paimon.flink.action.FlinkActions
    args:
      - "kafka_sync_database"
      - "--warehouse"
      - "hdfs:///user/hive/warehouse"
      - "--database"
      - "ods3"
      - "--table_prefix"
      - "ods_"
      # per-database table prefix mapping
      - "--table_prefix_db"
      - "biz_serv_work_after_delivery=ods_biz_serv_work_after_delivery_"
      - "--table_prefix_db"
      - "biz_serv_work_delivery=ods_biz_serv_work_delivery_"
      - "--table_prefix_db"
      - "biz_serv_work_distribute=ods_biz_serv_work_distribute_"
      - "--table_prefix_db"
      - "biz_serv_work_general=ods_biz_serv_work_general_"
      - "--table_prefix_db"
      - "biz_serv_work_general_agg=ods_biz_serv_work_general_agg_"
      - "--table_prefix_db"
      - "biz_serv_work_pre_delivery=ods_biz_serv_work_pre_delivery_"
      - "--table_prefix_db"
      - "biz_serv_order_pay=ods_biz_serv_order_pay_"
      - "--partition_keys"
      - "region"
      - "--type_mapping"
      - "tinyint1-not-bool,bigint-unsigned-to-bigint"
      # Kafka security configuration
      - "--kafka_conf"
      - "properties.bootstrap.servers=xxx:9092,xxx:9092,xxx:9092"
      - "--kafka_conf"
      - "properties.security.protocol=SASL_PLAINTEXT"
      - "--kafka_conf"
      - "properties.sasl.mechanism=PLAIN"
      - "--kafka_conf"
      - 'properties.sasl.jaas.config=org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-admin" password="xxx&cxxxTv$d";'
      - "--kafka_conf"
      - "topic=zmn_bigdata_order_format"
      - "--kafka_conf"
      - "properties.group.id=big-data-flink-consumer-orderSyncPaimonApp2"
      - "--kafka_conf"
      - "scan.startup.mode=latest-offset"
      - "--kafka_conf"
      - "value.format=canal-json"
      # Hive catalog configuration
      - "--catalog_conf"
      - "metastore=hive"
      - "--catalog_conf"
      - "uri=thrift://xxxx:9083"
      # table-level configuration
      - "--table_conf"
      - "bucket=4"
      - "--table_conf"
      - "changelog-producer=input"
      - "--table_conf"
      - "sink.parallelism=4"
      - "--table_conf"
      - "write-buffer-size=256mb"
      - "--table_conf"
      - "write-buffer-spillable=true"
      # databases to include
      - "--including_dbs"
      - "biz_serv_work_after_delivery|biz_serv_work_delivery|biz_serv_work_distribute|biz_serv_work_general|biz_serv_work_general_agg|biz_serv_work_pre_delivery|biz_serv_order_pay"
    parallelism: 1
    upgradeMode: last-state
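The deployment above is just the operator's way of launching the Paimon action jar; for anyone trying to reproduce without Kubernetes, the same argument list can be passed straight to the configured entry class. A minimal sketch, assuming `org.apache.paimon.flink.action.FlinkActions` exposes the standard `main(String[])` entry point (placeholder hosts and credentials kept exactly as in the spec, SASL args omitted for brevity):

```java
import org.apache.paimon.flink.action.FlinkActions;

// Hypothetical local-debug harness: invokes the same entryClass configured
// above with a trimmed version of the argument list from the deployment spec.
public class ReproduceLocally {
    public static void main(String[] args) throws Exception {
        FlinkActions.main(
                new String[] {
                    "kafka_sync_database",
                    "--warehouse", "hdfs:///user/hive/warehouse",
                    "--database", "ods3",
                    "--table_prefix", "ods_",
                    "--partition_keys", "region",
                    "--type_mapping", "tinyint1-not-bool,bigint-unsigned-to-bigint",
                    // Kafka placeholders (xxx) exactly as in the spec
                    "--kafka_conf", "properties.bootstrap.servers=xxx:9092",
                    "--kafka_conf", "topic=zmn_bigdata_order_format",
                    "--kafka_conf", "scan.startup.mode=latest-offset",
                    "--kafka_conf", "value.format=canal-json",
                    "--catalog_conf", "metastore=hive",
                    "--catalog_conf", "uri=thrift://xxxx:9083",
                    "--table_conf", "bucket=4",
                    "--table_conf", "changelog-producer=input",
                    "--table_conf", "sink.parallelism=4"
                });
    }
}
```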
What doesn't meet your expectations?
2025-04-24 15:29:51
java.io.IOException: java.lang.IllegalStateException: Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.
at org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator.processElement(CdcRecordStoreMultiWriteOperator.java:189)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.
at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182)
at org.apache.paimon.mergetree.Levels.
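For context, the bottom frame points at the `Levels` constructor, which rebuilds the merge-tree levels from the data files restored from state and then asserts that every input file ended up in exactly one level. Below is a much-simplified, hypothetical sketch of that invariant (the `FileMeta` type and all names here are illustrative stand-ins, not Paimon's actual classes); its `main` shows one way the counts can diverge, e.g. if the restored state contains the same level-0 file twice:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for Paimon's data-file metadata; it models only the
// fields the failing check cares about (name, sequence number, level).
final class FileMeta {
    final String fileName;
    final long maxSequenceNumber;
    final int level;

    FileMeta(String fileName, long maxSequenceNumber, int level) {
        this.fileName = fileName;
        this.maxSequenceNumber = maxSequenceNumber;
        this.level = level;
    }
}

public class LevelsInvariantSketch {

    // Distributes the restored files into a sorted level-0 set plus one list
    // per higher level, then checks that nothing was lost along the way --
    // the same shape of invariant reported in the stack trace above.
    static void rebuild(List<FileMeta> inputFiles, int numLevels) {
        // Level 0 is kept ordered; entries that compare as equal silently collapse.
        TreeSet<FileMeta> level0 =
                new TreeSet<>(
                        Comparator.comparingLong((FileMeta f) -> f.maxSequenceNumber)
                                .reversed()
                                .thenComparing((FileMeta f) -> f.fileName));
        List<List<FileMeta>> higherLevels = new ArrayList<>();
        for (int i = 1; i < numLevels; i++) {
            higherLevels.add(new ArrayList<>());
        }
        for (FileMeta file : inputFiles) {
            if (file.level == 0) {
                level0.add(file);
            } else {
                higherLevels.get(file.level - 1).add(file);
            }
        }

        int stored = level0.size() + higherLevels.stream().mapToInt(List::size).sum();
        if (stored != inputFiles.size()) {
            throw new IllegalStateException(
                    "Number of files stored in Levels does not equal to the size of"
                            + " inputFiles. This is unexpected.");
        }
    }

    public static void main(String[] args) {
        // The same level-0 file restored twice collapses to one entry in the
        // TreeSet, so stored == 1 while inputFiles.size() == 2.
        List<FileMeta> files = new ArrayList<>();
        files.add(new FileMeta("data-0.orc", 42L, 0));
        files.add(new FileMeta("data-0.orc", 42L, 0));
        try {
            rebuild(files, 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // matches the message in the report
        }
    }
}
```

If that reading is correct, the state restored here presumably contained duplicate or otherwise conflicting data-file entries for the same bucket.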
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!