
Draft: Producer json & base64 support & better test cases

Open · JakkuSakura opened this pull request 3 years ago · 15 comments

Option -J now works in producer mode. With -B, keys (-B k), payloads (-B v), or both keys and payloads (-B a) are encoded/decoded as base64. Base64 works with both plain stdin input and JSON input.
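For reference, here is a minimal base64 encoder/decoder in C (RFC 4648 alphabet, `=` padding) showing the kind of transformation -B applies to a key or payload. It is a hypothetical stand-in, not the public-domain library this PR vendors, and the names `b64_encode`/`b64_decode` are illustrative only.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Standard base64 alphabet (RFC 4648). */
static const char B64[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/* Encode len bytes of src; returns a NUL-terminated malloc'd string. */
char *b64_encode(const unsigned char *src, size_t len) {
    char *out = malloc(4 * ((len + 2) / 3) + 1);
    if (!out) return NULL;
    size_t i = 0, o = 0;
    while (i + 2 < len) { /* full 3-byte groups */
        uint32_t v = ((uint32_t)src[i] << 16) | ((uint32_t)src[i + 1] << 8) | src[i + 2];
        out[o++] = B64[(v >> 18) & 63];
        out[o++] = B64[(v >> 12) & 63];
        out[o++] = B64[(v >> 6) & 63];
        out[o++] = B64[v & 63];
        i += 3;
    }
    if (i < len) { /* 1 or 2 trailing bytes, padded with '=' */
        uint32_t v = (uint32_t)src[i] << 16;
        if (i + 1 < len) v |= (uint32_t)src[i + 1] << 8;
        out[o++] = B64[(v >> 18) & 63];
        out[o++] = B64[(v >> 12) & 63];
        out[o++] = (i + 1 < len) ? B64[(v >> 6) & 63] : '=';
        out[o++] = '=';
    }
    out[o] = '\0';
    return out;
}

/* Map one alphabet character to its 6-bit value, or -1 if invalid. */
static int b64_val(char c) {
    if (c >= 'A' && c <= 'Z') return c - 'A';
    if (c >= 'a' && c <= 'z') return c - 'a' + 26;
    if (c >= '0' && c <= '9') return c - '0' + 52;
    if (c == '+') return 62;
    if (c == '/') return 63;
    return -1;
}

/* Decode padded base64 into a malloc'd buffer (may contain NUL bytes);
 * *out_len receives the byte count. Returns NULL on malformed input. */
unsigned char *b64_decode(const char *src, size_t *out_len) {
    size_t len = strlen(src);
    if (len % 4 != 0) return NULL;
    unsigned char *out = malloc(len / 4 * 3 + 1);
    if (!out) return NULL;
    size_t o = 0;
    for (size_t i = 0; i < len; i += 4) {
        int pad2 = src[i + 2] == '=', pad3 = src[i + 3] == '=';
        /* '=' may only appear in the final group, and "x=y" is never valid. */
        if (((pad2 || pad3) && i + 4 != len) || (pad2 && !pad3)) {
            free(out);
            return NULL;
        }
        int a = b64_val(src[i]), b = b64_val(src[i + 1]);
        int c = pad2 ? 0 : b64_val(src[i + 2]);
        int d = pad3 ? 0 : b64_val(src[i + 3]);
        if (a < 0 || b < 0 || c < 0 || d < 0) {
            free(out);
            return NULL;
        }
        uint32_t v = ((uint32_t)a << 18) | ((uint32_t)b << 12) |
                     ((uint32_t)c << 6) | (uint32_t)d;
        out[o++] = (unsigned char)(v >> 16);
        if (!pad2) out[o++] = (unsigned char)(v >> 8);
        if (!pad3) out[o++] = (unsigned char)v;
    }
    *out_len = o;
    return out;
}
```

On the producer side it is presumably the decode direction that matters: a -B v payload arrives as base64 text and has to be turned back into raw bytes (NULs included) before it is handed to librdkafka, which is what makes copying binary topics possible.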

I wrote this producer JSON and base64 support for copying binary topics. There is no licensing issue, since the new base64 library is in the public domain (I have modified it a bit).

There are still some issues, though. I can't figure out this project's memory management, so there may be some memory leaks.

I also wrote 2 new test cases and cleaned up 2 existing ones. I removed the assumption that topics have at least 3 partitions by default.

JakkuSakura, Feb 21 '21

This would be really helpful to copy messages between topics or brokers! Nice work!

Another feature that would be amazingly helpful in this scenario would be to preserve headers. -J seems to output these under a headers property in the output JSON. If these could be preserved (or if there were a command line option that would select out certain headers to preserve), then we could transparently copy data between brokers or topics.

WDYT?
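The headers in the -J output are a flat JSON array of alternating names and values (see the sample records later in this thread), so a producer that preserves them first has to pair the entries up and could then add each pair with librdkafka's `rd_kafka_header_add()`. A minimal sketch of just the pairing step; the `header_pair` type and `pair_headers()` helper are hypothetical, not kafkacat code.

```c
#include <stddef.h>

/* One header as a (name, value) pair of borrowed strings. */
typedef struct {
    const char *name;
    const char *value;
} header_pair;

/* Pair up a flat ["k1","v1","k2","v2",...] list into out[].
 * Returns the number of pairs, or -1 if the list has an odd length
 * (a name without a value) or does not fit in max_pairs. */
int pair_headers(const char *const *flat, size_t n,
                 header_pair *out, size_t max_pairs) {
    if (n % 2 != 0 || n / 2 > max_pairs) return -1;
    for (size_t i = 0; i < n / 2; i++) {
        out[i].name = flat[2 * i];
        out[i].value = flat[2 * i + 1];
    }
    return (int)(n / 2);
}
```

Rejecting odd-length arrays up front keeps a malformed record from silently shifting every later name into a value slot.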

masoncj, Mar 02 '21

Keeping the headers is a good idea. I'll look into it when I have time (I haven't used headers before).

JakkuSakura, Mar 03 '21

Hi @qiujiangkun

I've implemented the header production from JSON over in this branch: https://github.com/masoncj/kafkacat/tree/producer_json_headers

However, I'm seeing an issue with this PR (absent any of my code above) where the produced data seems to be corrupted:

cat > in_cmason.json <<END
{"topic":"content","partition":0,"offset":0,"tstype":"create","ts":1602611620140,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","7a7d5892dad76b89","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"18278fa2-d7c1-4bdf-9f47-aaa657ccc10e\",\n  \"id\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"name\" : \"Wellbeing Check-in\",\n  \"description\" : \"Survey: Wellbeing Check-in (ID: 292368082 version: 2a24f6c2-7ba6-41f8-95ee-0fd7d90896f5)\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : null,\n  \"measureType\" : null,\n  \"units\" : null,\n  \"choices\" : null,\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.464101Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":1,"tstype":"create","ts":1602611624213,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","49e4f775786d9807","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"43ca5fe2-3cde-4cff-a271-a88695d78196\",\n  \"id\" : \"e67afd11-07cf-463a-a18d-226f309dee60\",\n  \"name\" : \"How was your day?\",\n  \"description\" : \"How was your day?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"4c985675-ff68-456b-ba81-0ee81c87c240\",\n    \"value\" : \"Bad\"\n  }, {\n    \"id\" : \"b55fbb21-5ee7-4283-a15f-00cc0c0ec366\",\n    \"value\" : \"OK\"\n  }, {\n    \"id\" : \"7a6a1407-1bc5-4475-bfe2-a75fef8c342a\",\n    \"value\" : \"Good\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.464689Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":2,"tstype":"create","ts":1602611624390,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","532fcb40e28072e3","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"df987555-f991-4340-b391-cd0e37424baa\",\n  \"id\" : \"cb94d22c-1701-4ec5-998a-1a882863509c\",\n  \"name\" : \"What was your physical activity level today?\",\n  \"description\" : \"What was your physical activity level today?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"30c354b9-a7f3-4b27-85d8-31d1494605b9\",\n    \"value\" : \"Inactive\"\n  }, {\n    \"id\" : \"12035e8c-f011-4030-8d07-3d6053f300c6\",\n    \"value\" : \"Light\"\n  }, {\n    \"id\" : \"6d955cc7-c056-4520-b4ed-2d59e38abe34\",\n    \"value\" : \"Moderate\"\n  }, {\n    \"id\" : \"bde446eb-5cd0-4677-ba1d-ec3ff75c4406\",\n    \"value\" : \"Vigorous\"\n  }, {\n    \"id\" : \"dc4d44ef-1aae-477e-9a3d-9e4b44be5759\",\n    \"value\" : \"Intense\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.464734Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":3,"tstype":"create","ts":1602611624518,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","57230c6ddc10789b","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"ec3363b5-34d2-4dae-9327-7a1b9923ba32\",\n  \"id\" : \"40bb07dd-fa16-4c92-a898-df3a2b52faa8\",\n  \"name\" : \"About how many hours did you sleep last night?\",\n  \"description\" : \"About how many hours did you sleep last night?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"NUMERIC\",\n  \"units\" : null,\n  \"choices\" : [ ],\n  \"validation\" : {\n    \"minValue\" : 0.0,\n    \"maxValue\" : 12.0\n  },\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537644Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":4,"tstype":"create","ts":1602611624644,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","6cc095cfedb4e4d9","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"cc89283a-e034-4ba4-a502-4c08d3aa9b01\",\n  \"id\" : \"88e240e9-d343-4c3d-94f5-0733827330e4\",\n  \"name\" : \"In the last 24 hours, did you take all of your medication exactly as your healthcare provider recommended?\",\n  \"description\" : \"In the last 24 hours, did you take all of your medication exactly as your healthcare provider recommended?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"495b1dc0-ef24-4477-a373-e33d28b8bcf5\",\n    \"value\" : \"I took all of them as recommended\"\n  }, {\n    \"id\" : \"19f8e435-14a4-4401-8558-d9460cad5546\",\n    \"value\" : \"I missed some of them\"\n  }, {\n    \"id\" : \"6c0fdaf2-f9d6-44c9-9f9c-538a7c194394\",\n    \"value\" : \"I took them at a different time\"\n  }, {\n    \"id\" : \"09e74b51-bcd1-4f61-9737-55b17b571005\",\n    \"value\" : \"I forgot to take them\"\n  }, {\n    \"id\" : \"f58cdd13-5d80-4c1e-bcdf-94b2fea7454e\",\n    \"value\" : \"I didn't because I felt better\"\n  }, {\n    \"id\" : \"c6d26769-202b-4a69-aa1e-ea93228287ae\",\n    \"value\" : \"I didn't because they make me feel worse\"\n  }, {\n    \"id\" : \"d96de9a5-9c00-42fc-ab98-41fff829f663\",\n    \"value\" : \"I didn't for a different reason\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537674Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":5,"tstype":"create","ts":1602611624719,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","3315c1ff34b847b5","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"c3ce5bfa-cae0-4e6f-b9ed-0a8894707ec0\",\n  \"id\" : \"a5c3dad6-a3a2-4b09-b762-d834e40189ed\",\n  \"name\" : \"Comments\",\n  \"description\" : \"Comments\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"88e240e9-d343-4c3d-94f5-0733827330e4\",\n  \"measureType\" : \"TEXT\",\n  \"units\" : null,\n  \"choices\" : [ ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537683Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":6,"tstype":"create","ts":1602611624773,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","1335e863fc3de15","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"1331848f-7655-40e0-ab0f-0b88305dd33d\",\n  \"id\" : \"15b044cf-1eec-43ac-b35d-ffa1ab9ac197\",\n  \"name\" : \"Any thoughts to record?\",\n  \"description\" : \"Any thoughts to record?\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"measureType\" : \"ENUMERATED\",\n  \"units\" : null,\n  \"choices\" : [ {\n    \"id\" : \"fc584cf6-8ec8-45d8-8ae0-577073c5bb6e\",\n    \"value\" : \"Not today\"\n  } ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537702Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":7,"tstype":"create","ts":1602611624857,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-pollster-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","2013a2d6e02fa87f","X-B3-SpanId","42bbb93e06c4244d","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"423752e8-34ac-45e0-8ddc-b27920e98244\",\n  \"id\" : \"fbda8ebc-d417-432d-b7ad-bff7c68c61b1\",\n  \"name\" : \"My notes for today:\",\n  \"description\" : \"My notes for today:\",\n  \"contentDocIds\" : [ ],\n  \"contentMediaIds\" : [ ],\n  \"parentMeasureId\" : \"15b044cf-1eec-43ac-b35d-ffa1ab9ac197\",\n  \"measureType\" : \"TEXT\",\n  \"units\" : null,\n  \"choices\" : [ ],\n  \"validation\" : null,\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"createdAt\" : \"2020-10-13T17:53:38.537711Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"EXTERNAL\",\n  \"eventType\" : \"MeasureUpdated\"\n}"}
{"topic":"content","partition":0,"offset":8,"tstype":"create","ts":1602611624970,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-provider-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","ef85f342d5c07d64","X-B3-SpanId","35830a429a65d44b","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"a5650bba-0db1-4196-af05-036688625237\",\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"id\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"eventType\" : \"MeasureUpdated\",\n  \"createdAt\" : \"2020-10-13T17:53:44.918700Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"ENTITY\",\n  \"name\" : \"Wellbeing Check-in\",\n  \"description\" : \"Survey: Wellbeing Check-in (ID: 292368082 version: 2a24f6c2-7ba6-41f8-95ee-0fd7d90896f5)\",\n  \"timeDescription\" : null,\n  \"version\" : 0\n}"}
{"topic":"content","partition":0,"offset":9,"tstype":"create","ts":1602611625143,"broker":-1,"headers":["kafka_correlationId","9df423b7-811d-4cd5-a8c4-b02600a414e7","CMason_Wire_Format","JSON","CMason_Sending_App","apollo-patient-app","CMason_Event_Type","MeasureUpdated","CMason_Event_Stream_Version","1","X-B3-TraceId","45a3bb3569ad77ba","X-B3-SpanId","6bde2c3399974fda","X-B3-Sampled","1"],"key":"1f92adc2-c355-429b-93ef-c06ca235423a","payload":"{\n  \"eventType\" : \"MeasureUpdated\",\n  \"eventId\" : \"5c4f3da0-410c-437d-9110-c935e7ffa8e5\",\n  \"correlationId\" : \"9df423b7-811d-4cd5-a8c4-b02600a414e7\",\n  \"id\" : \"7c96f915-9b4d-334a-8c33-edf0d1d991ba\",\n  \"eventType\" : \"MeasureUpdated\",\n  \"createdAt\" : \"2020-10-13T17:53:45.062533Z\",\n  \"createdBy\" : \"b56fb618-a654-3d71-86f1-73a4bf5b1538\",\n  \"source\" : \"ENTITY\",\n  \"parentMeasureId\" : null,\n  \"contentDocs\" : [ ],\n  \"name\" : \"Wellbeing Check-in\",\n  \"description\" : \"Survey: Wellbeing Check-in (ID: 292368082 version: 2a24f6c2-7ba6-41f8-95ee-0fd7d90896f5)\",\n  \"contentMedia\" : [ ],\n  \"units\" : null,\n  \"choices\" : null,\n  \"version\" : 0,\n  \"validation\" : null,\n  \"measureType\" : null\n}"}
END
kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --config retention.ms=-1 --config retention.bytes=-1 --topic content-test
cat in_cmason.json | jq -c 'del(.headers)' | ./kafkacat  -b localhost -P -t content-test -J
kafkacat -o beginning -e -t content-test -b localhost -J > out_cmason.json

This results in some messages like:

"payload":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0004tType\" \u0000\u0000\u0000\u0000\....

Any thoughts on where this corruption might be coming from?

masoncj, Mar 03 '21

@masoncj It's probably because jq processes the \n sequences. Currently my version separates messages by the \n at the end of each line.

JakkuSakura, Mar 04 '21

@qiujiangkun, I was able to identify and fix the source of the corruption I was seeing above. It didn't have anything to do with jq or line breaks (jq -c correctly preserves line breaks in JSON-lines files).
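Why splitting on line breaks is safe for jq -c output: a JSON encoder writes a newline inside a string as the two characters `\` and `n`, so a raw newline byte can only ever end a record. A small C sketch of a record counter that relies on this; `count_json_lines()` is a hypothetical helper, not kafkacat's actual input reader.

```c
#include <stddef.h>

/* Count newline-separated records in a JSON-lines buffer. Blank lines are
 * skipped and a final record without a trailing newline still counts.
 * Newlines inside JSON strings are escaped as '\\' 'n', so they never
 * reach this loop as a raw '\n' byte. */
size_t count_json_lines(const char *buf, size_t len) {
    size_t count = 0;
    int in_record = 0;
    for (size_t i = 0; i < len; i++) {
        if (buf[i] == '\n') {
            if (in_record) count++;
            in_record = 0;
        } else {
            in_record = 1;
        }
    }
    return count + (in_record ? 1 : 0);
}
```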

I used valgrind. It identified a number of use-after-free issues, such as this one (just one example; there are many similar ones):

==95246== Invalid read of size 8
==95246==    at 0x10015154E: crc32c (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x100177DB7: rd_slice_crc32c (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x10015F337: rd_kafka_msgset_create_ProduceRequest (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x1001371F1: rd_kafka_ProduceRequest (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x100114095: rd_kafka_broker_serve (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x100111260: rd_kafka_broker_thread_main (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x1001534E5: _thrd_wrapper_function (in /usr/local/Cellar/librdkafka/1.2.2/lib/librdkafka.1.dylib)
==95246==    by 0x10064A2EA: _pthread_body (in /usr/lib/system/libsystem_pthread.dylib)
==95246==    by 0x10064D248: _pthread_start (in /usr/lib/system/libsystem_pthread.dylib)
==95246==    by 0x10064940C: thread_start (in /usr/lib/system/libsystem_pthread.dylib)
==95246==  Address 0x1016a69c0 is 768 bytes inside a block of size 2,048 free'd
==95246==    at 0x1000EB0CD: free (in /usr/local/Cellar/valgrind/HEAD-0b5ae2f/lib/valgrind/vgpreload_memcheck-amd64-darwin.so)
==95246==    by 0x10000BC78: yajlTestFree (json.c:436)
==95246==    by 0x1000F7208: yajl_buf_free (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x1000F5285: yajl_free (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x10000BBFE: parse_json_message (json.c:573)
==95246==    by 0x100003502: producer_run (kafkacat.c:419)
==95246==    by 0x10000107E: main (kafkacat.c:2562)
==95246==  Block was alloc'd at
==95246==    at 0x1000EACF5: malloc (in /usr/local/Cellar/valgrind/HEAD-0b5ae2f/lib/valgrind/vgpreload_memcheck-amd64-darwin.so)
==95246==    by 0x10000BC28: yajlTestMalloc (json.c:440)
==95246==    by 0x1000F7250: yajl_buf_append (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x1000F74A8: yajl_string_decode (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x1000F6E86: yajl_do_parse (in /usr/local/Cellar/yajl/2.1.0/lib/libyajl.2.1.0.dylib)
==95246==    by 0x10000BB87: parse_json_message (json.c:564)
==95246==    by 0x100003502: producer_run (kafkacat.c:419)
==95246==    by 0x10000107E: main (kafkacat.c:2562)

(I ❤️ valgrind.)

I think the issue is that the yajl parser is allocated and freed in parse_json_message(), but we store references to data returned by the parser in the kafkacatMessageContext. At least some of that data seems to live in memory owned by the parser created with yajl_alloc(), and we use those references after the parser is freed. I believe we sometimes even pass references to this memory into librdkafka (see the trace above), which may read it after produce() has returned. This may only affect larger messages, where we don't ask librdkafka to copy the message internally.
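A minimal illustration of the lifetime problem and of the copy-out fix in plain C, with no yajl or librdkafka involved: `toy_parser` is a hypothetical stand-in for a parser that owns the bytes behind the strings it returns. A borrowed pointer is only safe until the parser is freed, so either the bytes are copied out first (shown here) or the parser is kept alive until librdkafka is done with the message (the approach in the branch mentioned next). On the librdkafka side, passing `RD_KAFKA_MSG_F_COPY` to `rd_kafka_produce()` makes librdkafka take its own copy of the payload, at the cost of one extra memcpy per message.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical parser that owns the memory behind the values it returns
 * (the role yajl's internal buffers play in kafkacat). */
typedef struct {
    char *arena;
} toy_parser;

toy_parser *toy_parser_new(const char *text) {
    toy_parser *p = malloc(sizeof *p);
    if (!p) return NULL;
    p->arena = malloc(strlen(text) + 1);
    if (!p->arena) { free(p); return NULL; }
    strcpy(p->arena, text);
    return p;
}

/* Borrowed view: valid only until toy_parser_free(). */
const char *toy_parser_value(const toy_parser *p) { return p->arena; }

void toy_parser_free(toy_parser *p) {
    if (!p) return;
    free(p->arena);
    free(p);
}

/* The fix pattern: copy into a buffer the caller owns *before* the parser
 * is freed, so the payload outlives the parser. */
char *take_owned_copy(const char *borrowed, size_t len) {
    char *copy = malloc(len + 1);
    if (!copy) return NULL;
    memcpy(copy, borrowed, len);
    copy[len] = '\0';
    return copy;
}
```

Using the borrowed pointer after `toy_parser_free()` is exactly the "Invalid read ... free'd" pattern valgrind reports above; reading `owned` afterwards is fine.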

I have a branch here that fixes these issues. It's a bit messy because I have to manage both the input buffer and the parser's memory pool, so I use a small allocated stub to hold both references and free them in librdkafka's delivery report (message sent) callback. I'd welcome stylistic comments here.
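A rough sketch of the stub idea, assuming the stub is passed as the per-message opaque to produce() and released from the delivery report callback registered with `rd_kafka_conf_set_dr_msg_cb()`, where it comes back as `rkmessage->_private`. The `msg_stub` type and its functions are hypothetical, and the librdkafka wiring is left out so the sketch stays self-contained.

```c
#include <stdlib.h>

/* Hypothetical per-message stub owning everything the payload points into. */
typedef struct {
    void *buf;                     /* input buffer the parser read from */
    void *parser_pool;             /* parser-owned memory (e.g. a yajl handle) */
    void (*free_pool)(void *pool); /* how to release parser_pool */
} msg_stub;

msg_stub *msg_stub_new(void *buf, void *pool, void (*free_pool)(void *)) {
    msg_stub *s = malloc(sizeof *s);
    if (!s) return NULL;
    s->buf = buf;
    s->parser_pool = pool;
    s->free_pool = free_pool;
    return s;
}

/* Call exactly once per message from the delivery report callback, i.e.
 * only after librdkafka has finished reading the key and payload. */
void msg_stub_release(void *opaque) {
    msg_stub *s = opaque;
    if (!s) return;
    if (s->free_pool) s->free_pool(s->parser_pool);
    free(s->buf);
    free(s);
}
```

Tying both frees to the delivery report means memory is released on librdkafka's schedule rather than when parse_json_message() returns, which is what the use-after-free traces call for.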

With these changes valgrind reports no errors from kafkacat code.

masoncj, Mar 10 '21

@edenhill this is proving really useful for me in copying data between topics/servers. For example:

./kafkacat -C -F ~/.kafka-dev -o beginning -e -t patient -J | ./kafkacat -P -b localhost -t patient -J -X message.max.bytes=80000

(Note that message.max.bytes is required; see https://github.com/edenhill/kafkacat/issues/137#issuecomment-796417134 )

Could we please discuss what it might take to get this merged? Thanks so much.

masoncj, Mar 11 '21

This would be very useful!

AlexeiZenin, Mar 11 '21

A little plug: you can try out my kafcat (see reddit).

JakkuSakura, Mar 19 '21

This would definitely be a very welcomed feature!

Tommmster, Oct 17 '21

That'd be a very nice addition, but there has been no response for the last 2 years. Is the project on hold?

rasgele, Nov 06 '23

@JakkuSakura, very nice work! Thank you. But could you please rebase your work on top of master and resolve conflicts?

Hubbitus, Mar 24 '24