amazon-kinesis-producer
Messages are getting merged
Hi,
I am using Kinesis Producer for Java:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.15.8</version>
</dependency>
This is my send-message code snippet:
String json = objectMapper.writeValueAsString(msg);
log.info("Sending notification to user: {}, message :{}", msg.getPartitioningKey(), json);
byte[] messageBytes = json.getBytes(StandardCharsets.UTF_8); // reuse the JSON string that was logged above
ByteBuffer data = ByteBuffer.wrap(messageBytes);
ListenableFuture<UserRecordResult> future = kinesisProducer.addUserRecord(STREAM_NAME, msg.getPartitioningKey(), data);
Futures.addCallback(future, new FutureCallback<>() {
The log message produced is:
Sending notification to user: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3, message :{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713264772502","status":"NOT_STARTED","timestamp":1713264772502,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}
But this is what ends up in Kinesis. The problem seems to appear when sending messages with different partitioning keys: they end up "merged". As you can see in the example below, this should have been 2 distinct JSON messages.
In the log messages, the partitioning key is correct: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3
On Kinesis, however, it appears to be: a
����
$03eadfed-b2ab-45e8-8b8c-54baa6fd27f3
$026e854e-4bac-41ae-97b3-3b39c66cef89��{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713261592840","status":"PROCESSING","timestamp":1713264766170,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}��{"idunId":"026e854e-4bac-41ae-97b3-3b39c66cef89","deviceId":"49-44-55-4E-00-02","action":"liveStreamInsights","recordingId":"1713264702302","partitioningKey":"026e854e-4bac-41ae-97b3-3b39c66cef89","raw_eeg":[-- removed for brevity ---]}DZ @�_@_+q``}3
Any idea what might be the problem?
Thanks for your help!
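How do you create the producer? Aggregation is enabled by default, so the KPL packs multiple user records into a single Kinesis record. A consumer that does not de-aggregate those records sees them "merged", as in your dump.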
You can disable it like so:
KinesisProducerConfiguration config = new KinesisProducerConfiguration().setAggregationEnabled(false);
return new KinesisProducer(config);
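If you would rather keep aggregation on, the consumer has to recognize aggregated records. As I understand the KPL aggregation format, an aggregated record starts with the 4 magic bytes 0xF3 0x89 0x9A 0xC2, followed by a protobuf message, followed by a 16-byte MD5 of that protobuf message. Here is a minimal sketch that only detects such a record (it does not decode the protobuf). The class and method names are made up for illustration and are not part of the KPL:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper, not part of the KPL: detects whether a raw Kinesis
// record payload looks like a KPL aggregated record.
final class KplAggregationCheck {
    // Assumed magic prefix of an aggregated record (from the KPL aggregation format notes).
    private static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
    // Assumed trailer: MD5 digest of the protobuf body between magic and trailer.
    private static final int MD5_LENGTH = 16;

    static boolean looksAggregated(byte[] data) {
        if (data == null || data.length < MAGIC.length + MD5_LENGTH) {
            return false; // too short to hold magic + checksum
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (data[i] != MAGIC[i]) {
                return false; // plain user record, e.g. raw JSON
            }
        }
        byte[] body = Arrays.copyOfRange(data, MAGIC.length, data.length - MD5_LENGTH);
        byte[] trailer = Arrays.copyOfRange(data, data.length - MD5_LENGTH, data.length);
        try {
            // Only treat the record as aggregated if the checksum matches.
            return Arrays.equals(MessageDigest.getInstance("MD5").digest(body), trailer);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

A record that passes this check still needs to be unpacked into its user records. The KCL does that for you, so a hand-rolled check like this is mostly useful for confirming what you are seeing in the stream.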