kcat
Question About Producer
Hello,
I want to produce a JSON value with a key using kafkacat.
So far I can produce the message with its value, but I cannot set the key.
Here's my code:
echo $EVENT_JSON | kafkacat -b $KAFKA_BROKERS -t $TOPIC -K $EVENT_KEY;
My EVENT_JSON is this:
{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
My EVENT_KEY is this:
123
It produces {"id": as the key, drops part of the value, and produces the rest of the JSON as the value.
How can I send it like this:
KEY: 123
VALUE: {"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
Thanks.
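If you only need to produce this single message, you can use -k (lowercase k) to specify the key of the message. -K (uppercase) sets the key delimiter instead: with -K 123 the input is split where "123" occurs, which would explain why {"id": ended up as the key.
You also need to quote or escape the JSON in your shell, otherwise the shell strips the double quotes before the message is produced: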
# NO ESCAPE
$ EVENT_JSON={"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
$ echo $EVENT_JSON | kafkacat -P -b localhost:9092 -t test_topic -k 123
$ kafkacat -C -b localhost:9092 -t test_topic -e -o beginning -J | jq
{
"topic": "test_topic",
"partition": 0,
"offset": 1,
"tstype": "create",
"ts": 1630078277397,
"broker": 0,
"key": "123",
"payload": "{id:123,typeId:1,createdDate:2021-08-19T08:48:58.77389,quantity:1}"
}
If you instead escape the JSON object (by hand or with a tool) before you produce it with echo, the quotes are preserved:
$ EVENT_JSON={\"id\":123,\"typeId\":1,\"createdDate\":\"2021-08-19T08:48:58.77389\",\"quantity\":1}
$ echo $EVENT_JSON | kafkacat -P -b localhost:9092 -t test_topic -k 123
$ kafkacat -C -b localhost:9092 -t test_topic -e -o beginning -J | jq
{
"topic": "test_topic",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1630078230496,
"broker": 0,
"key": "123",
"payload": "{\"id\":123,\"typeId\":1,\"createdDate\":\"2021-08-19T08:48:58.77389\",\"quantity\":1}"
}
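The backslash escaping above can also be avoided by single-quoting the assignment and double-quoting the expansion. This is a minimal sketch of that shell quoting only; the kafkacat call is left as a comment and reuses the broker, topic and key from the examples above:

```shell
# Single quotes keep the JSON's double quotes intact in the assignment.
EVENT_JSON='{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}'
EVENT_KEY=123

# A double-quoted expansion passes the value through unchanged.
printf '%s\n' "$EVENT_JSON"

# Producing would then look like this:
# printf '%s\n' "$EVENT_JSON" | kafkacat -P -b localhost:9092 -t test_topic -k "$EVENT_KEY"
```

printf is used instead of echo because echo may interpret backslashes or leading dashes differently between shells.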
You could also send the content as a file, then you don't need to escape beforehand:
$ cat event_json_file
{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}
$ kafkacat -P -b localhost:9092 -t test_topic -k 123 event_json_file
$ kafkacat -C -b localhost:9092 -t test_topic -e -o beginning -J | jq
{
"topic": "ttt",
"partition": 0,
"offset": 2,
"tstype": "create",
"ts": 1630078679461,
"broker": 0,
"key": "123",
"payload": "{\"id\":123,\"typeId\":1,\"createdDate\":\"2021-08-19T08:48:58.77389\",\"quantity\":1}\n"
}
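Note the trailing \n in the payload above: without -l, the entire file contents are sent as one message, including the final newline. If that newline is unwanted, one option is to write the file without it. A minimal sketch, where event_file is a hypothetical temporary file created with mktemp and the kafkacat call is left as a comment:

```shell
EVENT_JSON='{"id":123,"typeId":1,"createdDate":"2021-08-19T08:48:58.77389","quantity":1}'

# Hypothetical temporary file; any writable path would do.
event_file=$(mktemp)

# printf '%s' writes the JSON without appending a newline.
printf '%s' "$EVENT_JSON" > "$event_file"

# The file can then be produced as a single message:
# kafkacat -P -b localhost:9092 -t test_topic -k 123 "$event_file"
```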
Hi @mikaello
Thanks for your answer,
I tried producing with -k (lowercase k), but I am getting this error in a Debian environment:
kafkacat: invalid option -- 'k'
Error: unknown argument
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version KAFKACAT_VERSION (JSON) (librdkafka 0.9.3)
General options:
-C | -P | -L Mode: Consume, Produce or metadata List
-G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter character:
a-z.. | \r | \n | \t | \xNN
Default: \n
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-V Print version
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-J Output with JSON envelope
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL messages and keys as "NULL"(instead of empty)
-u Unbuffered output
Metadata options:
-t <topic> Topic to query (optional)
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -C -b ...
High-level KafkaConsumer mode:
kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -P -b ...
Metadata listing:
kafkacat -L -b <broker> [-t <topic>]
My code is this:
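function produce_event() {
  # $1 = key, $2 = JSON payload; quoted so the shell passes both through unchanged
  printf '%s\n' "$2" | kafkacat -b "$KAFKA_BROKERS" -t "$TOPIC" -k "$1"
}
function send_events() {
  local payloads payload key
  payloads=$(get_event_payloads)
  # Assumes get_event_payloads prints one JSON payload per line
  while IFS= read -r payload; do
    key=$(jq -r '.id' <<< "$payload")
    produce_event "$key" "$payload"
  done <<< "$payloads"
}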
However, when I execute the same script on my Mac (macOS Big Sur), it works fine.
Then your version of kafkacat on Debian must be quite old; -k was added in version 1.4 of kafkacat. You need to upgrade or use the Docker version.
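If upgrading is not possible, the -K key delimiter listed in the usage output above may serve as a workaround: put the key and the value on one line, separated by a delimiter, and pass that delimiter to -K. The sketch below uses a tab and assumes that the key contains no tab, that the payload contains no newline (the default message delimiter), and that KAFKA_BROKERS and TOPIC are set as in the script above. keyed_record and produce_keyed_event are hypothetical helper names, and this has not been verified against the old Debian build:

```shell
# Hypothetical helper: print "key<TAB>value" on one line.
keyed_record() {
  printf '%s\t%s\n' "$1" "$2"
}

# Hypothetical helper: produce one keyed message without -k.
# -K '\t' tells kafkacat to split each input line into key and value at the tab.
produce_keyed_event() {
  keyed_record "$1" "$2" | kafkacat -P -b "$KAFKA_BROKERS" -t "$TOPIC" -K '\t'
}
```

Usage would be along the lines of produce_keyed_event 123 "$EVENT_JSON". A tab is chosen here because a delimiter such as : or , also appears inside the JSON value.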
I tried piping from echo doing this, but I get an error:
sh-4.2$ echo "test222" | ./kcat -P -b localhost:9092 -t testtopic
% ERROR: Unable to read message: No such file or directory
I'm able to read from a file ok using this:
sh-4.2$ ./kcat -P -b localhost:9092 -t testtopic test.txt
Any idea why I'm not able to produce using the echo?
What version of kcat are you using?
Provide the output from ./kcat -V
sh-4.2$ ./kcat -V
kcat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kcat
Copyright (c) 2014-2021, Magnus Edenhill
Version 1.7.1-10-g96723a (JSON, Avro, Transactions, IncrementalAssign, MockCluster, JSONVerbatim, librdkafka 1.8.2 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer)
Fixed on latest master