[Bug] CDC MySQL data to Paimon using PaimonAction. Exception: AvroRuntimeException: Unrecognized codec: snappy
Search before asking
- [x] I searched in the issues and found nothing similar.
Paimon version
Paimon (on OSS): 1.0.1
Compute Engine
Flink: 1.20.1
Minimal reproduce step
The CDC binlog stream was interrupted abnormally during the night. After restarting the task the next morning (as a new job, not restored from checkpoints/savepoints, because the binlog file had been lost), data could no longer be written and this exception was thrown repeatedly: org.apache.paimon.shade.org.apache.avro.AvroRuntimeException: Unrecognized codec: snappy
What doesn't meet your expectations?
2025-04-03 11:31:23
java.io.IOException: org.apache.paimon.shade.org.apache.avro.AvroRuntimeException: Unrecognized codec: snappy
at org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator.processElement(CdcRecordStoreMultiWriteOperator.java:171)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.paimon.shade.org.apache.avro.AvroRuntimeException: Unrecognized codec: snappy
at org.apache.paimon.shade.org.apache.avro.file.CodecFactory.fromString(CodecFactory.java:159)
at org.apache.paimon.shade.org.apache.avro.file.DataFileStream.resolveCodec(DataFileStream.java:158)
at org.apache.paimon.shade.org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:144)
at org.apache.paimon.shade.org.apache.avro.file.DataFileReader.
Anything else?
While this CDC job is synchronizing data, other jobs are also writing data; this may be the cause of the exception.
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Please check in the Flink web UI dashboard whether snappy-java-x.jar exists; maybe this is a clue.
It did not help. I checked the web UI and there is no snappy-java-xxx.jar file. My other jobs run normally without this file. I tried putting snappy-java-xxx.jar into the /opt/flink/lib directory, but the same exception was still thrown when the task was restarted.
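Another place the extra jar can be put onto the job's classpath is Flink's pipeline.classpaths option. This is only an untested sketch (the snappy-java file name, version, and path are placeholders), not something verified against this setup:

flinkConfiguration:
  # Semicolon-separated list of jar URLs appended to the job's user classpath;
  # the file must exist at this path on every JobManager/TaskManager pod.
  pipeline.classpaths: "file:///opt/flink/lib/snappy-java-1.1.10.5.jar"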
That is weird; I can find the jar in the dashboard with Paimon 0.9. Could you please paste the detailed SQL to help with checking?
My full configuration:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: fa-paimon-action-platform
spec:
  image: dockerhub.aaaaaaaaaaaaaaaaaaa.com/local/flink:1.20.1-java17
  flinkVersion: v1_20
  imagePullPolicy: IfNotPresent
  ingress:
    template: "/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    fs.oss.endpoint: "oss-cn-hangzhou.aliyuncs.com"
    fs.oss.accessKeyId: "-----------------------"
    fs.oss.accessKeySecret: "-------------------------"
    # State backend configuration
    state.backend: rocksdb
    execution.checkpointing.dir: "oss://flink-cluster-k8s/checkpoints/"
    execution.checkpointing.savepoint-dir: "oss://flink-cluster-k8s/savepoints/"
    # High availability configuration
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: "oss://flink-cluster-k8s/ha/"
    # RocksDB tuning (performance for large CDC tables)
    state.backend.rocksdb.memory.managed: "true"
    state.backend.rocksdb.memory.fixed-per-slot: "256mb"
    state.backend.rocksdb.compaction.level.use-dynamic-size: "true"
    state.backend.rocksdb.block.cache-size: "256mb"
    state.backend.rocksdb.thread.num: "4"
    state.backend.rocksdb.writebuffer.size: "64mb"
    state.backend.rocksdb.writebuffer.count: "4"
    state.backend.rocksdb.compaction.style: "LEVEL"
    state.backend.local-recovery: "true"
    # Checkpoint configuration
    execution.checkpointing.interval: "30000" # checkpoint interval
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.timeout: "120000" # checkpoint timeout; the checkpoint fails if it is exceeded
    execution.checkpointing.min-pause: "10000" # minimum pause between checkpoints
    execution.checkpointing.max-concurrent-checkpoints: "1" # number of concurrent checkpoints; effectively 1 when min-pause is set
    execution.checkpointing.tolerable-failed-checkpoints: "10" # tolerated number of failed checkpoints
    # history-server
    jobmanager.archive.fs.dir: "oss://-----------/completed-jobs/"
    # Restart policy on failure
    kubernetes.operator.job.restart-failed: "true" # allow automatic restart after failure
    kubernetes.operator.job.upgrade.max-attempts: "3" # maximum number of restart attempts
    # Performance tuning
    rest.flamegraph.enabled: "true" # flame graph
    taskmanager.numberOfTaskSlots: "1"
    pekko.ask.timeout: 600s # Flink component communication timeout (default 60s)
    heartbeat.timeout: "600000" # increase heartbeat timeout (default 50000 ms)
    jobmanager.scheduler.taskmanager-timeout: "300000" # interval at which the JobManager checks heartbeats
    # Performance tuning
    # taskmanager.memory.task.heap.size: 2g # TaskManager JVM heap memory
    # taskmanager.memory.flink.size: 4g
    # Performance tuning - I/O
    # taskmanager.memory.managed.size: "0m"
    taskmanager.memory.managed.fraction: "0.2" # recommended to increase when state is used (default 0.4)
    # Performance tuning - network
    # taskmanager.memory.network.fraction: "0.1" # fraction of memory used for the network stack
    taskmanager.memory.network.min: "128m" # minimum network memory
    # taskmanager.network.request-backoff.max: "20000" # increase request backoff for bulk data transfers
    # taskmanager.network.memory.buffer-debloat.enabled: "true" # enable dynamic buffer adjustment
    # taskmanager.network.memory.max-buffer-size: "32mb"
    # taskmanager.network.netty.server.numThreads: "4"
    # taskmanager.network.netty.client.numThreads: "4"
    # CDC tuning for large tables (mini-batch)
    table.exec.mini-batch.enabled: "true" # enable mini-batch processing
    table.exec.mini-batch.allow-latency: "5s" # allowed latency
    table.exec.mini-batch.size: "5000" # maximum batch size
    env.java.opts.all: "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED"
  serviceAccount: flink
  job:
    autoRecovery: disabled # disabled: no automatic recovery; restartFailed: automatically restart failed jobs
  jobManager:
    replicas: 1
    resource:
      cpu: 0.5
      memory: "1024m"
  taskManager:
    replicas: 1
    resource:
      cpu: 0.8
      memory: 4g
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
        - name: openvpn-client
          image: dockerhub.zanxiangwl.com/openvpn-client:0.0.1
          securityContext:
            privileged: true
            capabilities:
              add: ["NET_ADMIN"]
          volumeMounts:
            - name: openvpn-config
              mountPath: /etc/openvpn/config
      volumes:
        - name: openvpn-config
          configMap:
            name: openvpn-config
      shareProcessNamespace: true
  job:
    jarURI: local:///opt/flink/lib/paimon-flink-action-1.0.1.jar
    entryClass: org.apache.paimon.flink.action.FlinkActions
    # initial | latest-offset | timestamp
    args:
      - mysql_sync_database # only tables with a primary key are synchronized
      - --warehouse
      - "oss://--------------/paimon"
      - --database
      - "ods_platform"
      - --mode
      - combined # a single combined sink is started for all tables; new tables are synchronized automatically
      - --type_mapping
      - to-nullable,char-to-string,bigint-unsigned-to-bigint
      - --mysql_conf
      - hostname=-------------------------
      - --mysql_conf
      - username=-----
      - --mysql_conf
      - password=-----
      - --mysql_conf
      - database-name=platform-sync
      - --mysql_conf
      - server-time-zone=Asia/Shanghai
      - --mysql_conf
      - scan.startup.mode=latest-offset
      - --mysql_conf
      - server-id=6010-6019
      - --catalog_conf
      - metastore=filesystem
      - --catalog_conf
      - warehouse=oss://----------/paimon
      - --catalog_conf
      - fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com
      - --catalog_conf
      - fs.oss.accessKeyId=--------------------------
      - --catalog_conf
      - fs.oss.accessKeySecret=----------------------------
      - --table_conf
      - bucket=4
      - --table_conf
      - changelog-producer=input
      - --table_conf
      - sink.parallelism=4
    parallelism: 1
    # stateless: stateless mode; on upgrade the job is simply cancelled and the new job starts from scratch (default)
    # none: ????????? (I don't understand this one)
    # savepoint: a savepoint is triggered automatically before the upgrade; once it completes the job is cancelled and then restored from the savepoint
    # last-state: no savepoint is triggered before the upgrade; the new job tries to recover from the last available checkpoint or savepoint (faster than savepoint mode, but the state may be less complete)
    upgradeMode: last-state
The jar list shown in the web UI:
This is a CDC job that synchronizes MySQL data using paimon-action.
Hi, could you please confirm whether this error is consistently reproduced on every restart?
If it's an intermittent occurrence, we've encountered similar issues in our production environment before. Our root-cause analysis at that time identified insufficient local disk space on the TaskManager as the underlying issue, and we traced it through the following class in the TaskManager logs:
org.xerial.snappy.SnappyLoader#loadNativeLibrary is responsible for dynamically loading Snappy's native compression library at runtime. I recommend checking whether the directory specified by "org.xerial.snappy.tempdir" is prone to repeated disk-space exhaustion.
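If disk space does turn out to be the cause, the directory snappy-java uses when extracting its native library can be redirected via the org.xerial.snappy.tempdir system property. Below is a minimal, untested sketch of passing it through the TaskManager JVM options of the deployment above; the directory is a placeholder and must exist, be writable, and have enough free space in every TaskManager pod:

flinkConfiguration:
  # Placeholder directory; snappy-java will extract its native library here instead of java.io.tmpdir.
  env.java.opts.taskmanager: "-Dorg.xerial.snappy.tempdir=/opt/flink/tmp"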