
Question About Producer

dogac00 opened this issue 3 years ago • 3 comments

Hello,

I want to produce a JSON value with key using kafkacat.

What I have achieved so far is I can produce the message with value but I cannot set the key.

Here's my code:

echo $EVENT_JSON | kafkacat -b $KAFKA_BROKERS -t $TOPIC -K $EVENT_KEY;

My EVENT_JSON is this: {"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1} My EVENT_KEY is this: 123

It produces {"id": as the key, truncates part of the value, and produces the rest of the JSON as the value.

How can I send it like this instead:

KEY: 123
VALUE: {"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}

Thanks.

dogac00 commented on Aug 19 '21

If you just need to produce this single message, you can use -k (lowercase k) to specify the key of the message.
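Incidentally, the truncation you saw comes from -K, which takes a single delimiter character rather than the key itself; with a delimiter the key can ride along in the same input line. A sketch (broker and topic are placeholders):

```shell
# -K sets the key delimiter: everything before the first ':' becomes
# the key, the remainder the value.
printf '%s\n' '123:{"id":123,"typeId":1}' |
  kafkacat -P -b localhost:9092 -t test_topic -K :
```

With -K 123, kafkacat used '1' as the delimiter and split your JSON at the first '1' it found, which explains the {"id": key.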

You should also escape the double quotes in your input; otherwise the shell strips them before the message is produced:

# NO ESCAPE
$ EVENT_JSON={"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
$ echo $EVENT_JSON | kafkacat -P -b localhost:9092 -t test_topic -k 123
$ kafkacat -C -b localhost:9092 -t test_topic -e -o beginning -J | jq
{
  "topic": "test_topic",
  "partition": 0,
  "offset": 1,
  "tstype": "create",
  "ts": 1630078277397,
  "broker": 0,
  "key": "123",
  "payload": "{id:123,typeId:1,createdDate:2021-08-19T08:48:58.77389,quantity:1}"
}
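What happens here is plain shell quote removal: the unescaped double quotes are consumed during the assignment, so they are already gone by the time echo pipes the value on. A minimal demonstration without Kafka at all:

```shell
# The shell performs quote removal on the assignment, so the variable
# never contains the double quotes:
EVENT_JSON={"id":123,"typeId":1}
echo "$EVENT_JSON"
# → {id:123,typeId:1}
```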

If you instead escape the JSON object (with a tool or by hand) before producing with echo, the quotes are preserved:

$ EVENT_JSON={\"id\":123,\"typeId\":1,\"createdDate\":\"2021-08-19T08:48:58.77389\",\"quantity\":1}
$ echo $EVENT_JSON | kafkacat -P -b localhost:9092 -t test_topic -k 123
$ kafkacat -C -b localhost:9092 -t test_topic -e -o beginning -J | jq
{
  "topic": "test_topic",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1630078230496,
  "broker": 0,
  "key": "123",
  "payload": "{\"id\":123,\"typeId\":1,\"createdDate\":\"2021-08-19T08:48:58.77389\",\"quantity\":1}"
}
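Simpler than backslash-escaping every quote by hand is single-quoting the whole value, which keeps the inner double quotes intact (and double-quoting the expansion so it also survives the echo):

```shell
# Single quotes protect the embedded double quotes from the shell:
EVENT_JSON='{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}'
echo "$EVENT_JSON" | kafkacat -P -b localhost:9092 -t test_topic -k 123
```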

You could also send the content as a file; then you don't need to escape anything beforehand:

$ cat event_json_file
{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
$ kafkacat -P -b localhost:9092 -t test_topic -k 123 event_json_file
$ kafkacat -C -b localhost:9092 -t test_topic -e -o beginning -J | jq
{
  "topic": "test_topic",
  "partition": 0,
  "offset": 2,
  "tstype": "create",
  "ts": 1630078679461,
  "broker": 0,
  "key": "123",
  "payload": "{\"id\":123,\"typeId\":1,\"createdDate\":\"2021-08-19T08:48:58.77389\",\"quantity\":1}\n"
}
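A quoted here-document works the same way as the file, without creating one (same placeholder broker and topic as above):

```shell
# The quoted EOF delimiter disables all expansion, so the JSON reaches
# stdin byte-for-byte:
kafkacat -P -b localhost:9092 -t test_topic -k 123 <<'EOF'
{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
EOF
```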

mikaello commented on Aug 27 '21

Hi @mikaello

Thanks for your answer,

I tried producing with -k (lowercase k), but I get this error in a Debian environment:

kafkacat: invalid option -- 'k'
Error: unknown argument

Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version KAFKACAT_VERSION (JSON) (librdkafka 0.9.3)


General options:
  -C | -P | -L       Mode: Consume, Produce or metadata List
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
  -D <delim>         Message delimiter character:
                     a-z.. | \r | \n | \t | \xNN
                     Default: \n
  -K <delim>         Key delimiter (same format as -D)
  -c <cnt>           Limit message count
  -X list            List available librdkafka configuration properties
  -X prop=val        Set librdkafka configuration property.
                     Properties prefixed with "topic." are
                     applied as topic properties.
  -X dump            Dump configuration and exit.
  -d <dbg1,...>      Enable librdkafka debugging:
                     all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
  -q                 Be quiet (verbosity set to 0)
  -v                 Increase verbosity
  -V                 Print version

Producer options:
  -z snappy|gzip     Message compression. Default: none
  -p -1              Use random partitioner
  -D <delim>         Delimiter to split input into messages
  -K <delim>         Delimiter to split input key and message
  -l                 Send messages from a file separated by
                     delimiter, as with stdin.
                     (only one file allowed)
  -T                 Output sent messages to stdout, acting like tee.
  -c <cnt>           Exit after producing this number of messages
  -Z                 Send empty messages as NULL messages
  file1 file2..      Read messages from files.
                     With -l, only one file permitted.
                     Otherwise, the entire file contents will
                     be sent as one single message.

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
  -e                 Exit successfully when last message received
  -f <fmt..>         Output formatting string, see below.
                     Takes precedence over -D and -K.
  -J                 Output with JSON envelope
  -D <delim>         Delimiter to separate messages on output
  -K <delim>         Print message keys prefixing the message
                     with specified delimiter.
  -O                 Print message offset using -K delimiter
  -c <cnt>           Exit after consuming this number of messages
  -Z                 Print NULL messages and keys as "NULL"(instead of empty)
  -u                 Unbuffered output

Metadata options:
  -t <topic>         Topic to query (optional)


Format string tokens:
  %s                 Message payload
  %S                 Message payload length (or -1 for NULL)
  %R                 Message payload length (or -1 for NULL) serialized
                     as a binary big endian 32-bit signed integer
  %k                 Message key
  %K                 Message key length (or -1 for NULL)
  %t                 Topic
  %p                 Partition
  %o                 Message offset
  \n \r \t           Newlines, tab
  \xXX \xNNN         Any ASCII character
 Example:
  -f 'Topic %t [%p] at offset %o: key %k: %s\n'


Consumer mode (writes messages to stdout):
  kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -C -b ...

High-level KafkaConsumer mode:
  kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+

Producer mode (reads messages from stdin):
  ... | kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -P -b ...

Metadata listing:
  kafkacat -L -b <broker> [-t <topic>]

My code is this:

function produce_event() {
  echo $(echo $2 | kafkacat -b $KAFKA_BROKERS -t $TOPIC -k $1);
}

function send_events() {
  local payloads=$(get_event_payloads);

  for payload in $payloads; do
    local key=$(echo $payload | jq -r '.id');

    produce_event $key $payload;
  done;
}

However, when I execute the same script on my Mac (macOS Big Sur), it works fine.
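For what it's worth, the unquoted expansions in the loop above will word-split any JSON that contains spaces; a sketch of the same loop with everything quoted (assuming get_event_payloads emits one compact JSON object per line):

```shell
send_events() {
  # Quote every expansion so JSON containing spaces is not word-split.
  get_event_payloads | while IFS= read -r payload; do
    key=$(printf '%s' "$payload" | jq -r '.id')
    printf '%s\n' "$payload" |
      kafkacat -P -b "$KAFKA_BROKERS" -t "$TOPIC" -k "$key"
  done
}
```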

dogac00 commented on Aug 30 '21

Then your version of kafkacat on Debian must be quite old; -k was added in kafkacat 1.4. You will need to upgrade or use the Docker image.

mikaello commented on Aug 30 '21

I tried piping from echo like this, but I get an error:

sh-4.2$ echo "test222" | ./kcat -P -b localhost:9092 -t testtopic
% ERROR: Unable to read message: No such file or directory

I'm able to read from a file ok using this:

sh-4.2$ ./kcat -P -b localhost:9092 -t testtopic test.txt

Any idea why I'm not able to produce using the echo?

aitchon commented on Nov 17 '22

What version of kcat are you using? Provide the output from ./kcat -V.

edenhill commented on Nov 17 '22

sh-4.2$ ./kcat -V
kcat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kcat
Copyright (c) 2014-2021, Magnus Edenhill
Version 1.7.1-10-g96723a (JSON, Avro, Transactions, IncrementalAssign, MockCluster, JSONVerbatim, librdkafka 1.8.2 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer) 

aitchon commented on Nov 17 '22

Fixed on latest master.

edenhill commented on Nov 17 '22