[Bug] [StarRocks] [cdc master branch] Unsupported CDC data type BYTES/VARBINARY
Search before asking
- [X] I searched in the issues and found nothing similar.
Flink version
Flink 1.18
Flink CDC version
3.0.1 master
Database and its version
MySQL 5.7.44, StarRocks 3.2.2
Minimal reproduce step
Synchronize the entire database using a regular-expression table filter (tables: db.\.*). The job then fails with the following exception:
2024-01-29 16:32:18
java.lang.UnsupportedOperationException: Unsupported CDC data type BYTES
at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils$CdcDataTypeTransformer.defaultMethod(StarRocksUtils.java:368)
at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils$CdcDataTypeTransformer.defaultMethod(StarRocksUtils.java:236)
at com.ververica.cdc.common.types.DataTypeDefaultVisitor.visit(DataTypeDefaultVisitor.java:49)
at com.ververica.cdc.common.types.VarBinaryType.accept(VarBinaryType.java:86)
at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType(StarRocksUtils.java:112)
at com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksTable(StarRocksUtils.java:86)
at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applyCreateTable(StarRocksMetadataApplier.java:87)
at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:70)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:82)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:149)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:123)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:204)
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:121)
at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1062)
at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:604)
at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
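The trace shows the failure path. VarBinaryType.accept dispatches into DataTypeDefaultVisitor.visit, which falls back to CdcDataTypeTransformer.defaultMethod, and that method throws. The sketch below is a simplified, self-contained model of that visitor pattern and of one possible fix: overriding the visit method for VarBinaryType. All classes are hypothetical stand-ins that only borrow names from the trace. They are not the real Flink CDC types. Rendering an unbounded VARBINARY as "BYTES" is an assumption made to mirror the two reported messages. Mapping to StarRocks VARBINARY also assumes the target version supports that type (the reporter runs 3.2.2).

```java
// Hypothetical, simplified model of the visitor dispatch seen in the stack trace.
// Class names mirror the trace, but these are NOT the real Flink CDC classes.
public class VisitorFallbackSketch {

    interface DataType {
        <R> R accept(DataTypeVisitor<R> visitor);
    }

    interface DataTypeVisitor<R> {
        R visit(VarCharType type);
        R visit(VarBinaryType type);
    }

    static final class VarCharType implements DataType {
        final int length;
        VarCharType(int length) { this.length = length; }
        @Override public <R> R accept(DataTypeVisitor<R> visitor) { return visitor.visit(this); }
        @Override public String toString() { return "VARCHAR(" + length + ")"; }
    }

    static final class VarBinaryType implements DataType {
        static final int MAX_LENGTH = Integer.MAX_VALUE;
        final int length;
        VarBinaryType(int length) { this.length = length; }
        @Override public <R> R accept(DataTypeVisitor<R> visitor) { return visitor.visit(this); }
        // Assumption: an unbounded VARBINARY is printed as BYTES, like in the reported message.
        @Override public String toString() {
            return length == MAX_LENGTH ? "BYTES" : "VARBINARY(" + length + ")";
        }
    }

    // Every visit(...) falls through to defaultMethod unless a subclass overrides it.
    abstract static class DefaultVisitor<R> implements DataTypeVisitor<R> {
        @Override public R visit(VarCharType type) { return defaultMethod(type); }
        @Override public R visit(VarBinaryType type) { return defaultMethod(type); }
        protected abstract R defaultMethod(DataType type);
    }

    // Reproduces the failure: VARCHAR is mapped, binary types reach defaultMethod and throw.
    static class Transformer extends DefaultVisitor<String> {
        @Override public String visit(VarCharType type) { return "VARCHAR(" + type.length + ")"; }
        @Override protected String defaultMethod(DataType type) {
            throw new UnsupportedOperationException("Unsupported CDC data type " + type);
        }
    }

    // Possible fix (hypothetical): also handle VarBinaryType and emit a StarRocks binary type.
    static class PatchedTransformer extends Transformer {
        @Override public String visit(VarBinaryType type) {
            return type.length == VarBinaryType.MAX_LENGTH
                    ? "VARBINARY" // assumption: StarRocks applies its default maximum length
                    : "VARBINARY(" + type.length + ")";
        }
    }

    // Runs the transformer and turns the exception into a readable string.
    static String describe(DataTypeVisitor<String> transformer, DataType type) {
        try {
            return type.accept(transformer);
        } catch (UnsupportedOperationException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        DataType blob = new VarBinaryType(VarBinaryType.MAX_LENGTH);
        DataType sized = new VarBinaryType(16);
        System.out.println(describe(new Transformer(), blob));         // error: Unsupported CDC data type BYTES
        System.out.println(describe(new Transformer(), sized));        // error: Unsupported CDC data type VARBINARY(16)
        System.out.println(describe(new PatchedTransformer(), blob));  // VARBINARY
        System.out.println(describe(new PatchedTransformer(), sized)); // VARBINARY(16)
    }
}
```

The visitor-with-default design means every type the transformer does not override fails loudly at table-creation time. A fix therefore needs an explicit override for each binary type the source can emit.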
What did you expect to see?
I expected the data synchronization to proceed without errors.
What did you see instead?
The data synchronization process has been interrupted and cannot continue.
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Also: java.lang.UnsupportedOperationException: Unsupported CDC data type VARBINARY
Can you share your CREATE TABLE SQL?
These are all the error messages I have. I built the latest Flink CDC 3.0.x branch from source and used a YAML pipeline definition to synchronize the entire database (table filter: tables: db.[^qrtz]..*). The problem may be caused by columns of type BLOB or LONGBLOB in other tables.
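The BLOB hypothesis can be checked against the type mapping. The sketch assumes the mapping from the MySQL pipeline connector's documented type table. Under that mapping, TINYBLOB/BLOB/MEDIUMBLOB/LONGBLOB become the unbounded BYTES type, while BINARY(n) and VARBINARY(n) keep their length. That would account for both reported messages: BYTES comes from BLOB columns and VARBINARY from sized binary columns. toCdcType is a hypothetical helper, not a connector API, and it only covers these few types.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: the CDC type a MySQL binary column is ASSUMED to map to.
public class MySqlBinaryMappingSketch {

    // Sized binary types such as binary(16) or varbinary(255).
    private static final Pattern SIZED = Pattern.compile("(binary|varbinary)\\((\\d+)\\)");

    static String toCdcType(String mysqlType) {
        String t = mysqlType.trim().toLowerCase(Locale.ROOT);
        switch (t) {
            case "tinyblob":
            case "blob":
            case "mediumblob":
            case "longblob":
                return "BYTES"; // assumed: unbounded VARBINARY, reported as BYTES
            default:
                break;
        }
        Matcher m = SIZED.matcher(t);
        if (m.matches()) {
            // Assumed: the declared length is preserved, e.g. VARBINARY(255).
            return m.group(1).toUpperCase(Locale.ROOT) + "(" + m.group(2) + ")";
        }
        return "OTHER"; // not a binary type; outside the scope of this sketch
    }

    public static void main(String[] args) {
        String[] columns = {"blob", "LONGBLOB", "varbinary(255)", "binary(16)", "varchar(190)"};
        for (String col : columns) {
            System.out.println(col + " -> " + toCdcType(col));
        }
    }
}
```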
Could you please share the MySQL and StarRocks 'CREATE TABLE' DDL statements, as well as the associated YAML configuration files? I would like to reproduce the issue in my local environment.
I will share the database table structures and the YAML configuration file later.
Hi, could you please share the DDL statements and YAML configuration files?
CDC pipeline YAML configuration:
source:
  type: mysql
  hostname: 1x
  port: 3306
  username: xxx
  password: xxx
  server-id: 45354-49440
  #tables: db.t\.*,db.sys\.*,db.code\.*,
  tables: db.\.*
  server-time-zone: Asia/Shanghai
  ignore.noprimarykey.table: true
  enable.column.comments: true
  #scan.startup.mode: specific-offset
  scan.startup.specific-offset.file: master-bin.000087
  scan.startup.specific-offset.pos: 1
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://xxx:9030
  load-url: xxx:8030
  username: xx
  password: xxx
  table.create.properties.replication_num: 1
  table.create.properties.light_schema_change: true
  table.create.properties.fast_schema_evolution: true
  sink.connect.timeout-ms: 3000009
pipeline:
  name: Sync MySQL Database to StarRocks 20 jyao enable.column.comments ignore.noprimarykey.table ignore yjxx qrtz logbin 87 fixed2908
  parallelism: 2
  enable-schema-evolution: true
route:
  - source-table: db.[\S]*
    sink-table: db_.s<>
    replace-symbol: s<>
  - source-table: db-test.[\S]*
    sink-table: db_.s<>
    replace-symbol: s<>
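The Flink CDC MySQL connector documentation describes how the tables option is parsed. A dot separates the database pattern from the table pattern, and a dot meant as a regex wildcard must be escaped as "\.". The sketch re-implements that rule in a hypothetical matches helper. It is not the connector's code, and it assumes the first unescaped dot is the separator. It shows that tables: db.\.* selects every table in db, including the qrtz_* tables that carry BLOB columns. It also shows a regex detail relevant to filters built around [^qrtz]: that character class rejects any single leading q, r, t or z. So a pattern of that shape skips every table whose name starts with one of those letters, not only the qrtz_ prefix.

```java
import java.util.regex.Pattern;

// Hypothetical re-implementation of the documented `tables` rule (NOT Flink CDC's actual code):
// the first unescaped '.' separates database and table patterns, and "\." means "any character".
public class TablesPatternSketch {

    static boolean matches(String tablesOption, String database, String table) {
        int sep = -1;
        for (int i = 0; i < tablesOption.length(); i++) {
            char c = tablesOption.charAt(i);
            if (c == '\\') {
                i++; // skip the escaped character, it cannot be the separator
            } else if (c == '.') {
                sep = i;
                break;
            }
        }
        if (sep < 0) {
            throw new IllegalArgumentException("expected <db>.<table>: " + tablesOption);
        }
        String dbRegex = unescapeDots(tablesOption.substring(0, sep));
        String tableRegex = unescapeDots(tablesOption.substring(sep + 1));
        return Pattern.matches(dbRegex, database) && Pattern.matches(tableRegex, table);
    }

    // "\." in the option becomes the regex wildcard ".".
    private static String unescapeDots(String s) {
        return s.replace("\\.", ".");
    }

    public static void main(String[] args) {
        System.out.println(matches("db.\\.*", "db", "qrtz_calendars"));         // true
        System.out.println(matches("db.[^qrtz]\\.*", "db", "qrtz_calendars")); // false
        System.out.println(matches("db.[^qrtz]\\.*", "db", "users"));          // true
        System.out.println(matches("db.[^qrtz]\\.*", "db", "task_log"));       // false: 't' is in the class
    }
}
```

If the goal is to exclude only the qrtz_ tables, a character class is the wrong tool. A negative lookahead such as (?!qrtz_) is the usual java.util.regex idiom, though whether the connector accepts it in this option is untested here.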
MySQL table DDL:
create table if not exists qrtz_calendars
(
SCHED_NAME varchar(120) not null,
CALENDAR_NAME varchar(190) not null,
CALENDAR blob not null,
primary key (SCHED_NAME, CALENDAR_NAME)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_fired_triggers
(
SCHED_NAME varchar(120) not null,
ENTRY_ID varchar(95) not null,
TRIGGER_NAME varchar(190) not null,
TRIGGER_GROUP varchar(190) not null,
INSTANCE_NAME varchar(190) not null,
FIRED_TIME bigint(13) not null,
SCHED_TIME bigint(13) not null,
PRIORITY int not null,
STATE varchar(16) not null,
JOB_NAME varchar(190) null,
JOB_GROUP varchar(190) null,
IS_NONCONCURRENT varchar(1) null,
REQUESTS_RECOVERY varchar(1) null,
primary key (SCHED_NAME, ENTRY_ID)
)
charset = latin1
row_format = COMPACT;
create index IDX_QRTZ_FT_INST_JOB_REQ_RCVRY
on qrtz_fired_triggers (SCHED_NAME, INSTANCE_NAME, REQUESTS_RECOVERY);
create index IDX_QRTZ_FT_JG
on qrtz_fired_triggers (SCHED_NAME, JOB_GROUP);
create index IDX_QRTZ_FT_J_G
on qrtz_fired_triggers (SCHED_NAME, JOB_NAME, JOB_GROUP);
create index IDX_QRTZ_FT_TG
on qrtz_fired_triggers (SCHED_NAME, TRIGGER_GROUP);
create index IDX_QRTZ_FT_TRIG_INST_NAME
on qrtz_fired_triggers (SCHED_NAME, INSTANCE_NAME);
create index IDX_QRTZ_FT_T_G
on qrtz_fired_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP);
create table if not exists qrtz_job_details
(
SCHED_NAME varchar(120) not null,
JOB_NAME varchar(190) not null,
JOB_GROUP varchar(190) not null,
DESCRIPTION varchar(250) null,
JOB_CLASS_NAME varchar(250) not null,
IS_DURABLE varchar(1) not null,
IS_NONCONCURRENT varchar(1) not null,
IS_UPDATE_DATA varchar(1) not null,
REQUESTS_RECOVERY varchar(1) not null,
JOB_DATA blob null,
primary key (SCHED_NAME, JOB_NAME, JOB_GROUP)
)
charset = latin1
row_format = COMPACT;
create index IDX_QRTZ_J_GRP
on qrtz_job_details (SCHED_NAME, JOB_GROUP);
create index IDX_QRTZ_J_REQ_RECOVERY
on qrtz_job_details (SCHED_NAME, REQUESTS_RECOVERY);
create table if not exists qrtz_locks
(
SCHED_NAME varchar(120) not null,
LOCK_NAME varchar(40) not null,
primary key (SCHED_NAME, LOCK_NAME)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_paused_trigger_grps
(
SCHED_NAME varchar(120) not null,
TRIGGER_GROUP varchar(190) not null,
primary key (SCHED_NAME, TRIGGER_GROUP)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_scheduler_state
(
SCHED_NAME varchar(120) not null,
INSTANCE_NAME varchar(190) not null,
LAST_CHECKIN_TIME bigint(13) not null,
CHECKIN_INTERVAL bigint(13) not null,
primary key (SCHED_NAME, INSTANCE_NAME)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_triggers
(
SCHED_NAME varchar(120) not null,
TRIGGER_NAME varchar(190) not null,
TRIGGER_GROUP varchar(190) not null,
JOB_NAME varchar(190) not null,
JOB_GROUP varchar(190) not null,
DESCRIPTION varchar(250) null,
NEXT_FIRE_TIME bigint(13) null,
PREV_FIRE_TIME bigint(13) null,
PRIORITY int null,
TRIGGER_STATE varchar(16) not null,
TRIGGER_TYPE varchar(8) not null,
START_TIME bigint(13) not null,
END_TIME bigint(13) null,
CALENDAR_NAME varchar(190) null,
MISFIRE_INSTR smallint(2) null,
JOB_DATA blob null,
primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
constraint qrtz_triggers_ibfk_1
foreign key (SCHED_NAME, JOB_NAME, JOB_GROUP) references qrtz_job_details (SCHED_NAME, JOB_NAME, JOB_GROUP)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_blob_triggers
(
SCHED_NAME varchar(120) not null,
TRIGGER_NAME varchar(190) not null,
TRIGGER_GROUP varchar(190) not null,
BLOB_DATA blob null,
primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
constraint qrtz_blob_triggers_ibfk_1
foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
charset = latin1
row_format = COMPACT;
create index SCHED_NAME
on qrtz_blob_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP);
create table if not exists qrtz_cron_triggers
(
SCHED_NAME varchar(120) not null,
TRIGGER_NAME varchar(190) not null,
TRIGGER_GROUP varchar(190) not null,
CRON_EXPRESSION varchar(120) not null,
TIME_ZONE_ID varchar(80) null,
primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
constraint qrtz_cron_triggers_ibfk_1
foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_simple_triggers
(
SCHED_NAME varchar(120) not null,
TRIGGER_NAME varchar(190) not null,
TRIGGER_GROUP varchar(190) not null,
REPEAT_COUNT bigint(7) not null,
REPEAT_INTERVAL bigint(12) not null,
TIMES_TRIGGERED bigint(10) not null,
primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
constraint qrtz_simple_triggers_ibfk_1
foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
charset = latin1
row_format = COMPACT;
create table if not exists qrtz_simprop_triggers
(
SCHED_NAME varchar(120) not null,
TRIGGER_NAME varchar(190) not null,
TRIGGER_GROUP varchar(190) not null,
STR_PROP_1 varchar(512) null,
STR_PROP_2 varchar(512) null,
STR_PROP_3 varchar(512) null,
INT_PROP_1 int null,
INT_PROP_2 int null,
LONG_PROP_1 bigint null,
LONG_PROP_2 bigint null,
DEC_PROP_1 decimal(13, 4) null,
DEC_PROP_2 decimal(13, 4) null,
BOOL_PROP_1 varchar(1) null,
BOOL_PROP_2 varchar(1) null,
primary key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
constraint qrtz_simprop_triggers_ibfk_1
foreign key (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) references qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
)
charset = latin1
row_format = COMPACT;
create index IDX_QRTZ_T_C
on qrtz_triggers (SCHED_NAME, CALENDAR_NAME);
create index IDX_QRTZ_T_G
on qrtz_triggers (SCHED_NAME, TRIGGER_GROUP);
create index IDX_QRTZ_T_J
on qrtz_triggers (SCHED_NAME, JOB_NAME, JOB_GROUP);
create index IDX_QRTZ_T_JG
on qrtz_triggers (SCHED_NAME, JOB_GROUP);
create index IDX_QRTZ_T_NEXT_FIRE_TIME
on qrtz_triggers (SCHED_NAME, NEXT_FIRE_TIME);
create index IDX_QRTZ_T_NFT_MISFIRE
on qrtz_triggers (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME);
create index IDX_QRTZ_T_NFT_ST
on qrtz_triggers (SCHED_NAME, TRIGGER_STATE, NEXT_FIRE_TIME);
create index IDX_QRTZ_T_NFT_ST_MISFIRE
on qrtz_triggers (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_STATE);
create index IDX_QRTZ_T_NFT_ST_MISFIRE_GRP
on qrtz_triggers (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_GROUP, TRIGGER_STATE);
create index IDX_QRTZ_T_N_G_STATE
on qrtz_triggers (SCHED_NAME, TRIGGER_GROUP, TRIGGER_STATE);
create index IDX_QRTZ_T_N_STATE
on qrtz_triggers (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, TRIGGER_STATE);
create index IDX_QRTZ_T_STATE
on qrtz_triggers (SCHED_NAME, TRIGGER_STATE);
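To see which of the tables above would reach the unsupported-type path, one option is to scan the DDL for BLOB-family columns. The sketch is a rough, hypothetical line-based regex scan, not a SQL parser, and it assumes the one-column-per-line layout used in this DDL. On the excerpt in main it flags qrtz_calendars.CALENDAR. Applied to the full DDL, it should also flag JOB_DATA in qrtz_job_details and qrtz_triggers, and BLOB_DATA in qrtz_blob_triggers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical line-based scan; assumes one column definition per line, as in the DDL above.
public class BlobColumnScanSketch {

    private static final Pattern CREATE = Pattern.compile("(?i)create table if not exists (\\w+)");
    // A column line whose type is blob, tinyblob, mediumblob or longblob.
    private static final Pattern COLUMN = Pattern.compile("(?i)^\\s*(\\w+)\\s+(tiny|medium|long)?blob\\b");

    static List<String> findBlobColumns(String[] ddlLines) {
        List<String> hits = new ArrayList<>();
        String table = null; // table whose CREATE statement we are currently inside
        for (String line : ddlLines) {
            Matcher create = CREATE.matcher(line);
            if (create.find()) {
                table = create.group(1);
                continue;
            }
            Matcher col = COLUMN.matcher(line);
            if (table != null && col.find()) {
                hits.add(table + "." + col.group(1));
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String[] ddl = {
            "create table if not exists qrtz_calendars",
            "(",
            "SCHED_NAME varchar(120) not null,",
            "CALENDAR_NAME varchar(190) not null,",
            "CALENDAR blob not null,",
            "primary key (SCHED_NAME, CALENDAR_NAME)",
            ")",
            "create table if not exists qrtz_locks",
            "(",
            "SCHED_NAME varchar(120) not null,",
            "LOCK_NAME varchar(40) not null,",
            ")",
        };
        System.out.println(findBlobColumns(ddl)); // [qrtz_calendars.CALENDAR]
    }
}
```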
Code branch: https://github.com/everhopingandwaiting/flink-cdc-connectors/tree/dev_3.x
Closing this issue as it has been migrated to Apache Jira.