
[Feature] Flink CDC Paimon ingestion recovery issue

Open kwokhh opened this issue 11 months ago • 3 comments

Search before asking

  • [x] I searched in the issues and found nothing similar.

Motivation

I ran into a problem when loading a large database from MySQL into Paimon using a single Flink Paimon CDC ingestion job. If the job hits an exception triggered by one of the tables in the database, the whole Flink job stops, which interrupts ingestion for all the other tables as well. If the exception can only be resolved by restarting the job without the initial savepoint, is there any configuration that avoids repeating the initial load for every table configured in the job, and instead reloads only the table that triggered the exception?

Solution

No Solution

Anything else?

Using Paimon 0.9 and Flink 1.20.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

kwokhh avatar Jan 21 '25 03:01 kwokhh

Hi @kwokhh, thanks for raising this issue! Could you provide more information about the problem, for example the Flink code, logs, and environment details?

yangjf2019 avatar Jan 22 '25 02:01 yangjf2019

Hi @yangjf2019 the flink deployment is described below:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: <name>
spec:
  image: <name>
  
  mode: native
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.memory.jvm-metaspace.size: 512 mb
    jobmanager.memory.jvm-metaspace.size: 512 mb
    metrics.job.status.enable: "STATE"
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    taskmanager.numberOfTaskSlots: "4"
    taskmanager.memory.managed.fraction: "0.1"
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory


    state.savepoints.dir: <dir>
    state.checkpoints.dir: <dir>
    high-availability.storageDir: <dir>


    kubernetes.hadoop.conf.config-map.name: <name>


    kubernetes.operator.periodic.savepoint.interval: 30 min
    kubernetes.operator.savepoint.history.max.age: 14 d
    kubernetes.operator.savepoint.history.max.count: "5"
    kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age: 30 min


    execution.checkpointing.unaligned.enabled: "false"
    execution.checkpointing.tolerable-failed-checkpoints: "100"
    execution.checkpointing.interval: 15 min
    execution.checkpointing.min-pause: 2 min
    execution.checkpointing.timeout: 10 min
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: "2147483647"
    restart-strategy.fixed-delay.delay: 120 s


    state.checkpoints.num-retained: "10"


    pipeline.object-reuse: "true"


    akka.ask.timeout: 10m


    client.timeout: 15 min
    akka.framesize: 41943040b


    table.local-time-zone: <region>


  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      imagePullSecrets:
        - name: <name>
      containers:
        - name: flink-main-container
          ports:
            - name: metrics
              containerPort: 9249
              protocol: TCP
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "32G"
      cpu: 4
  taskManager:
    replicas: 2
    resource:
      memory: "32G"
      cpu: 4
  restartNonce: 0
  job:
    jarURI: local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar
    entryClass: "org.apache.paimon.flink.action.FlinkActions"


#
    args: [
        "mysql_sync_database",
        "--warehouse", "<link>",
        "--database" , "<db>",
        "--mysql_conf", "hostname=<hostname>",
        "--mysql_conf", "port=<port>",
        "--mysql_conf", "username=<name>",
        "--mysql_conf", "password=<password>",
        "--mysql_conf", "database-name=<name>",
        "--mysql_conf", "server-id=<id>",
        "--mysql_conf", "server-time-zone=<zone>",
        "--including_tables", "<tables>",
        "--catalog_conf", "metastore=filesystem",
        "--catalog_conf", "case-sensitive=false",
        "--table_conf", "bucket=4",
        "--table_conf", "changelog-producer=input"
    ]
#      parallelism: 2
    upgradeMode: last-state
    allowNonRestoredState: true
    state: running 
    savepointTriggerNonce: 1


  ingress:
    template: "<link>"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  logConfiguration:
    log4j-console.properties: |+
      # This affects logging for both user code and Flink
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender






      logger.akka.name = <name>
      logger.akka.level = INFO
      logger.kafka.name= org.apache.kafka
      logger.kafka.level = INFO
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO


      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %replace{%m}{(password['|\"]*\\s*[=|:]\\s*['|\"]*)[^'\",\r\n\\s]+('|\"|,|\r\n|\r|\n|\\s|$)}{$1****$2}%n


      # Log all infos in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %replace{%m}{(password['|\"]*\\s*[=|:]\\s*['|\"]*)[^'\",\r\n\\s]+('|\"|,|\r\n|\r|\n|\\s|$)}{$1****$2}%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size=100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10




      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF

kwokhh avatar Feb 11 '25 08:02 kwokhh

Thanks for the information! From the YAML you provided, the Flink and Paimon versions do not match the ones you reported in the issue (Paimon 0.9 and Flink 1.20):

 flinkVersion: v1_17

 jarURI: local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar
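
The mismatch can also be checked mechanically. The following is a minimal Python sketch, not part of the original thread. It assumes the `flinkVersion` field uses the `v<major>_<minor>` form and that the Paimon version is embedded in the action jar's file name, as in the manifest above. The helper names `flink_minor` and `paimon_minor` are hypothetical.

```python
import re


def flink_minor(flink_version_field: str) -> str:
    """Convert a 'v1_17' style value into '1.17'."""
    m = re.fullmatch(r"v(\d+)_(\d+)", flink_version_field.strip())
    if m is None:
        raise ValueError(f"unexpected flinkVersion: {flink_version_field!r}")
    return f"{m.group(1)}.{m.group(2)}"


def paimon_minor(jar_uri: str) -> str:
    """Extract 'major.minor' from a paimon-flink-action jar file name."""
    m = re.search(r"paimon-flink-action-(\d+)\.(\d+)", jar_uri)
    if m is None:
        raise ValueError(f"no Paimon version found in: {jar_uri!r}")
    return f"{m.group(1)}.{m.group(2)}"


# Values copied from the manifest posted in this thread.
manifest = {
    "flinkVersion": "v1_17",
    "jarURI": "local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar",
}

# Versions stated in the issue description.
reported = {"flink": "1.20", "paimon": "0.9"}

deployed = {
    "flink": flink_minor(manifest["flinkVersion"]),
    "paimon": paimon_minor(manifest["jarURI"]),
}

# Map each component to (reported, deployed) when the two differ.
mismatches = {
    name: (reported[name], deployed[name])
    for name in reported
    if reported[name] != deployed[name]
}

for name, (said, found) in mismatches.items():
    print(f"{name}: reported {said}, manifest has {found}")
```

With the values from this thread, both components are flagged, so the manifest may not describe the environment where the error actually occurred.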

yangjf2019 avatar Feb 12 '25 02:02 yangjf2019