
DLQ aggregation recipe

davetroiano opened this issue 1 year ago • 2 comments

Use case: aggregate DLQ topics w/ ksqlDB and send downstream (e.g., to BigQuery)

Recipe would, e.g.:

  1. create 2 connectors w/ DLQ topics, simulate the DLQ topics via INSERT (see the sketch after this list)
  2. KSQL to aggregate (transform / normalize?) the streams into one
  3. send downstream
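
For step 1, the simulation can be as simple as creating a stream over a stand-in DLQ topic and inserting a record or two by hand. A minimal sketch, assuming hypothetical stream/topic names (dlq_sim / dlq-sim) and borrowing the sample order value shown later in this thread:

    -- Hypothetical stand-in for a connector's DLQ topic
    CREATE STREAM dlq_sim (key_col VARCHAR KEY, value_col VARCHAR)
        WITH (kafka_topic='dlq-sim', partitions=1, value_format='json');

    -- Simulate a record that a failing connector would have routed to its DLQ
    INSERT INTO dlq_sim (key_col, value_col)
        VALUES ('18', '{"ordertime": 1497014222380, "orderid": 18, "itemid": "Item_184"}');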

davetroiano · Sep 08 '22 16:09

  1. Create a stream against a DLQ topic:
     -- err_msg exposes the record headers, where Connect's DLQ reporter writes
     -- the __connect.errors.* error context as raw bytes
     CREATE STREAM dlq_bigquery_1 (
         key_col VARCHAR KEY,
         value_col VARCHAR,
         err_msg ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
     ) WITH (
         WRAP_SINGLE_VALUE=false,
         kafka_topic = 'dlq-lcc-gqx2qn',
         value_format = 'json'
     );
    
  2. Create another stream to hold DLQ messages from many other DLQ topics:
     -- the header value field is STRING here (not BYTES): step 3 converts the
     -- raw header bytes to strings before inserting
     CREATE STREAM all_dlq (
         key_col VARCHAR KEY,
         value_col VARCHAR,
         err_msg ARRAY<STRUCT<key STRING, value STRING>>
     ) WITH (kafka_topic='all_dlq', partitions=1, value_format='json');
    
  3. Insert each DLQ stream into the all_dlq stream (repeat for every DLQ topic):
     -- FROM_BYTES renders each header's raw bytes as a base64-encoded string
     INSERT INTO all_dlq
         SELECT key_col,
                value_col,
                TRANSFORM(err_msg, x => STRUCT(key := x->key, value := FROM_BYTES(x->value, 'base64'))) AS err_msg
         FROM dlq_bigquery_1;
    
  4. Sink to BigQuery (see the connector sketch after this list):
    CREATE SINK CONNECTOR ...
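
For step 4, a rough shape of the connector statement. This is a minimal sketch only: it assumes the self-managed WePay BigQuery sink connector, and the connector name, project, dataset, and keyfile values are placeholders; property names differ between connector versions and between self-managed and fully managed deployments, so check the connector docs before relying on any of this.

    -- Sketch only: class and property names below are assumptions, not verified config
    CREATE SINK CONNECTOR dlq_to_bigquery WITH (
        'connector.class' = 'com.wepay.kafka.connect.bigquery.BigQuerySinkConnector',
        'tasks.max'       = '1',
        'topics'          = 'all_dlq',
        'project'         = 'my-gcp-project',
        'defaultDataset'  = 'dlq_monitoring',
        'keyfile'         = '/path/to/credentials.json'
    );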
    

chuck-confluent · Sep 12 '22 17:09

A sample DLQ input record produced by the S3 sink connector:

Value of the record: { "ordertime": 1497014222380, "orderid": 18, "itemid": "Item_184", "address": { "city": "Mountain View", "state": "CA", "zipcode": 94041 } }

Header of the record:

[
  { "key": "__connect.errors.topic", "stringValue": "pksqlc-okr9jACCOMPLISHED_FEMALE_READERS" },
  { "key": "__connect.errors.partition", "stringValue": "0" },
  { "key": "__connect.errors.offset", "stringValue": "4957217" },
  { "key": "__connect.errors.connector.name", "stringValue": "lcc-gqx2qn" },
  { "key": "__connect.errors.task.id", "stringValue": "0" },
  { "key": "__connect.errors.stage", "stringValue": "VALUE_CONVERTER" },
  { "key": "__connect.errors.class.name", "stringValue": "io.confluent.connect.json.JsonSchemaConverter" },
  { "key": "__connect.errors.exception.class.name", "stringValue": "org.apache.kafka.connect.errors.DataException" },
  { "key": "__connect.errors.exception.message", "stringValue": "Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: " },
  { "key": "__connect.errors.exception.stacktrace", "stringValue": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:176)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:231)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:111)\n\t... 20 more\n" }
]

Key of the record: 18
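
Once records like this are normalized into all_dlq, individual error headers can be pulled out with ksqlDB's lambda functions. A minimal sketch, assuming the all_dlq stream from the earlier comment (note that ksqlDB arrays are 1-indexed):

    -- Extract specific __connect.errors.* headers from the normalized header array
    SELECT key_col,
           FILTER(err_msg, h => h->key = '__connect.errors.exception.message')[1]->value AS error_message,
           FILTER(err_msg, h => h->key = '__connect.errors.connector.name')[1]->value AS connector_name
    FROM all_dlq
    EMIT CHANGES;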

hendrasutanto · Sep 13 '22 16:09