
JsonFormat in S3 sink produces invalid JSON

Open · ostronom opened this issue 4 years ago · 6 comments

The documentation states that

Use format.class=io.confluent.connect.s3.format.json.JsonFormat to write the S3 object as a single JSON array containing a JSON object for each record.

But what I observe in the produced JSON files is just a newline-delimited list of objects.
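
For illustration (hypothetical records, not taken from my topic), I expected the S3 object to contain a single JSON array:

[{"id": 1}, {"id": 2}]

but each file actually contains one object per line:

{"id": 1}
{"id": 2}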

My connector configuration is:

{
  "name": "etl-load-to-s3",
  "topics": "testtopic",
  "tasks.max": "1",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://ext-kafka-schema-registry.plat-02.svc.cluster.local:8081",
  "flush.size": 64,
  "s3.part.size": 5242880,
  "rotate.schedule.interval.ms": "600000",
  "partition.duration.ms": "600000",
  "locale": "ru_RU",
  "timezone": "UTC",
  "timestamp.extractor": "Wallclock",
  "schema.compatibility": "NONE",
  "aws.secret.access.key": "...",
  "aws.access.key.id": "...",
  "s3.bucket.name": "...",
  "s3.region": "us-east-1",
  "store.url": "http://ext-minio:9000"
}

Am I doing something wrong, or does the documentation mean something different?

ostronom · May 25 '20 09:05

Same problem for me, with the same format.class value. Would be happy to have some help 😄

fbecar22 · Aug 05 '20 15:08

+1

Lucas3oo · Apr 28 '22 14:04

Similar issue here, except what I'm observing is that a Kafka value serialized as JSON:

{"foo": "bar"}

ends up in S3 as a JSON string, rather than a JSON document:

"{\"foo\":\"bar\"}"

charmon79 · Apr 29 '22 16:04

Any update on this issue?

RenGeng · May 20 '22 14:05

Had a different config, but this ended up working:

    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"

Source: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
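
With this setup, a hypothetical message whose value bytes are

{"foo": "bar"}

should be written to the S3 object as the JSON object itself, one record per line:

{"foo":"bar"}

whereas converting the value as a plain string (for example with StringConverter) will likely produce the escaped form reported above:

"{\"foo\":\"bar\"}"

Note that with "value.converter.schemas.enable": "true" (the default), JsonConverter expects every message to carry a {"schema": ..., "payload": ...} envelope, so setting it to "false" is what lets plain JSON values pass through unchanged.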

yoliho · Jul 02 '22 07:07

This is the expected behavior:

JSON: Use format.class=io.confluent.connect.s3.format.json.JsonFormat to write the S3 object as a file containing one JSON serialized record per line. The connector’s s3.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.

See: https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-formats

So you must use a JSON processor that treats each newline-delimited line as a record (e.g. jq), or split the file on \n yourself.
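
For example (a sketch of how to consume the output, assuming a downloaded object saved as the hypothetical file part-000.json): jq -s '.' part-000.json slurps the newline-delimited records into a single array. The equivalent in Python:

    import json

    # Each line of the downloaded S3 object is one JSON-serialized record.
    with open("part-000.json") as f:
        records = [json.loads(line) for line in f if line.strip()]

    print(len(records), "records")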

GeoffWilliams · Aug 15 '23 01:08