
Heap OOM issue when trying to sink a Debezium snapshot to GCS

Open · mkherlakian opened this issue 2 years ago · 2 comments

I'm running into an issue where the connector's JVM exits with an OOM error when sinking a topic that was freshly snapshotted from a PG database through Debezium. The setup:

  • 2 topics, topic A with 500k messages, topic B with 200
  • Avro format for key and value
  • Source connector produces messages with no issues; messages make it into Kafka
  • Per-message size in topic A is similar to topic B - total size of topic A is 214 KB, total size of topic B is 180 MB
  • "file.name.template" is set to "{{topic}}/{{partition}}-{{start_offset}}-{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}-{{timestamp:unit=HH}}.parquet.gz" (see the expansion sketch below)
  • output format is parquet

This is all running on Aiven.
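
As a point of reference, here is roughly how we understand that file.name.template to expand into a GCS object key. This is only a sketch of our reading of the template variables, not the connector's actual code; the topic, partition, offset, and timestamp values are made up, and we're assuming the {{timestamp:unit=...}} variables resolve from the record timestamps.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of how we read "file.name.template"; not the connector's actual code.
// All concrete values (topic, partition, offset, timestamp) are made up.
public class TemplateExpansionSketch {
    public static void main(String[] args) {
        String topic = "pg.public.B";   // {{topic}}
        int partition = 0;              // {{partition}}
        long startOffset = 123456L;     // {{start_offset}} of the group's first record
        Instant ts = Instant.parse("2022-08-02T17:05:00Z"); // assumed record timestamp

        DateTimeFormatter hourFmt =
                DateTimeFormatter.ofPattern("yyyyMMdd-HH").withZone(ZoneOffset.UTC);

        // {{topic}}/{{partition}}-{{start_offset}}-yyyyMMdd-HH.parquet.gz
        String objectKey = String.format("%s/%d-%d-%s.parquet.gz",
                topic, partition, startOffset, hourFmt.format(ts));

        System.out.println(objectKey); // pg.public.B/0-123456-20220802-17.parquet.gz
    }
}

So every record in the same topic-partition whose timestamp falls within the same hour should map to the same object key.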

Topic A successfully sinks into GCS. The parquet file gets uploaded and all the data that we expect is there. Topic B consistently runs OOM.

We've tried a variety of values for file.max.records, ranging from 50 to 1000, and for offset.flush.interval.ms, the lowest being 50 ms, but we still experience the OOMs.

We believe part of the issue comes from the fact that, since this starts with a PG snapshot, the timestamps are all within an hour of each other for the 1M records already in the topic. The connector's grouping logic would therefore consider the entire topic's content to be part of a single group - and if the GCS connector behaves the same way as the S3 one, we thought this could be an indication: https://help.aiven.io/en/articles/4775651-kafka-outofmemoryerror-exceptions. However, we would have expected file.max.records to compensate for this.
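
To make that suspicion concrete, below is a rough sketch of the grouping behaviour we have in mind. It is our own illustration, not the connector's implementation; the grouping key (topic / partition / hour of the record timestamp) is an assumption based on the filename template, and the snapshot data is made up.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough sketch of the grouping behaviour we suspect; NOT the connector's code.
// If records are buffered per filename-template key (topic, partition, hour of
// the timestamp, ...), a PG snapshot whose timestamps all fall within one hour
// maps to a single key, so one list ends up holding the whole topic in heap.
public class GroupingSketch {
    record SinkRecord(String topic, int partition, long offset, long timestampMs, byte[] value) {}

    public static void main(String[] args) {
        Map<String, List<SinkRecord>> buffers = new HashMap<>();

        for (SinkRecord r : snapshotRecords()) {
            // Hypothetical grouping key: topic / partition / hour of the record timestamp.
            String key = r.topic() + "/" + r.partition() + "/" + (r.timestampMs() / 3_600_000L);
            buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
        }

        // With snapshot timestamps all inside one hour there is exactly one entry,
        // and it holds every record -- which is where we'd expect file.max.records
        // to cap the buffer, but it doesn't seem to in our setup.
        buffers.forEach((key, records) ->
                System.out.println(key + " -> " + records.size() + " records buffered"));
    }

    // Made-up snapshot data: records whose timestamps are milliseconds apart.
    static List<SinkRecord> snapshotRecords() {
        List<SinkRecord> records = new ArrayList<>();
        long base = 1_659_459_600_000L; // 2022-08-02T17:00:00Z
        for (int i = 0; i < 100_000; i++) {
            records.add(new SinkRecord("pg.public.B", 0, i, base + i, new byte[256]));
        }
        return records;
    }
}

If the connector really does buffer per group like this, the whole snapshot lands in a single in-heap buffer, which is what we were hoping file.max.records would bound.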

Also, while upgrading plans is an option, we'd like to understand which knobs to turn to control memory utilization. The full, cleaned-up config is below:

{
  "name": "gcs",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://<karapace-service>.aivencloud.com:10034",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<user:pass>",

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://<karapace>.aivencloud.com:10034",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<user:pass>",

  "topics": "pg.public.A,pg.public.B",

  "gcs.credentials.json": "<GCP_CREDENTIALS>",
  "gcs.bucket.name": "data-lake",
  "file.name.template": "{{topic}}/{{partition}}-{{start_offset}}-{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}-{{timestamp:unit=HH}}.parquet.gz",
  
  "file.compression.type": "gzip",
  "file.max.records": "200",
  "format.output.type": "parquet",
  "format.output.fields": "key,offset,timestamp,value",

  "offset.flush.interval.ms": 50, //tried different values here, none seem to have an effect
}

Any insight into what might be happening?

mkherlakian · Aug 02 '22 17:08