kafka-connect-bigquery
Allow Avro type with no fields
We have one record field that can be of different types depending on what the message is about. All of them are mapped correctly to BigQuery except this one:
```json
{
  "type": "record",
  "name": "NothingChanged",
  "fields": []
}
```
which results in this error:
```
java.lang.IllegalArgumentException: The RECORD field must have at least one sub-field
    at com.google.cloud.bigquery.Field$Builder.setType(Field.java:149)
    at com.google.cloud.bigquery.Field.newBuilder(Field.java:285)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertStruct(BigQuerySchemaConverter.java:175)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertField(BigQuerySchemaConverter.java:127)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertStruct(BigQuerySchemaConverter.java:169)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertField(BigQuerySchemaConverter.java:127)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:109)
    at com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverter.convertSchema(BigQuerySchemaConverter.java:43)
    at com.wepay.kafka.connect.bigquery.SchemaManager.constructTableInfo(SchemaManager.java:68)
    at com.wepay.kafka.connect.bigquery.SchemaManager.createTable(SchemaManager.java:49)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.ensureExistingTables(BigQuerySinkConnector.java:117)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.ensureExistingTables(BigQuerySinkConnector.java:140)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.start(BigQuerySinkConnector.java:159)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
Could record types with zero fields simply be ignored while generating the schema for BigQuery?
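To illustrate the proposed behavior, here is a minimal sketch using stand-in types (the `FieldSchema` class and method names below are assumptions for illustration, not the connector's actual `BigQuerySchemaConverter` API): while converting a struct, any sub-field that is itself a struct with zero fields is skipped instead of being handed to BigQuery, which rejects zero-field RECORDs.

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyRecordFilter {

    // Stand-in for a struct schema: a name plus named sub-schemas
    // (fields == null means a non-struct leaf field).
    record FieldSchema(String name, List<FieldSchema> fields) {
        boolean isStruct() { return fields != null; }
        boolean isEmptyStruct() { return isStruct() && fields.isEmpty(); }
    }

    // Return the names of the fields that would be sent to BigQuery,
    // dropping any sub-struct that has no fields of its own.
    static List<String> convertStruct(FieldSchema struct) {
        List<String> converted = new ArrayList<>();
        for (FieldSchema field : struct.fields()) {
            if (field.isEmptyStruct()) {
                // e.g. the Avro record {"name": "NothingChanged", "fields": []}
                continue;
            }
            converted.add(field.name());
        }
        return converted;
    }

    public static void main(String[] args) {
        FieldSchema nothingChanged = new FieldSchema("NothingChanged", List.of());
        FieldSchema id = new FieldSchema("id", null);
        FieldSchema root = new FieldSchema("root", List.of(id, nothingChanged));
        System.out.println(convertStruct(root)); // prints [id]
    }
}
```

The guard would presumably live in the converter's struct-handling path, so a zero-field record never reaches `Field.newBuilder`, which is where the exception above is thrown.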
We have a similar situation with a struct that has no fields. Version 1.6.5 of the connector (with some extra commits) actually seems to ignore this field in BigQuery...
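Until that behavior is available in the version you run, one possible interim workaround (our assumption, not something the connector documents) is to give the otherwise-empty Avro record a single dummy field, so the generated BigQuery RECORD has the required sub-field:

```json
{
  "type": "record",
  "name": "NothingChanged",
  "fields": [
    {
      "name": "_placeholder",
      "type": "boolean",
      "default": true,
      "doc": "dummy field so the converted BigQuery RECORD is non-empty"
    }
  ]
}
```

This changes the wire schema, so it only fits if producers and consumers can tolerate the extra field.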