buffer-trigger icon indicating copy to clipboard operation
buffer-trigger copied to clipboard

BatchConsumeBlockingQueueTrigger may block consume if passing a special executorService and the lingerMs is long

Open liudunxu opened this issue 6 years ago • 0 comments

version: lastest JVM version (java -version): 1.8 Description of the problem including expected versus actual behavior: expected behavior consumer function is called when the batchSize is reached or the linger time passed actual behavior consumer function is only callen when linger time passed code to reproduce

    public static void main(String[] args) {

ScheduledExecutorService threadPoolExecutorService = new ScheduledThreadPoolExecutor(1);

        ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorService() {
            @NotNull
            @Override
            public ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
                return threadPoolExecutorService.schedule(command, delay, unit);
            }

            @NotNull
            @Override
            public <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit) {
                return threadPoolExecutorService.schedule(callable, delay, unit);
            }

            @NotNull
            @Override
            public ScheduledFuture<?> scheduleAtFixedRate(@NotNull Runnable command, long initialDelay, long period, @NotNull TimeUnit unit) {
                return threadPoolExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
            }

            @NotNull
            @Override
            public ScheduledFuture<?> scheduleWithFixedDelay(@NotNull Runnable command, long initialDelay, long delay, @NotNull TimeUnit unit) {
                return null;
            }

            @Override
            public void shutdown() {

            }

            @NotNull
            @Override
            public List<Runnable> shutdownNow() {
                return null;
            }

            @Override
            public boolean isShutdown() {
                return false;
            }

            @Override
            public boolean isTerminated() {
                return false;
            }

            @Override
            public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
                return false;
            }

            @NotNull
            @Override
            public <T> Future<T> submit(@NotNull Callable<T> task) {
                return null;
            }

            @NotNull
            @Override
            public <T> Future<T> submit(@NotNull Runnable task, T result) {
                return null;
            }

            @NotNull
            @Override
            public Future<?> submit(@NotNull Runnable task) {
                return null;
            }

            @NotNull
            @Override
            public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
                return null;
            }

            @NotNull
            @Override
            public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException {
                return null;
            }

            @NotNull
            @Override
            public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
                return null;
            }

            @Override
            public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return null;
            }

            @Override
            public void execute(@NotNull Runnable command) {
                command.run();
            }
        };

        BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
                .batchSize(20).linger(3000, TimeUnit.SECONDS)
                .setConsumerEx(t -> {
                    System.out.println(Arrays.toString(t.toArray()));
                }).setScheduleExecutorService(scheduledExecutorService).build();


        for (int j = 0; j < 100; j++) {
            for (int i = 0; i < 10; i++) {
                bufferTrigger.enqueue(String.valueOf(j * 100 + i));
            }
            System.out.println("pending:" + bufferTrigger.getPendingChanges());
        }

        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.HOURS);

    }

Related code change the running.set(true); before scheduledExecutorService.execute? https://github.com/PhantomThief/buffer-trigger/blob/d8d95c1c326a97d23c22f68523def0f73ab18caf/src/main/java/com/github/phantomthief/collection/impl/BatchConsumeBlockingQueueTrigger.java#L78-L80

liudunxu avatar Apr 13 '18 11:04 liudunxu