kafka-tutorials
DLQ aggregation recipe
Use case: aggregate multiple DLQ (dead letter queue) topics with ksqlDB and send the result downstream (e.g., to BigQuery)
The recipe would, for example:
- create two connectors with DLQ topics, and simulate records landing in the DLQ topics via INSERT
- use ksqlDB to aggregate (transform/normalize) the DLQ streams into one
- send the aggregated stream downstream
- Create a stream against a DLQ topic:
CREATE STREAM dlq_bigquery_1 (
    key_col VARCHAR KEY,
    value_col VARCHAR,
    err_msg ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
  ) WITH (
    wrap_single_value = false,
    kafka_topic = 'dlq-lcc-gqx2qn',
    value_format = 'json'
  );
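Once the stream exists, the raw DLQ records and their Connect error headers can be inspected with a push query (a sketch; the LIMIT just keeps the output short):

```sql
-- Peek at the raw DLQ records, including the Connect error headers
SELECT key_col, value_col, err_msg
FROM dlq_bigquery_1
EMIT CHANGES
LIMIT 5;
```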
- Create another stream to hold DLQ messages from many other DLQ topics:
CREATE STREAM all_dlq (
    key_col VARCHAR KEY,
    value_col VARCHAR,
    err_msg ARRAY<STRUCT<key STRING, value STRING>>
  ) WITH (
    kafka_topic = 'all_dlq',
    partitions = 1,
    value_format = 'json'
  );
- Insert the records from each DLQ topic into the all_dlq stream, decoding the header values from bytes to strings:
INSERT INTO all_dlq
  SELECT key_col,
         value_col,
         TRANSFORM(err_msg, x => STRUCT(key := x->key, value := FROM_BYTES(x->value, 'base64'))) AS err_msg
  FROM dlq_bigquery_1;
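With the header values decoded to strings, individual error headers become easy to query. A sketch using ksqlDB's FILTER lambda function (assuming a ksqlDB version with lambda support, 0.17+, and that a `__connect.errors.exception.message` header is present, as in the sample record below):

```sql
-- Extract just the exception message header from the normalized stream
-- (ksqlDB arrays are 1-indexed)
SELECT key_col,
       FILTER(err_msg, x => x->key = '__connect.errors.exception.message')[1]->value AS exception_message
FROM all_dlq
EMIT CHANGES;
```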
- Sink the aggregated stream to BigQuery:
CREATE SINK CONNECTOR ...
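A sketch of what the fully-managed BigQuery sink might look like in Confluent Cloud; the connector class and property names (`keyfile`, `project`, `datasets`, etc.) vary by connector and version, and every placeholder value here is hypothetical — consult the connector's own documentation before use:

```sql
CREATE SINK CONNECTOR all_dlq_bigquery WITH (
  'connector.class'   = 'BigQuerySink',
  'input.data.format' = 'JSON',
  'kafka.api.key'     = '<kafka-api-key>',
  'kafka.api.secret'  = '<kafka-api-secret>',
  'keyfile'           = '<gcp-service-account-json>',
  'project'           = '<gcp-project-id>',
  'datasets'          = '<bigquery-dataset>',
  'topics'            = 'all_dlq',
  'tasks.max'         = '1'
);
```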
A sample DLQ input record produced by the S3 sink connector:
Value of the record:
{
  "ordertime": 1497014222380,
  "orderid": 18,
  "itemid": "Item_184",
  "address": { "city": "Mountain View", "state": "CA", "zipcode": 94041 }
}
Header of the record:
[
  { "key": "__connect.errors.topic", "stringValue": "pksqlc-okr9jACCOMPLISHED_FEMALE_READERS" },
  { "key": "__connect.errors.partition", "stringValue": "0" },
  { "key": "__connect.errors.offset", "stringValue": "4957217" },
  { "key": "__connect.errors.connector.name", "stringValue": "lcc-gqx2qn" },
  { "key": "__connect.errors.task.id", "stringValue": "0" },
  { "key": "__connect.errors.stage", "stringValue": "VALUE_CONVERTER" },
  { "key": "__connect.errors.class.name", "stringValue": "io.confluent.connect.json.JsonSchemaConverter" },
  { "key": "__connect.errors.exception.class.name", "stringValue": "org.apache.kafka.connect.errors.DataException" },
  { "key": "__connect.errors.exception.message", "stringValue": "Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: " },
  { "key": "__connect.errors.exception.stacktrace", "stringValue": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:176)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:231)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:111)\n\t... 20 more\n" }
]
Key of the record:
18