cyclops
cyclops copied to clipboard
futureStream from Queue<T> does not fire on timeout.
Describe the bug I have created a reproducible test case, and it seems that this old issue still persists: https://stackoverflow.com/questions/42363084/using-cyclops-react-for-batching-on-a-async-queue-stream
To Reproduce Steps to reproduce the behavior: Test case attached in comment.
Expected behavior The future should be run after 200ms.
Here is a test case:
@Test
public void isolatedQueueTest() throws InterruptedException {
Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
.parallel()
.groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
.runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
bufferedMessages -> log.info("Received {}",
bufferedMessages.stream().collect(Collectors.joining(",")))));
ReactiveSeq.range(0,11)
.forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
TimeUnit.MILLISECONDS.sleep(1000);
}
The last pair will not be logged.
More accurate test case
@Test
public void isolatedQueueTest() throws InterruptedException {
Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
ListX<String> receiver = ListX.empty();
StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
.parallel()
.groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
.runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
bufferedMessages -> receiver.addAll(bufferedMessages.stream().toList())));
ReactiveSeq.range(0,11)
.forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
TimeUnit.MILLISECONDS.sleep(1000);
pubSubMessageQueue.close();
TimeUnit.MILLISECONDS.sleep(1000);
assertEquals(11,receiver.size());
}
Without close and secondary wait, this test case will fail.