[Feature] Flink CDC Paimon ingestion recovery issue
### Search before asking
- [x] I searched in the issues and found nothing similar.
### Motivation
I ran into an issue while loading a large MySQL database into Paimon using a single Flink Paimon CDC ingestion job. If the job hits an exception triggered by one of the tables in the database, the whole Flink job stops, which also interrupts ingestion for all the other tables. When the exception forces me to restart the job without an initial savepoint, is there any configuration to avoid re-running the initial load for every configured table, and instead only re-load the table that triggered the exception?
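As a possible workaround (not from this thread, just a sketch): the same `paimon-flink-action` jar also ships a `mysql_sync_table` action, so one option may be to re-load only the failed table in a separate, one-off job while the main `mysql_sync_database` job continues with the other tables. A hypothetical invocation, with every `<placeholder>` standing in for a real value from your environment:

```shell
# Hypothetical one-off job that re-syncs only the table that failed,
# using Paimon's mysql_sync_table action (same jar as mysql_sync_database).
# All <placeholders> are assumptions and must be replaced with real values.
./bin/flink run \
    /opt/flink/usrlib/paimon-flink-action-0.8.2.jar \
    mysql_sync_table \
    --warehouse "<warehouse-path>" \
    --database "<paimon-db>" \
    --table "<failed-table>" \
    --mysql_conf hostname="<hostname>" \
    --mysql_conf port="<port>" \
    --mysql_conf username="<name>" \
    --mysql_conf password="<password>" \
    --mysql_conf database-name="<mysql-db>" \
    --mysql_conf table-name="<failed-table>" \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input
```

Note that a second CDC job reading the same MySQL instance would need its own `server-id` range, and this sketch does not answer the original question of making a single job skip the already-loaded tables.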
### Solution
No Solution
### Anything else?
Using Paimon 0.9 and Flink 1.20.
### Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Hi @kwokhh, thanks for your issue! Could you provide more information about this problem, e.g. the Flink code, logs, environment, and so on?
Hi @yangjf2019, the Flink deployment is described below:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: <name>
spec:
  image: <name>
  mode: native
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.memory.jvm-metaspace.size: 512 mb
    jobmanager.memory.jvm-metaspace.size: 512 mb
    metrics.job.status.enable: "STATE"
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    taskmanager.numberOfTaskSlots: "4"
    taskmanager.memory.managed.fraction: "0.1"
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    state.savepoints.dir: <dir>
    state.checkpoints.dir: <dir>
    high-availability.storageDir: <dir>
    kubernetes.hadoop.conf.config-map.name: <name>
    kubernetes.operator.periodic.savepoint.interval: 30 min
    kubernetes.operator.savepoint.history.max.age: 14 d
    kubernetes.operator.savepoint.history.max.count: "5"
    kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age: 30 min
    execution.checkpointing.unaligned.enabled: "false"
    execution.checkpointing.tolerable-failed-checkpoints: "100"
    execution.checkpointing.interval: 15 min
    execution.checkpointing.min-pause: 2 min
    execution.checkpointing.timeout: 10 min
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: "2147483647"
    restart-strategy.fixed-delay.delay: 120 s
    state.checkpoints.num-retained: "10"
    pipeline.object-reuse: "true"
    akka.ask.timeout: 10m
    client.timeout: 15 min
    akka.framesize: 41943040b
    table.local-time-zone: <region>
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      imagePullSecrets:
        - name: <name>
      containers:
        - name: flink-main-container
          ports:
            - name: metrics
              containerPort: 9249
              protocol: TCP
      serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "32G"
      cpu: 4
  taskManager:
    replicas: 2
    resource:
      memory: "32G"
      cpu: 4
  restartNonce: 0
  job:
    jarURI: local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar
    entryClass: "org.apache.paimon.flink.action.FlinkActions"
    args: [
      "mysql_sync_database",
      "--warehouse", "<link>",
      "--database", "<db>",
      "--mysql_conf", "hostname=<hostname>",
      "--mysql_conf", "port=<port>",
      "--mysql_conf", "username=<name>",
      "--mysql_conf", "password=<password>",
      "--mysql_conf", "database-name=<name>",
      "--mysql_conf", "server-id=<id>",
      "--mysql_conf", "server-time-zone=<zone>",
      "--including_tables", "<tables>",
      "--catalog_conf", "metastore=filesystem",
      "--catalog_conf", "case-sensitive=false",
      "--table_conf", "bucket=4",
      "--table_conf", "changelog-producer=input"
    ]
    # parallelism: 2
    upgradeMode: last-state
    allowNonRestoredState: true
    state: running
    savepointTriggerNonce: 1
  ingress:
    template: "<link>"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  logConfiguration:
    log4j-console.properties: |+
      # This affects logging for both user code and Flink
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender
      logger.akka.name = <name>
      logger.akka.level = INFO
      logger.kafka.name = org.apache.kafka
      logger.kafka.level = INFO
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %replace{%m}{(password['|\"]*\\s*[=|:]\\s*['|\"]*)[^'\",\r\n\\s]+('|\"|,|\r\n|\r|\n|\\s|$)}{$1****$2}%n
      # Log all infos in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %replace{%m}{(password['|\"]*\\s*[=|:]\\s*['|\"]*)[^'\",\r\n\\s]+('|\"|,|\r\n|\r|\n|\\s|$)}{$1****$2}%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10
      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF
```
Thanks for the information! From the YAML you provided, I see that the Flink and Paimon versions in the deployment are not the ones you said are running with errors (you mentioned Paimon 0.9 and Flink 1.20):
```yaml
flinkVersion: v1_17
jarURI: local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar
```
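If the intent really is to run Paimon 0.9 on Flink 1.20, the deployment would presumably need to be updated to match. A hedged sketch of the two changed fields (assuming the operator version in use supports `v1_20`; the 0.9.0 jar filename is illustrative, not confirmed from this thread):

```yaml
flinkVersion: v1_20
jarURI: local:///opt/flink/usrlib/paimon-flink-action-0.9.0.jar
```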