kafka-connect-bigquery
BigQuery Exception, cannot write data to BigQuery using JSON Converter
Hi,
I am trying to write data from Kafka to BigQuery using the Kafka Connect JSON converter, but I always get an error. How can I solve this issue?
Here is the configuration:
{
  "name": "sink_bq_MyTable",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "project": "my-bq-project-id",
    "keyfile": "/home/ubuntu/key.json",
    "datasets": ".*=my-bq-dataset",
    "autoCreateTables": "false",
    "autoUpdateSchemas": "false",
    "topics": "cdc.dbo.MyTable",
    "topicsToTables": "cdc.dbo.(.*)=$1"
  }
}
Here is the exception:
java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertObject(BigQueryRecordConverter.java:99)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertStruct(BigQueryRecordConverter.java:129)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:73)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Thanks in advance
What's the record of the object you're attempting to write? It looks as if the converter is attempting to convert a byte array but it's getting it in an unexpected format.
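For context, here is a minimal Java sketch of the failing cast, assuming the record's timestamp field arrives as a raw byte array (illustrative only, not the connector's actual code):

import java.nio.ByteBuffer;

// Illustrative sketch: reproduces the ClassCastException from the stack trace.
public class ByteCastSketch {
    public static void main(String[] args) {
        // JsonConverter can deserialize a BYTES field to a raw byte[] ...
        Object value = new byte[] {1, 2, 3};
        // ... so a direct cast fails:
        // ByteBuffer buf = (ByteBuffer) value; // ClassCastException: [B cannot be cast to java.nio.ByteBuffer
        // Accepting either representation avoids the crash:
        ByteBuffer safe = (value instanceof ByteBuffer)
                ? (ByteBuffer) value
                : ByteBuffer.wrap((byte[]) value);
        System.out.println(safe.remaining()); // prints 3
    }
}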
The first field is a timestamp that uses the bytes data type:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "bytes",
            "optional": false,
            "field": "timestamp"
          },
          {
            "type": "string",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "My Field"
          },
          ...
          ...
          ...
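For reference, since the bytes-typed timestamp field is what trips the cast, one hedged option is to drop it with Kafka's built-in ReplaceField SMT before conversion (the transform alias "dropTimestamp" is illustrative; newer Kafka versions rename "blacklist" to "exclude"):

"transforms": "unwrap,dropTimestamp",
"transforms.dropTimestamp.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropTimestamp.blacklist": "timestamp"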
Hi @mtagle,
I tried removing the timestamp field (whose data type is "bytes"), but now there is a new error message, shown below:
[2019-08-16 06:56:35,801] ERROR WorkerSinkTask{id=sink_bigquery_bankAccount-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:70)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here is the data structure:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "No_"
      },
      {
        "type": "string",
        "optional": false,
        "field": "Name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "TestTable.Value"
  },
  "payload": {
    "No_": "ABC123",
    "Name": "Here is the name of the record",
    "__deleted": "false"
  }
}
Could you help me solve this? Did anyone else get the same issue? Could you tell me how to make it work?
Thanks in advance.
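One hedged thing to check, not confirmed in this thread: the connector raises this exact error whenever the value converter hands it a schemaless record. With JsonConverter that happens if value.converter.schemas.enable is false or a message lacks the schema/payload envelope, so the converter side needs:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Every message must then carry the {"schema": ..., "payload": ...} envelope shown above.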
@bmd-benita Hi, could you write JSON data to BigQuery, or does this library only work with the Avro format?
Hi,
I have a similar error writing to BigQuery.
This is the message sent, which I can read from the console consumer:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094 --from-beginning --topic md --property print.key=true
Note that it also prints the Kafka key:
9485818a-e6c5-434d-9096-29c6e3f55148 {"schema": { "type": "struct", "fields": [ { "field": "rowkey", "type": "string", "optional": true}],"optional": false,"name": "BQ"}, "payload": {"rowkey": "9485818a-e6c5-434d-9096-29c6e3f55148"}}
The error thrown is:
[2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Top-level Kafka Connect schema must be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
This is the standalone properties file:
bootstrap.servers=rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000
And this is the sink properties file:
name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60000
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null
Thanks
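One hedged observation about the last two lines of that sink file, an assumption rather than a confirmed diagnosis: in a Java .properties file, kafkaKeyFieldName=null assigns the literal string "null" instead of unsetting the option, so the connector may try to convert the plain string key into a BigQuery struct column, which matches the "Top-level Kafka Connect schema must be of type 'struct'" error. Omitting the options entirely should disable key/data field capture:

#kafkaDataFieldName=null   # leave unset rather than assigning the string "null"
#kafkaKeyFieldName=null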
@michTalebzadeh Any luck with the solution?
Any update on this? I'm facing the same issue when using org.apache.kafka.connect.storage.StringConverter for key.converter. Tagging @amithapa @michTalebzadeh since you were the last to comment on a similar issue.
+1
+1
This is my current config. With it, everything works fine; it's just that the key is not persisted in BigQuery.
CREATE SINK CONNECTOR `gcpbq52` WITH(
"connector.class"='com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
"keyfile"='/root/src/confluent/auth.json',
"project"='dev-project-123',
"defaultDataset"='dev_biqquery_poc',
"topics"='topicNew01Dev',
"timestampPartitionFieldName"='created_at',
"bigQueryPartitionDecorator"='false',
"timePartitioningType"='DAY',
"sanitizeTopics"='false',
"transforms"='tableChange',
"transforms.tableChange.type"='org.apache.kafka.connect.transforms.RegexRouter',
"transforms.tableChange.regex"='.+',
"transforms.tableChange.replacement"='table_name_here',
"queueSize"='5',
"auto.create.tables"='true',
"autoCreateTables"='true',
"key.converter"='org.apache.kafka.connect.storage.StringConverter',
"mergeIntervalMs"='5000',
"bigQueryRetry"='2'
);
When I add the following config, the exception below is triggered:
"kafkaKeyFieldName"='request_id',
Currently, there is no schema present for the key; it is just a string.
Any help is appreciated here.
com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88)
at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.lambda$getRegularRow$2(SinkRecordConverter.java:135)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRegularRow(SinkRecordConverter.java:134)
at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:74)
at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:196)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:276)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
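A hedged sketch, under the assumption that kafkaKeyFieldName requires the record key schema to be a struct: Kafka's built-in HoistField SMT can wrap a plain string key in a single-field struct before the connector sees it (the alias "hoistKey" and field name "request_id" are illustrative):

"transforms"='tableChange,hoistKey',
"transforms.hoistKey.type"='org.apache.kafka.connect.transforms.HoistField$Key',
"transforms.hoistKey.field"='request_id',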
+1
@bmd-benita Hi, could you write JSON data to BigQuery, or does this library only work with the Avro format?
Yes, I use the io.confluent.connect.json.JsonSchemaConverter to write the data in JSON format from Kafka (CDC) to BigQuery. The connector is running in Kafka Connect v7.1.1 and it works properly now.
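For anyone landing here, a minimal sketch of that converter setup, assuming a Schema Registry reachable at the address below:

value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://schema-registry:8081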
@bmd-benitaclarissa May I ask whether you represent logical types in the JSON schema, and how? For instance, how do you represent a Timestamp, or a JSON type in BQ?