kafka-connect-bigquery

Timestamp Partition For Date Outside Range

Open archy-bold opened this issue 4 years ago • 0 comments

I've come across an issue with timestamp partitioning: if a record's timestamp is more than 365 days in the past or more than 183 days in the future (roughly 1 year and 6 months), BigQuery rejects the insert and the connector gets stuck retrying the same batches over and over.

Here's my configuration:

{
  "name": "bigquery-streams",
  "config": {
    "name": "bigquery-streams",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",

    "topics": "STREAMS_IDS",
    "sanitizeTopics": "true",

    "autoUpdateSchemas": false,
    "bigQueryPartitionDecorator": false,
    "timestampPartitionFieldName": "TIMESTAMP",

    "project": "xx",
    "datasets": ".*=xx",
    "keyfile": "/keyfile.json",
    "keySource": "FILE",

    "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
    "schemaRegistryLocation": "http://schema-registry:8081",

    "transforms": "ConvertTimestamp",
    "transforms.RemoveFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RemoveFields.blacklist": "ID",
    "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimestamp.timestamp.field": "TIMESTAMP",
    "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimestamp.field": "TIMESTAMP",
    "transforms.ConvertTimestamp.target.type": "Timestamp"
  }
}

Here's a sample of some of the errors:

connect | [2020-05-14 15:33:47,697] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: table insertion failed for the following rows:
connect | [row index 0]: invalid: Value 1624291503000000 for field timestamp of the destination table spatial-encoder-259717:wata_streams_test.STREAMS_IDS is outside the allowed bounds. You can only stream to date range within 365 days in the past and 183 days in the future relative to the current date.
connect | [row index 1]: stopped:
connect | [row index 2]: stopped:
connect | [row index 3]: stopped:
connect | [row index 4]: stopped:
connect | [row index 5]: stopped:
connect | [row index 6]: stopped:
connect | [row index 7]: stopped:
connect | [row index 8]: stopped:
connect | [row index 9]: stopped:
connect | [row index 10]: stopped:
connect | [row index 11]: stopped:
connect | [row index 12]: stopped:
connect | [row index 13]: stopped:
connect | [row index 14]: stopped:
connect | [row index 15]: stopped:
connect | [row index 16]: stopped:
connect | [row index 17]: stopped:
connect | [row index 18]: stopped:
connect | [row index 499]: stopped: (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor)
connect | Exception in thread "pool-37-thread-72" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
connect | [row index 0]: invalid: Value 1624291503000000 for field timestamp of the destination table spatial-encoder-259717:wata_streams_test.STREAMS_IDS is outside the allowed bounds. You can only stream to date range within 365 days in the past and 183 days in the future relative to the current date.
connect | [row index 1]: stopped:
connect | [row index 2]: stopped:
connect | [row index 3]: stopped:
connect | [row index 4]: stopped:
connect | [row index 5]: stopped:
connect | [row index 6]: stopped:
connect | [row index 7]: stopped:
connect | [row index 8]: stopped:
connect | [row index 9]: stopped:
connect | [row index 10]: stopped:
connect | [row index 11]: stopped:
connect | [row index 12]: stopped:
connect | [row index 499]: stopped:
connect | at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:131)
connect | at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:80)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:131)
connect | at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:80)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | [2020-05-14 15:33:46,031] ERROR WorkerSinkTask{id=bigquery-streams-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
connect | at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
connect | at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:127)
connect | at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 311 for partition STREAMS_IDS-2 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 221 for partition STREAMS_IDS-3 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 266 for partition STREAMS_IDS-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] INFO [Consumer clientId=connector-consumer-bigquery-streams-0, groupId=connect-bigquery-streams] Seeking to offset 316 for partition STREAMS_IDS-1 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect | [2020-05-14 15:33:46,032] ERROR WorkerSinkTask{id=bigquery-streams-0} Commit of offsets threw an unexpected exception for sequence number 8: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
connect | at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
connect | at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:127)
connect | at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
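
As a rough sanity check on the numbers in the error, here is a minimal Java sketch that decodes the rejected value and compares it with the 365-day / 183-day window quoted in the message. The class and method names (`StreamingBounds`, `fromEpochMicros`, `isWithinBounds`) are made up for illustration and are not part of the connector. The sketch also assumes the value is microseconds since the Unix epoch and that the log timestamp is UTC, and it compares exact instants, whereas BigQuery may apply the limit at day granularity.

```java
import java.time.Duration;
import java.time.Instant;

public class StreamingBounds {
    // Window quoted in the BigQuery error message (assumed to be applied as plain durations).
    static final Duration MAX_PAST = Duration.ofDays(365);
    static final Duration MAX_FUTURE = Duration.ofDays(183);

    // Assumption: the value in the error message is microseconds since the Unix epoch.
    public static Instant fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos);
    }

    // True when value lies in [now - 365 days, now + 183 days].
    public static boolean isWithinBounds(Instant value, Instant now) {
        return !value.isBefore(now.minus(MAX_PAST))
                && !value.isAfter(now.plus(MAX_FUTURE));
    }

    public static void main(String[] args) {
        // Timestamp of the first ERROR line in the log, assumed to be UTC.
        Instant now = Instant.parse("2020-05-14T15:33:47Z");
        Instant value = fromEpochMicros(1624291503000000L);
        System.out.println(value + " within bounds: " + isWithinBounds(value, now));
    }
}
```

Under those assumptions the value decodes to 2021-06-21T16:05:03Z, about 403 days after the log date, so it falls outside the 183-day future limit.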

archy-bold, May 14 '20 15:05