kafka-connect-bigquery
Resource usage limits via configuration - Tasks crashing due to high load
I'm trying to sink some pretty large topics from Kafka (5 topics with about 250 million events each) into BigQuery via a separate, rather large Kafka Connect cluster (3 nodes, each with 8 CPUs and 32 GB RAM). It starts up fine, but after about 2 minutes the Connect instances' CPUs are pegged at 100% and the nodes start disconnecting; ultimately the whole process restarts with little progress on getting any data into BigQuery.
I tried the same configuration in a replica of our environment with far fewer events (500,000) and it works fine.
Are there any configurations that can throttle the processing of events to keep the CPU in check? I tried tuning queueSize and threadPoolSize, as well as max.queue.size and max.batch.size, to no avail.
Any hint/help would be very much appreciated!
Here's our config for reference:
{
"name": "hd-sink-bq",
"tasks.max": "3",
"queueSize": 20000,
"threadPoolSize": 2,
"topics": "topic1,topic2,topic3,topic4,topic5",
"sanitizeTopics": "true",
"autoCreateTables": "true",
"timestampPartitionFieldName": "created_at",
"max.queue.size": "81290",
"max.batch.size": "20480",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<REGISTRY_URL>",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "<USER:PASS>",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<REGISTRY_URL>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "<USER:PASS>",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"defaultDataset": "data_lake",
"allowNewBigQueryFields": "true",
"bigQueryPartitionDecorator": "false",
"project": "<PROJECT>",
"keySource": "JSON",
"keyfile": "<JSON_STRINGIFIED_KEY>",
"timePartitioningType": "DAY",
"upsertEnabled": true,
"kafkaKeyFieldName": "_kid",
"transforms": "removeEventRequestData,removeResponseData",
"transforms.removeEventRequestData.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.removeEventRequestData.blacklist": "headers,body,path,query",
"transforms.removeEventRequestData.predicate": "isEventRequest",
"transforms.removeResponseData.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.removeResponseData.blacklist": "body",
"transforms.removeResponseData.predicate": "isAttemptResponse",
"predicates": "isEventRequest,isAttemptResponse",
"predicates.isEventRequest.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isEventRequest.pattern": "topic1",
"predicates.isAttemptResponse.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAttemptResponse.pattern": "topic2",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.log.include.messages": "true",
"errors.tolerance": "all",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.name": "connect.bq-sink.deadletter"
}
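
For completeness, one avenue I'm considering but haven't verified against this connector (so treat it as a sketch, with placeholder values): capping how much each task pulls per poll via per-connector consumer overrides. That requires the workers to permit overrides, i.e. in connect-distributed.properties:

connector.client.config.override.policy=All

and then something like this added to the connector config above:

"consumer.override.max.poll.records": "500",
"consumer.override.max.partition.fetch.bytes": "1048576",

As I understand it, max.poll.records bounds the number of records handed to each task per poll and max.partition.fetch.bytes bounds fetch sizes, which should at least smooth out the load spikes even if it doesn't address the underlying CPU cost.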