buffer-trigger
buffer-trigger copied to clipboard
BatchConsumeBlockingQueueTrigger may block consumption if a special executorService is passed and the lingerMs is long
version: latest JVM version (java -version): 1.8 Description of the problem including expected versus actual behavior: expected behavior: the consumer function is called when the batchSize is reached or the linger time has passed. actual behavior: the consumer function is only called when the linger time has passed. code to reproduce:
/**
 * Reproduction for the reported issue: a batch-blocking BufferTrigger is built
 * with a custom ScheduledExecutorService whose {@code execute} runs the task
 * synchronously on the calling thread, combined with a very long linger time.
 * According to the report, the consumer is then only invoked after the linger
 * time elapses rather than when the batch size is reached.
 *
 * @param args unused
 */
public static void main(String[] args) {
// Real scheduler backing the wrapper below; created with a core pool size of 1.
ScheduledExecutorService threadPoolExecutorService = new ScheduledThreadPoolExecutor(1);
// Hand-written wrapper: only the schedule*/scheduleAtFixedRate calls are delegated to the
// pool above; execute() runs inline; every other method is a stub.
ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorService() {
@NotNull
@Override
public ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
// Delegated to the backing pool.
return threadPoolExecutorService.schedule(command, delay, unit);
}
@NotNull
@Override
public <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit) {
// Delegated to the backing pool.
return threadPoolExecutorService.schedule(callable, delay, unit);
}
@NotNull
@Override
public ScheduledFuture<?> scheduleAtFixedRate(@NotNull Runnable command, long initialDelay, long period, @NotNull TimeUnit unit) {
// Delegated to the backing pool.
return threadPoolExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@NotNull
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(@NotNull Runnable command, long initialDelay, long delay, @NotNull TimeUnit unit) {
// Stub: NOT delegated and returns null despite @NotNull.
// NOTE(review): presumably never called by the trigger in this scenario — confirm.
return null;
}
// Lifecycle methods below are no-op stubs; the wrapper never reports itself as shut down
// and never shuts down the backing pool.
@Override
public void shutdown() {
}
@NotNull
@Override
public List<Runnable> shutdownNow() {
// Stub: returns null despite @NotNull.
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return false;
}
// submit/invokeAll/invokeAny below are stubs that ignore their arguments and return null.
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
return null;
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
return null;
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
return null;
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return null;
}
@NotNull
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(@NotNull Runnable command) {
// Runs the task synchronously on the caller's thread instead of handing it to the pool.
// Per the issue report, this is the "special" executor behavior that exposes the problem.
command.run();
}
};
// Batch size of 20 with a 3000-SECOND linger, so a linger-driven flush is far away and any
// prompt consumer call would have to come from the batch-size threshold.
BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.batchSize(20).linger(3000, TimeUnit.SECONDS)
.setConsumerEx(t -> {
// Consumer just prints each batch it receives.
System.out.println(Arrays.toString(t.toArray()));
}).setScheduleExecutorService(scheduledExecutorService).build();
// Enqueue 100 rounds of 10 elements (1000 total), printing the pending count after each round.
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 10; i++) {
bufferTrigger.enqueue(String.valueOf(j * 100 + i));
}
System.out.println("pending:" + bufferTrigger.getPendingChanges());
}
// Block the main thread for 3 hours (longer than the 3000-second linger) so that any
// delayed consumer output can be observed.
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.HOURS);
}
Related code: should running.set(true); be moved so that it is called before scheduledExecutorService.execute? https://github.com/PhantomThief/buffer-trigger/blob/d8d95c1c326a97d23c22f68523def0f73ab18caf/src/main/java/com/github/phantomthief/collection/impl/BatchConsumeBlockingQueueTrigger.java#L78-L80