kafka-connect-bigquery

BigQuery Exception, cannot write data to BigQuery using JSON Converter

Open bmd-benitaclarissa opened this issue 5 years ago • 13 comments

Hi,

I am trying to write data from Kafka to BigQuery using the Kafka Connect JSON converter, but I always get an error. How can I solve this issue?

Here is the configuration:

{
    "name": "sink_bq_MyTable",
    "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schemas.enable": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "project": "my-bq-project-id",
        "keyfile": "/home/ubuntu/key.json",
        "datasets": ".*=my-bq-dataset",
        "autoCreateTables": "false",
        "autoUpdateSchemas": "false",
        "topics": "cdc.dbo.MyTable",
        "topicsToTables": "cdc.dbo.(.*)=$1"
    }
}

Here is the exception:

java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertObject(BigQueryRecordConverter.java:99)
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertStruct(BigQueryRecordConverter.java:129)
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:73)
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Thanks in advance

bmd-benitaclarissa avatar Aug 07 '19 01:08 bmd-benitaclarissa

What does the record you're attempting to write look like? It looks as if the converter is trying to convert a byte array but is receiving it in an unexpected format.
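
Based on the stack trace alone (an assumption, not verified against the connector source), the bytes-handling path seems to cast straight to java.nio.ByteBuffer, which holds for Avro-decoded records but not for the byte[] that JsonConverter produces when schemas.enable=true decodes a base64-encoded "bytes" field. A minimal Java sketch of a helper that would tolerate both shapes (hypothetical, not connector code):

import java.nio.ByteBuffer;

public class BytesFieldSketch {
    // Normalize either representation of a Connect "bytes" field to a ByteBuffer.
    static ByteBuffer toByteBuffer(Object fieldValue) {
        if (fieldValue instanceof byte[]) {
            // JsonConverter (schemas.enable=true) decodes base64 "bytes" into byte[] ([B)
            return ByteBuffer.wrap((byte[]) fieldValue);
        }
        // AvroConverter hands "bytes" fields over as java.nio.ByteBuffer already
        return (ByteBuffer) fieldValue;
    }
}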

mtagle avatar Aug 13 '19 18:08 mtagle

The first field is a timestamp that uses the bytes data type:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "bytes",
            "optional": false,
            "field": "timestamp"
          },
          {
            "type": "string",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "My Field"
          },
...
...
...
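
If the column isn't needed downstream, one workaround (a sketch, assuming the field is literally named timestamp; the dropTimestamp alias is illustrative) is to drop it with the stock ReplaceField SMT before the sink converts the record:

        "transforms": "unwrap,dropTimestamp",
        "transforms.dropTimestamp.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropTimestamp.blacklist": "timestamp",

(On Kafka 2.6 and newer the blacklist option is spelled exclude.)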

bmd-benitaclarissa avatar Aug 14 '19 01:08 bmd-benitaclarissa

Hi @mtagle,

I tried removing the timestamp field (whose data type is "bytes"), but now there is a new error message, shown below:

[2019-08-16 06:56:35,801] ERROR WorkerSinkTask{id=sink_bigquery_bankAccount-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:70)
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Here is the data structure:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "No_"
      },
      {
        "type": "string",
        "optional": false,
        "field": "Name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "TestTable.Value"
  },
  "payload": {
    "No_": "ABC123",
    "Name": "Here is the name of the record",
    "__deleted": "false"
  }
}

Could you help me solve this? Has anyone else run into the same issue? Could you tell me how to make it work?

Thanks in advance.
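
One thing that may be worth checking (an assumption; the raw topic contents aren't shown here): any record whose value schema is not a struct, such as the tombstones Debezium emits after deletes, would fail this exact check. The UnwrapFromEnvelope transform used in the original config is deprecated in favor of io.debezium.transforms.ExtractNewRecordState, which drops tombstones by default. A sketch of the swapped-in transform:

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",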

bmd-benitaclarissa avatar Aug 16 '19 07:08 bmd-benitaclarissa

Hi @bmd-benita, were you able to write JSON data to BigQuery, or does this library only work with the Avro format?

ricardodejuan avatar Oct 27 '19 15:10 ricardodejuan

Hi,

I have a similar error writing to BigQuery.

This is the message as sent, which I can read from the console consumer:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094 --from-beginning --topic md --property print.key=true

Note that it also prints the Kafka key.

9485818a-e6c5-434d-9096-29c6e3f55148    {"schema": { "type": "struct", "fields": [ { "field": "rowkey", "type": "string", "optional": true}],"optional": false,"name": "BQ"}, "payload": {"rowkey": "9485818a-e6c5-434d-9096-29c6e3f55148"}}

The error thrown is

[2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Top-level Kafka Connect schema must be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

This is the standalone properties file:

bootstrap.servers=rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
offset.flush.interval.ms=10000

and this is the sink properties file

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60000
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

Thanks

michTalebzadeh avatar Mar 17 '21 09:03 michTalebzadeh

@michTalebzadeh Any luck with the solution?

amithapa avatar Oct 06 '21 17:10 amithapa

Any update on this? I'm facing the same issue when using org.apache.kafka.connect.storage.StringConverter for key.converter. Tagging @amithapa @michTalebzadeh since you were the last to comment with a similar issue.

aakarshg avatar Apr 01 '22 19:04 aakarshg

+1

ShahNewazKhan avatar May 29 '22 06:05 ShahNewazKhan

+1

shivakumarss avatar Jul 19 '22 06:07 shivakumarss

This is my current config. With it, everything works fine, except that the key is not persisted in BigQuery.

CREATE SINK CONNECTOR `gcpbq52` WITH(
        "connector.class"='com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
        "keyfile"='/root/src/confluent/auth.json',
        "project"='dev-project-123',
        "defaultDataset"='dev_biqquery_poc',
        "topics"='topicNew01Dev',
        "timestampPartitionFieldName"='created_at',
        "bigQueryPartitionDecorator"='false',
        "timePartitioningType"='DAY',
        "sanitizeTopics"='false',
        "transforms"='tableChange',
        "transforms.tableChange.type"='org.apache.kafka.connect.transforms.RegexRouter',
        "transforms.tableChange.regex"='.+',
        "transforms.tableChange.replacement"='table_name_here',
        "queueSize"='5',
        "auto.create.tables"='true',
        "autoCreateTables"='true',
        "key.converter"='org.apache.kafka.connect.storage.StringConverter',
        "mergeIntervalMs"='5000',
        "bigQueryRetry"='2'
);

When I add the following config, the exception below is triggered:

"kafkaKeyFieldName"='request_id',

Currently there is no schema for the key; it is just a string.

Any help is appreciated here.

com.wepay.kafka.connect.bigquery.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type 'struct'
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88)
	at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:49)
	at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.lambda$getRegularRow$2(SinkRecordConverter.java:135)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRegularRow(SinkRecordConverter.java:134)
	at com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter.getRecordRow(SinkRecordConverter.java:74)
	at com.wepay.kafka.connect.bigquery.write.batch.TableWriter$Builder.addRow(TableWriter.java:196)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:276)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
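
Since the key is a plain string, its top-level schema is STRING rather than a struct, which is what kafkaKeyFieldName trips over. One possible workaround (a sketch using the stock HoistField SMT; the hoistKey alias is illustrative and untested against this connector) is to wrap the key into a single-field struct before the sink converts it:

        "transforms"='tableChange,hoistKey',
        "transforms.hoistKey.type"='org.apache.kafka.connect.transforms.HoistField$Key',
        "transforms.hoistKey.field"='request_id',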

shivakumarss avatar Jul 22 '22 10:07 shivakumarss

+1

Pangstar avatar Oct 19 '22 12:10 Pangstar

Hi @bmd-benita, were you able to write JSON data to BigQuery, or does this library only work with the Avro format?

Yes, I use the io.confluent.connect.json.JsonSchemaConverter to write the data in JSON format from Kafka (CDC) to BigQuery. The connector is running in Kafka Connect v7.1.1 and it works properly now.

bmd-benitaclarissa avatar Oct 21 '22 11:10 bmd-benitaclarissa

Yes, I use the io.confluent.connect.json.JsonSchemaConverter to write the data in JSON format from Kafka (CDC) to BigQuery. The connector is running in Kafka Connect v7.1.1 and it works properly now.

@bmd-benitaclarissa May I ask whether you represent logical types in the JSON schema, and how? For instance, how do you represent a Timestamp, or a JSON type in BQ?
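
With the plain org.apache.kafka.connect.json.JsonConverter envelopes shown earlier in this thread, I would expect a timestamp to be an int64 field tagged with Connect's logical-type name, something like the sketch below (the field name is illustrative; whether JsonSchemaConverter uses the same representation is not confirmed here):

{
  "type": "int64",
  "optional": false,
  "name": "org.apache.kafka.connect.data.Timestamp",
  "version": 1,
  "field": "created_at"
}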

slavab89 avatar Nov 09 '23 08:11 slavab89