kafka-connect-storage-cloud
JsonFormat in S3 sink produces invalid JSON
The documentation states that
Use format.class=io.confluent.connect.s3.format.json.JsonFormat to write the S3 object as a single JSON array containing a JSON object for each record.
But what I observe in the produced JSON files is just a newline-delimited list of objects.
My connector configuration is:
{
  "name": "etl-load-to-s3",
  "topics": "testtopic",
  "tasks.max": "1",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://ext-kafka-schema-registry.plat-02.svc.cluster.local:8081",
  "flush.size": 64,
  "s3.part.size": 5242880,
  "rotate.schedule.interval.ms": "600000",
  "partition.duration.ms": "600000",
  "locale": "ru_RU",
  "timezone": "UTC",
  "timestamp.extractor": "Wallclock",
  "schema.compatibility": "NONE",
  "aws.secret.access.key": "...",
  "aws.access.key.id": "...",
  "s3.bucket.name": "...",
  "s3.region": "us-east-1",
  "store.url": "http://ext-minio:9000"
}
Am I doing something wrong, or does the documentation mean something different?
Same problem for me, with the same format.class value. Would be happy to have some help 😄
+1
Similar issue here, except what I'm observing is that a Kafka value serialized as JSON:
{"foo": "bar"}
Ends up in S3 as a JSON string, rather than a JSON document:
"{\"foo\":\"bar\"}"
Any update on this issue?
Had a different config, but this ended up working:
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
Source: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
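The converter is what makes the difference here. A minimal Python sketch of the idea (an analogy, not Connect's actual code): JsonFormat serializes whatever object the value converter produced, so a value left as a string comes out as an escaped JSON string, while a value parsed into a map comes out as a JSON object.

import json

raw_bytes = b'{"foo": "bar"}'  # the record value as stored in Kafka

# StringConverter-like behavior: the payload stays a string, so
# serializing it as JSON yields an escaped JSON string.
as_string = raw_bytes.decode("utf-8")
print(json.dumps(as_string))   # "{\"foo\": \"bar\"}"

# JsonConverter with schemas.enable=false: the payload is parsed into
# a map first, so serializing it as JSON yields a proper JSON object.
as_map = json.loads(raw_bytes)
print(json.dumps(as_map))      # {"foo": "bar"}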
This is the expected behavior:
JSON: Use format.class=io.confluent.connect.s3.format.json.JsonFormat to write the S3 object as a file containing one JSON serialized record per line. The connector’s s3.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
See: https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-formats
So you must use a JSON processor that treats newlines as record delimiters (such as jq), or split the file on \n yourself.
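For example, a minimal Python sketch of splitting such a file yourself, assuming the S3 object has already been downloaded locally (the filename is hypothetical). jq handles the newline-delimited stream natively, e.g. jq -c '.' records.json.

import json

# Each line of the downloaded S3 object is one JSON-serialized record.
with open("records.json", "r", encoding="utf-8") as f:
    records = [json.loads(line) for line in f if line.strip()]

for record in records:
    print(record)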