amazon-kinesis-producer icon indicating copy to clipboard operation
amazon-kinesis-producer copied to clipboard

Messeges are getting merged

Open IDUN-BogdanPi opened this issue 10 months ago • 1 comments

Hi,

I am using Kinesis Producer for Java:

			<dependency>
				<groupId>com.amazonaws</groupId>
				<artifactId>amazon-kinesis-producer</artifactId>
				<version>0.15.8</version>
			</dependency>

This is my send-message code snippet:

            String json = objectMapper.writeValueAsString(msg);
            log.info("Sending notification to user: {}, message :{}", msg.getPartitioningKey(), json);
            byte[] messageBytes = objectMapper.writeValueAsString(msg).getBytes(StandardCharsets.UTF_8);
            ByteBuffer data = ByteBuffer.wrap(messageBytes);
            ListenableFuture<UserRecordResult> future = kinesisProducer.addUserRecord(STREAM_NAME, msg.getPartitioningKey(), data);

            Futures.addCallback(future, new FutureCallback<>() {

The log message produced is:

Sending notification to user: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3, message :{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713264772502","status":"NOT_STARTED","timestamp":1713264772502,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}

But this is what it ends up as in Kinesis. The problem seems to appear when sending messages with different partitioning keys. They end up "merged". As you can see in the example below, this should've been 2 distinct json messages.

In the log messages, the partitioning key is correctly: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3 While on Kinesis it seems to be: a

image
����
$03eadfed-b2ab-45e8-8b8c-54baa6fd27f3
$026e854e-4bac-41ae-97b3-3b39c66cef89��{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713261592840","status":"PROCESSING","timestamp":1713264766170,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}��{"idunId":"026e854e-4bac-41ae-97b3-3b39c66cef89","deviceId":"49-44-55-4E-00-02","action":"liveStreamInsights","recordingId":"1713264702302","partitioningKey":"026e854e-4bac-41ae-97b3-3b39c66cef89","raw_eeg":[-- removed for brevity ---]}DZ	@�_@_+q``}3
image

Any idea what might be the problem?

Thanks for you help!

IDUN-BogdanPi avatar Apr 16 '24 11:04 IDUN-BogdanPi

How do you create the Producer? There's a setting about aggregation that is set to true by default.

You can disable it like so:

KinesisProducerConfiguration config = new KinesisProducerConfiguration().setAggregationEnabled(false);

return new KinesisProducer(config);

lbourdages avatar May 10 '24 14:05 lbourdages