
[Structured Streaming] Limit the number of processed items per trigger (batch)

Open · fe2s opened this issue on Jun 20, 2019 · 0 comments

The request came up in https://stackoverflow.com/questions/56679474/how-to-set-maximum-batch-size-of-spark-readstream-in-structured-streaming

I am reading batch records from Redis using Spark Structured Streaming's foreachBatch with the following code (trying to set the batch size via stream.read.batch.size):

    val data = spark.readStream
      .format("redis")
      .option("stream.keys", "my-stream")      // stream key to consume (placeholder name)
      .option("stream.read.batch.size", 128)
      .load()

    val query = data.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        ...
        // we count the size of batchDF here; we want to limit its size
        // some operation
      }

Currently we set stream.read.batch.size to 128, but this does not seem to work. The batch size appears to be random, sometimes over 1,000 or even 10,000.

However, I do not want to wait for that many records (up to 10,000), because some operations (the // some operation in the code comment) need to run as soon as possible. I want to set a maximum batch size so that once records reach this limit they are processed immediately. How can I do that?
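(A minimal interim sketch, not a spark-redis feature: it assumes the urgent work can run on chunks of the already-fetched micro-batch. It does not change how many records the source pulls per trigger, only the unit in which they are processed.)

    val maxChunk = 128.0
    val query = data.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        val total = batchDF.count()
        // split the micro-batch into roughly maxChunk-sized pieces;
        // randomSplit chunk sizes are approximate, not exact
        val numChunks = math.max(1, math.ceil(total / maxChunk).toInt)
        batchDF.randomSplit(Array.fill(numChunks)(1.0)).foreach { chunk =>
          // some operation, now on roughly maxChunk records at a time
        }
      }
      .start()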

The stream.read.batch.size parameter controls the number of items read by a single Redis API call (the COUNT parameter of the XREADGROUP command). It doesn't affect the number of items per trigger (the size of batchDF).
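For reference, the underlying Redis call looks like this (a sketch; group, consumer, and key names are placeholders), where COUNT is what stream.read.batch.size sets:

    XREADGROUP GROUP my-group my-consumer COUNT 128 STREAMS my-stream >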

If possible, we should introduce a new parameter that limits the number of items per trigger, similar to maxOffsetsPerTrigger in the Kafka integration.
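For illustration, the existing Kafka option and a sketch of what a Redis analogue could look like (the option name stream.read.max.items.per.trigger is only a naming suggestion, not an implemented option):

    // Existing Spark Kafka source option: caps records per micro-batch
    val kafkaData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                       // placeholder topic
      .option("maxOffsetsPerTrigger", "128")
      .load()

    // Hypothetical spark-redis analogue (not implemented)
    val redisData = spark.readStream
      .format("redis")
      .option("stream.keys", "my-stream")                  // placeholder stream key
      .option("stream.read.max.items.per.trigger", "128")  // hypothetical name
      .load()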

fe2s · Jun 20 '19 07:06