[Bug] [StarRocks] [cdc master branch] Unsupported CDC data type BYTES/VARBINARY

Open everhopingandwaiting opened this issue 1 year ago • 8 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Flink version

Flink 1.18

Flink CDC version

3.0.1 master

Database and its version

mysql 5.7.44 StarRocks 3.2.2

Minimal reproduce step

Synchronize the entire database using a regular expression for the table list (tables: db..*). The job fails with:

2024-01-29 16:32:18
java.lang.UnsupportedOperationException: Unsupported CDC data type BYTES
	at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils$CdcDataTypeTransformer.defaultMethod(StarRocksUtils.java:368)
	at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils$CdcDataTypeTransformer.defaultMethod(StarRocksUtils.java:236)
	at com.ververica.cdc.common.types.DataTypeDefaultVisitor.visit(DataTypeDefaultVisitor.java:49)
	at com.ververica.cdc.common.types.VarBinaryType.accept(VarBinaryType.java:86)
	at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType(StarRocksUtils.java:112)
	at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksTable(StarRocksUtils.java:86)
	at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applyCreateTable(StarRocksMetadataApplier.java:87)
	at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:70)
	at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:82)
	at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:149)
	at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:123)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:204)
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:121)
	at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1062)
	at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:604)
	at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
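
The top frames of the trace (VarBinaryType.accept, DataTypeDefaultVisitor.visit, CdcDataTypeTransformer.defaultMethod) suggest a visitor that sends every type without an explicit mapping to a default method, which throws. The following is only a minimal sketch of that pattern, not the connector's code: every class here is a hypothetical stand-in, and mapping the binary type to a StarRocks VARBINARY column is an assumption about what a fix could look like.

```java
public class VisitorFallbackSketch {

    // Hypothetical stand-ins for the CDC type classes; NOT the real com.ververica.cdc API.
    interface DataType {
        <R> R accept(Visitor<R> visitor);
    }

    static final class IntType implements DataType {
        @Override
        public <R> R accept(Visitor<R> visitor) {
            return visitor.visit(this);
        }
    }

    static final class VarBinaryType implements DataType {
        @Override
        public <R> R accept(Visitor<R> visitor) {
            return visitor.visit(this);
        }
    }

    interface Visitor<R> {
        R visit(IntType type);

        R visit(VarBinaryType type);
    }

    // Routes every type to defaultMethod unless a subclass overrides visit(...).
    abstract static class DefaultVisitor<R> implements Visitor<R> {
        @Override
        public R visit(IntType type) {
            return defaultMethod(type);
        }

        @Override
        public R visit(VarBinaryType type) {
            return defaultMethod(type);
        }

        protected abstract R defaultMethod(DataType type);
    }

    // Maps only IntType; anything else falls through to defaultMethod and throws.
    static class SinkTypeTransformer extends DefaultVisitor<String> {
        @Override
        public String visit(IntType type) {
            return "INT";
        }

        @Override
        protected String defaultMethod(DataType type) {
            throw new UnsupportedOperationException(
                    "Unsupported CDC data type " + type.getClass().getSimpleName());
        }
    }

    // Assumed fix: add an explicit mapping for the binary type.
    static class PatchedTransformer extends SinkTypeTransformer {
        @Override
        public String visit(VarBinaryType type) {
            return "VARBINARY";
        }
    }

    static String toSinkType(DataType type, Visitor<String> visitor) {
        return type.accept(visitor);
    }

    public static void main(String[] args) {
        System.out.println(toSinkType(new VarBinaryType(), new PatchedTransformer()));
        try {
            toSinkType(new VarBinaryType(), new SinkTypeTransformer());
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this reading, a single table with a binary column is enough to stop the pipeline, because the exception is raised while the sink table is being created.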

What did you expect to see?

The data synchronization process proceeds smoothly.

What did you see instead?

The data synchronization process has been interrupted and cannot continue.

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

everhopingandwaiting avatar Jan 29 '24 09:01 everhopingandwaiting

Also: java.lang.UnsupportedOperationException: Unsupported CDC data type VARBINARY

everhopingandwaiting avatar Jan 29 '24 10:01 everhopingandwaiting

Can you offer your create-table sql?

joyCurry30 avatar Jan 31 '24 05:01 joyCurry30

These are all the error messages. I compiled the latest Flink CDC 3.0.x branch and used a YAML script to synchronize the entire database. The table filter is tables: db.[^qrtz]..* The problem may be caused by columns of type longblob or blob in other tables.
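
A side note on the filter quoted above: in a plain Java regular expression, [^qrtz] is a character class that rejects a single leading character (q, r, t or z), not the prefix "qrtz". The sketch below evaluates only the table-name part with java.util.regex and does not model how the connector itself treats the dots in the tables option. The table names other than qrtz_triggers are made up for illustration, and whether the connector's filter accepts a negative lookahead is an assumption that would need checking.

```java
import java.util.regex.Pattern;

public class TableFilterRegexSketch {

    // Full-match check of a table name against a regex, as plain java.util.regex.
    static boolean matches(String regex, String tableName) {
        return Pattern.matches(regex, tableName);
    }

    public static void main(String[] args) {
        // Table-name part of the filter from the comment above.
        String charClass = "[^qrtz]..*";
        // Assumed alternative: exclude only names that start with "qrtz_".
        String lookahead = "(?!qrtz_).*";

        // Only qrtz_triggers comes from the issue; the other names are hypothetical.
        String[] tables = {"qrtz_triggers", "orders", "tasks", "zones"};
        for (String table : tables) {
            System.out.printf(
                    "%-14s charClass=%-5b lookahead=%b%n",
                    table, matches(charClass, table), matches(lookahead, table));
        }
    }
}
```

With the character class, a hypothetical table such as tasks or zones would be skipped along with the Quartz tables, while the lookahead form skips only the qrtz_ prefix.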

everhopingandwaiting avatar Jan 31 '24 06:01 everhopingandwaiting

Could you please share the MySQL and StarRocks 'CREATE TABLE' DDL statements, as well as the associated YAML configuration files? I would like to replicate the issue in my local environment

joyCurry30 avatar Jan 31 '24 07:01 joyCurry30

Could you please share the MySQL and StarRocks 'CREATE TABLE' DDL statements, as well as the associated YAML configuration files? I would like to replicate the issue in my local environment

I will provide the table structures of the database and the contents of the YAML configuration file later.

everhopingandwaiting avatar Jan 31 '24 07:01 everhopingandwaiting

Hi, could you please share the DDL statements and YAML configuration files?

joyCurry30 avatar Feb 02 '24 02:02 joyCurry30

Hi, could you please share the DDL statements and YAML configuration files?

cdc script config:

source:
  type: mysql
  hostname: 1x
  port: 3306
  username: xxx
  password: xxx
  server-id: 45354-49440
  #tables: db.t\.*,db.sys\.*,db.code\.*,
  tables: db.\.*
  server-time-zone: Asia/Shanghai
  ignore.noprimarykey.table: true
  enable.column.comments: true
  #scan.startup.mode: specific-offset
  scan.startup.specific-offset.file: master-bin.000087
  scan.startup.specific-offset.pos: 1
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://xxx:9030
  load-url: xxx:8030
  username: xx
  password: xxx
  table.create.properties.replication_num: 1
  table.create.properties.light_schema_change: true
  table.create.properties.fast_schema_evolution: true
  sink.connect.timeout-ms: 3000009
pipeline:
  name: Sync MySQL Database to StarRocks 20 jyao enable.column.comments  ignore.noprimarykey.table  ignore yjxx qrtz  logbin 87 fixed2908
  parallelism: 2
  enable-schema-evolution: true
route:
- source-table: db.[\S]*
  sink-table: db_.s<>
  replace-symbol: s<>
- source-table: db-test.[\S]*
  sink-table: db_.s<>
  replace-symbol: s<>

create table if not exists qrtz_calendars
(
    SCHED_NAME    varchar(120) not null,
    CALENDAR_NAME varchar(190) not null,
    CALENDAR      blob         not null,
    primary key (SCHED_NAME, CALENDAR_NAME)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_fired_triggers
(
    SCHED_NAME        varchar(120) not null,
    ENTRY_ID          varchar(95)  not null,
    TRIGGER_NAME      varchar(190) not null,
    TRIGGER_GROUP     varchar(190) not null,
    INSTANCE_NAME     varchar(190) not null,
    FIRED_TIME        bigint(13)   not null,
    SCHED_TIME        bigint(13)   not null,
    PRIORITY          int          not null,
    STATE             varchar(16)  not null,
    JOB_NAME          varchar(190) null,
    JOB_GROUP         varchar(190) null,
    IS_NONCONCURRENT  varchar(1)   null,
    REQUESTS_RECOVERY varchar(1)   null,
    primary key (SCHED_NAME, ENTRY_ID)
)
    charset = latin1
    row_format = COMPACT;

create index IDX_QRTZ_FT_INST_JOB_REQ_RCVRY
    on qrtz_fired_triggers (SCHED_NAME, INSTANCE_NAME, REQUESTS_RECOVERY);

create index IDX_QRTZ_FT_JG
    on qrtz_fired_triggers (SCHED_NAME, JOB_GROUP);

create index IDX_QRTZ_FT_J_G
    on qrtz_fired_triggers (SCHED_NAME, JOB_NAME, JOB_GROUP);

create index IDX_QRTZ_FT_TG
    on qrtz_fired_triggers (SCHED_NAME, TRIGGER_GROUP);

create index IDX_QRTZ_FT_TRIG_INST_NAME
    on qrtz_fired_triggers (SCHED_NAME, INSTANCE_NAME);

create index IDX_QRTZ_FT_T_G
    on qrtz_fired_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP);

create table if not exists qrtz_job_details
(
    SCHED_NAME        varchar(120) not null,
    JOB_NAME          varchar(190) not null,
    JOB_GROUP         varchar(190) not null,
    DESCRIPTION       varchar(250) null,
    JOB_CLASS_NAME    varchar(250) not null,
    IS_DURABLE        varchar(1)   not null,
    IS_NONCONCURRENT  varchar(1)   not null,
    IS_UPDATE_DATA    varchar(1)   not null,
    REQUESTS_RECOVERY varchar(1)   not null,
    JOB_DATA          blob         null,
    primary key (SCHED_NAME, JOB_NAME, JOB_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create index IDX_QRTZ_J_GRP
    on qrtz_job_details (SCHED_NAME, JOB_GROUP);

create index IDX_QRTZ_J_REQ_RECOVERY
    on qrtz_job_details (SCHED_NAME, REQUESTS_RECOVERY);

create table if not exists qrtz_locks
(
    SCHED_NAME varchar(120) not null,
    LOCK_NAME  varchar(40)  not null,
    primary key (SCHED_NAME, LOCK_NAME)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_paused_trigger_grps
(
    SCHED_NAME    varchar(120) not null,
    TRIGGER_GROUP varchar(190) not null,
    primary key (SCHED_NAME, TRIGGER_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_scheduler_state
(
    SCHED_NAME        varchar(120) not null,
    INSTANCE_NAME     varchar(190) not null,
    LAST_CHECKIN_TIME bigint(13)   not null,
    CHECKIN_INTERVAL  bigint(13)   not null,
    primary key (SCHED_NAME, INSTANCE_NAME)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_triggers
(
    SCHED_NAME     varchar(120) not null,
    TRIGGER_NAME   varchar(190) not null,
    TRIGGER_GROUP  varchar(190) not null,
    JOB_NAME       varchar(190) not null,
    JOB_GROUP      varchar(190) not null,
    DESCRIPTION    varchar(250) null,
    NEXT_FIRE_TIME bigint(13)   null,
    PREV_FIRE_TIME bigint(13)   null,
    PRIORITY       int          null,
    TRIGGER_STATE  varchar(16)  not null,
    TRIGGER_TYPE   varchar(8)   not null,
    START_TIME     bigint(13)   not null,
    END_TIME       bigint(13)   null,
    CALENDAR_NAME  varchar(190) null,
    MISFIRE_INSTR  smallint(2)  null,
    JOB_DATA       blob         null,
    primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    constraint qrtz_triggers_ibfk_1
        foreign key (SCHED_NAME, JOB_NAME, JOB_GROUP) references qrtz_job_details (SCHED_NAME, JOB_NAME, JOB_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_blob_triggers
(
    SCHED_NAME    varchar(120) not null,
    TRIGGER_NAME  varchar(190) not null,
    TRIGGER_GROUP varchar(190) not null,
    BLOB_DATA     blob         null,
    primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    constraint qrtz_blob_triggers_ibfk_1
        foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create index SCHED_NAME
    on qrtz_blob_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP);

create table if not exists qrtz_cron_triggers
(
    SCHED_NAME      varchar(120) not null,
    TRIGGER_NAME    varchar(190) not null,
    TRIGGER_GROUP   varchar(190) not null,
    CRON_EXPRESSION varchar(120) not null,
    TIME_ZONE_ID    varchar(80)  null,
    primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    constraint qrtz_cron_triggers_ibfk_1
        foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_simple_triggers
(
    SCHED_NAME      varchar(120) not null,
    TRIGGER_NAME    varchar(190) not null,
    TRIGGER_GROUP   varchar(190) not null,
    REPEAT_COUNT    bigint(7)    not null,
    REPEAT_INTERVAL bigint(12)   not null,
    TIMES_TRIGGERED bigint(10)   not null,
    primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    constraint qrtz_simple_triggers_ibfk_1
        foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create table if not exists qrtz_simprop_triggers
(
    SCHED_NAME    varchar(120)   not null,
    TRIGGER_NAME  varchar(190)   not null,
    TRIGGER_GROUP varchar(190)   not null,
    STR_PROP_1    varchar(512)   null,
    STR_PROP_2    varchar(512)   null,
    STR_PROP_3    varchar(512)   null,
    INT_PROP_1    int            null,
    INT_PROP_2    int            null,
    LONG_PROP_1   bigint         null,
    LONG_PROP_2   bigint         null,
    DEC_PROP_1    decimal(13, 4) null,
    DEC_PROP_2    decimal(13, 4) null,
    BOOL_PROP_1   varchar(1)     null,
    BOOL_PROP_2   varchar(1)     null,
    primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    constraint qrtz_simprop_triggers_ibfk_1
        foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
    charset = latin1
    row_format = COMPACT;

create index IDX_QRTZ_T_C
    on qrtz_triggers (SCHED_NAME, CALENDAR_NAME);

create index IDX_QRTZ_T_G
    on qrtz_triggers (SCHED_NAME, TRIGGER_GROUP);

create index IDX_QRTZ_T_J
    on qrtz_triggers (SCHED_NAME, JOB_NAME, JOB_GROUP);

create index IDX_QRTZ_T_JG
    on qrtz_triggers (SCHED_NAME, JOB_GROUP);

create index IDX_QRTZ_T_NEXT_FIRE_TIME
    on qrtz_triggers (SCHED_NAME, NEXT_FIRE_TIME);

create index IDX_QRTZ_T_NFT_MISFIRE
    on qrtz_triggers (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME);

create index IDX_QRTZ_T_NFT_ST
    on qrtz_triggers (SCHED_NAME, TRIGGER_STATE, NEXT_FIRE_TIME);

create index IDX_QRTZ_T_NFT_ST_MISFIRE
    on qrtz_triggers (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_STATE);

create index IDX_QRTZ_T_NFT_ST_MISFIRE_GRP
    on qrtz_triggers (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_GROUP, TRIGGER_STATE);

create index IDX_QRTZ_T_N_G_STATE
    on qrtz_triggers (SCHED_NAME, TRIGGER_GROUP, TRIGGER_STATE);

create index IDX_QRTZ_T_N_STATE
    on qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, TRIGGER_STATE);

create index IDX_QRTZ_T_STATE
    on qrtz_triggers (SCHED_NAME, TRIGGER_STATE);




everhopingandwaiting avatar Feb 02 '24 02:02 everhopingandwaiting

Hi, could you please share the DDL statements and YAML configuration files?

Code branch: https://github.com/everhopingandwaiting/flink-cdc-connectors/tree/dev_3.x

everhopingandwaiting avatar Feb 02 '24 02:02 everhopingandwaiting

Closing this issue as it has been migrated to Apache Jira.

PatrickRen avatar Apr 09 '24 05:04 PatrickRen