amazon-kinesis-client

KCL Worker is not responding to maxRecords value

Open • muhufuk opened this issue 7 years ago • 12 comments

When starting the worker, I set maxRecords to 10 in KinesisClientLibConfiguration, but I still get larger batches (~15, ~13, etc.) of records in a single processRecords call.

public KinesisClientLibConfiguration kinesisClientLibConfiguration(
        final ClientConfiguration clientConfiguration) {
    return new KinesisClientLibConfiguration(
            applicationName + dataStreamName,
            dataStreamName,
            new DefaultAWSCredentialsProviderChain(),
            "Worker-" + UUID.randomUUID().toString())
        .withRegionName(region)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        .withCommonClientConfig(clientConfiguration)
        .withInitialLeaseTableReadCapacity(leaseTableReadCapacity)
        .withInitialLeaseTableWriteCapacity(leaseTableWriteCapacity)
        .withFailoverTimeMillis(failOverTime)
        .withMaxRecords(10);
}

muhufuk, Jan 22 '18

I am seeing the same behavior: maxRecords is ignored completely. In fact, the number of records returned is often larger than the default of 10,000. A workaround would really help.

cgpassante, Mar 02 '18

Same here. This is frustrating when we want to slow down processing because our downstream systems have slowed down.

xujiaxj, Mar 12 '18

Based on [1], maxRecords is the maximum number of Kinesis records per request. If the KPL is used on the producer side with aggregation, each Kinesis record can contain several user records, so the record processor receives more (deaggregated) records per batch than maxRecords.

[1] https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java

shenavaa, Mar 14 '18
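To make the arithmetic in the previous comment concrete, here is a minimal JDK-only model (Java 16+ for `record`). It is not KCL code: the `KinesisRecord` record, the `fetch`, `deaggregate`, and `buildShard` helpers, and the sample sizes are all hypothetical. It assumes, as the comment describes, that maxRecords caps the number of Kinesis records returned per request and that each aggregated Kinesis record is unpacked into its user records before they reach processRecords.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregationMath {
    // Hypothetical model of one Kinesis record carrying several KPL user records.
    record KinesisRecord(List<String> userRecords) {}

    // Models a single request: returns at most maxRecords Kinesis records.
    static List<KinesisRecord> fetch(List<KinesisRecord> shard, int maxRecords) {
        return shard.subList(0, Math.min(maxRecords, shard.size()));
    }

    // Models deaggregation: flattens Kinesis records into their user records.
    static List<String> deaggregate(List<KinesisRecord> batch) {
        List<String> out = new ArrayList<>();
        for (KinesisRecord r : batch) {
            out.addAll(r.userRecords());
        }
        return out;
    }

    // Builds `count` Kinesis records, each aggregating `perRecord` user records.
    static List<KinesisRecord> buildShard(int count, int perRecord) {
        List<KinesisRecord> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            List<String> users = new ArrayList<>();
            for (int j = 0; j < perRecord; j++) {
                users.add("user-" + i + "-" + j);
            }
            records.add(new KinesisRecord(users));
        }
        return records;
    }

    public static void main(String[] args) {
        int maxRecords = 10;
        List<KinesisRecord> batch = fetch(buildShard(100, 5), maxRecords);
        List<String> delivered = deaggregate(batch);
        // 10 Kinesis records x 5 user records each = 50 user records
        System.out.println(batch.size() + " Kinesis records -> " + delivered.size() + " user records");
    }
}
```

Running `main` prints `10 Kinesis records -> 50 user records`: in this model the cap holds at the Kinesis-record level, while the processor sees five times as many items.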

@shenavaa Yes, we suspected that too, but in some cases we have received 3k+ records even though maxRecords is 1500. Besides, as you mentioned, we use the KPL, but aggregation is disabled.

muhufuk, Mar 14 '18

@muhufuk Can you also share your KPL configuration properties and the number of shards in the stream so we can reproduce this?

shenavaa, Mar 14 '18

@shenavaa We only changed the region and the aggregation flag; everything else is left at its default. We have 60 shards.

muhufuk, Mar 15 '18

Hello, I am having the same issue! I am running version 1.9.0 with a similar KinesisClientLibConfiguration setup. However, setting maxRecords does not limit the batch size. Worth mentioning: I use a custom record processor class.

kakaday22, May 14 '19

I am using KCL 1.9.1 and have the same issue. I set maxRecords to around 300-500, but I keep getting around 1,000 records at a time.

WineYe, Jul 01 '19

So how do you limit the KPL so that we get a consistent number of records per batch?

cebbott, Aug 31 '22
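One possible workaround for the question above, sketched with only the JDK and not taken from the KCL itself: if what you need is a consistent batch size downstream, the record processor can split whatever list it receives into fixed-size chunks. This does not change what the KCL fetches; it only changes how the processor passes records on. The `BatchChunker` class, the `chunk` helper, and the chunk size of 10 are hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // Splits a batch into consecutive sublists of at most `size` elements.
    static <T> List<List<T>> chunk(List<T> batch, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < batch.size(); start += size) {
            int end = Math.min(start + size, batch.size());
            // Copy so each chunk does not depend on the original list.
            chunks.add(new ArrayList<>(batch.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Stand-in for the records of one oversized processRecords batch.
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            batch.add(i);
        }
        for (List<Integer> c : chunk(batch, 10)) {
            // A real processor would hand each chunk to the downstream system here.
            System.out.println("chunk of " + c.size());
        }
    }
}
```

With 23 records and a chunk size of 10, the loop prints chunk sizes 10, 10 and 3. A real processor would still need to decide where to checkpoint.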

My understanding from reading the KCL source code is that maxRecords is only used by the polling fetcher, not the fan-out one. I don't see a way to limit the maximum number of records when using fan-out, which is unfortunate because it can lead to extremely large per-shard caches; cf. https://github.com/awslabs/aws-eventstream-java/pull/4

fommil, Apr 23 '24
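The distinction described in the previous comment can be illustrated with a toy model in plain Java. The `RecordSource` interface and the `PollingSource` and `FanOutSource` classes are hypothetical stand-ins, not KCL types: the polling source trims each request to maxRecords, while the fan-out source passes through whatever batch a pushed event carries, so it has no per-request cap to honor.

```java
import java.util.List;

public class RetrievalModel {
    // Hypothetical abstraction over how a shard's records reach the processor.
    interface RecordSource {
        List<String> nextBatch(List<String> available);
    }

    // Polling: the consumer asks for at most maxRecords per request.
    static final class PollingSource implements RecordSource {
        private final int maxRecords;

        PollingSource(int maxRecords) {
            this.maxRecords = maxRecords;
        }

        @Override
        public List<String> nextBatch(List<String> available) {
            return available.subList(0, Math.min(maxRecords, available.size()));
        }
    }

    // Fan-out: the service pushes an event and the consumer takes the whole batch.
    static final class FanOutSource implements RecordSource {
        @Override
        public List<String> nextBatch(List<String> available) {
            return available;
        }
    }

    public static void main(String[] args) {
        List<String> available = List.of("a", "b", "c", "d", "e");
        System.out.println(new PollingSource(2).nextBatch(available).size()); // 2
        System.out.println(new FanOutSource().nextBatch(available).size());   // 5
    }
}
```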

I'm using KCL v2. Configuring polling retrieval with maxRecords worked for me:

final var polling =
    new PollingConfig(getKinesisStreamName(), kinesisClient)
        .maxRecords(maximumRecordsPerCall)
        .idleTimeBetweenReadsInMillis(getKinesisTimeIntervalBetweenCalls());

var retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(polling);

this.scheduler =
    new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig);

wellingtonmacena, Jun 19 '24
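For the earlier goal of slowing processing down when downstream systems lag, the polling settings in the last comment suggest a rough ceiling per shard. The sketch below assumes each poll returns at most maxRecords Kinesis records and that polls are separated by at least idleTimeBetweenReadsInMillis; it ignores call latency and KPL aggregation (which, as noted earlier in the thread, can multiply the number of user records per Kinesis record). The `PollingRateBound` class and `maxRecordsPerSecond` method are hypothetical, and the 60-shard figure simply reuses the shard count mentioned earlier as an example.

```java
public class PollingRateBound {
    // Upper bound on Kinesis records per second for one shard, assuming each poll
    // returns at most maxRecords and consecutive polls are at least idleMillis apart.
    static double maxRecordsPerSecond(int maxRecords, long idleMillis) {
        if (maxRecords <= 0 || idleMillis <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        return maxRecords * 1000.0 / idleMillis;
    }

    public static void main(String[] args) {
        // maxRecords = 100 with a 1,000 ms pause: at most ~100 records/s per shard
        System.out.println(maxRecordsPerSecond(100, 1000)); // 100.0
        // across 60 shards: at most ~6,000 records/s for the whole stream
        System.out.println(60 * maxRecordsPerSecond(100, 1000)); // 6000.0
    }
}
```

Because the bound scales linearly with maxRecords and inversely with the pause, lowering either knob in this model tightens the ceiling; real throughput will be lower once request latency is included.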