Angus Chen

Results 29 comments of Angus Chen

Our test runs in batch mode: we receive up to 1000 events per batch, then process those 1000 events concurrently and in parallel:

```java
Flux.fromIterable(events)
    .parallel()
    .runOn(Schedulers.fromExecutor(this.executorService))
    .flatMap(event -> {
        // handle event and ack
    })
```
...
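For reference, the same fan-out pattern (take a batch, hand each event to a shared executor, wait for the whole batch) can be sketched with the JDK alone; `handleEvent` is a hypothetical stand-in for the real "handle event and ack" step:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchFanOut {
    public static void main(String[] args) {
        // Stand-in for one batch of up to 1000 received events.
        List<Integer> events = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        ExecutorService executor = Executors.newFixedThreadPool(8);
        AtomicInteger acked = new AtomicInteger();

        // Submit every event in the batch to the executor and collect the futures.
        CompletableFuture<?>[] futures = events.stream()
                .map(e -> CompletableFuture.runAsync(() -> handleEvent(e, acked), executor))
                .toArray(CompletableFuture[]::new);

        // Block until the whole batch has been handled and acked.
        CompletableFuture.allOf(futures).join();
        executor.shutdown();
        System.out.println("acked=" + acked.get());
    }

    // Hypothetical per-event handler standing in for "handle event and ack".
    static void handleEvent(int event, AtomicInteger acked) {
        acked.incrementAndGet();
    }
}
```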

We tested again and it still happens, but we found that by default the Pulsar client doesn't wait for the ack response from the broker and completes the CompletableFuture immediately: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L264 So we turned on https://pulsar.apache.org/api/client/3.0.x/org/apache/pulsar/client/api/ConsumerBuilder.html#isAckReceiptEnabled(boolean) because if we enable that...
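A minimal sketch of turning that option on when building the consumer; the service URL, topic, and subscription names are placeholders, and this assumes a reachable broker, so it is a configuration fragment rather than a runnable test:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")    // placeholder service URL
        .build();

Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
        .topic("my-topic")                        // placeholder topic
        .subscriptionName("my-sub")               // placeholder subscription
        // Wait for the broker's ack receipt before completing acknowledgeAsync,
        // instead of completing the CompletableFuture on the client side.
        .isAckReceiptEnabled(true)
        .subscribe();
```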

https://github.com/semistone/reactorSemaphore/blob/main/src/main/java/ReactorSemaphore.java

I have done a few unit tests; it's a simple POC, but it worked.

Googling turned up something similar: https://www.javadoc.io/doc/org.redisson/redisson/3.9.1/org/redisson/api/RSemaphoreReactive.html#tryAcquire(int)
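This is not the linked ReactorSemaphore or Redisson implementation, but the core idea behind both (an acquire that completes a future instead of blocking a thread) can be sketched with the JDK alone:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Minimal async semaphore sketch: acquire() returns a future that completes
// when a permit is available; release() hands the permit to the next waiter.
public class AsyncSemaphore {
    private int permits;
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();

    public AsyncSemaphore(int permits) {
        this.permits = permits;
    }

    public synchronized CompletableFuture<Void> acquire() {
        if (permits > 0) {
            permits--;
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    public synchronized void release() {
        CompletableFuture<Void> next = waiters.poll();
        if (next != null) {
            next.complete(null);   // pass the permit straight to a waiter
        } else {
            permits++;
        }
    }

    public static void main(String[] args) {
        AsyncSemaphore sem = new AsyncSemaphore(1);
        sem.acquire().thenRun(() -> System.out.println("first acquired"));
        CompletableFuture<Void> second = sem.acquire();
        System.out.println("second done before release: " + second.isDone());
        sem.release();
        System.out.println("second done after release: " + second.isDone());
    }
}
```

In a Reactor setting the returned CompletableFuture would be wrapped with `Mono.fromFuture` so acquisition composes into the pipeline without blocking.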

@lhotari I am checking where that ByteBuf goes wrong. In OpAddEntry.java I verify the data when constructing the object and save the original data, and during run() I try to compare...

I also tested again: if the published payload is always 20K, it doesn't happen; it only happens when the normal size is 2K but some data is bigger than 16K (sounds like the Netty receive buffer size...

> unfortunately I can't reproduce it in docker, I guess docker standalone is different from my pulsar cluster.
>
> my pulsar cluster is almost default config but ...

@lhotari
> acknowledgmentAtBatchIndexLevelEnabled

Yes, I enabled it, and I disabled batch mode (-bd) in the producer after I found it seems not related to batch mode. After debugging, I found that if...

I also debugged in PulsarDecoder.channelRead, printing the ByteBuf object id and comparing it with the ByteBuf in OpAddEntry; I don't see the same ByteBuf object being reused between OpAddEntry.createNoRetainBuffer and OpAddEntry.run.
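The identity check described above boils down to comparing references (or identity hash codes), not contents; a tiny sketch with stand-in byte arrays rather than Netty ByteBuf instances, which would need a running broker to observe:

```java
public class IdentityCheck {
    public static void main(String[] args) {
        byte[] receivedBuf = new byte[]{1, 2, 3};
        byte[] sameBuf = receivedBuf;           // reused: the same object
        byte[] copiedBuf = receivedBuf.clone(); // same contents, different object

        // Reference equality (==) and System.identityHashCode detect reuse;
        // equal contents alone do not imply the buffer object was reused.
        System.out.println("reused: " + (receivedBuf == sameBuf));
        System.out.println("copy is same object: " + (receivedBuf == copiedBuf));
        System.out.println("same identity hash: "
                + (System.identityHashCode(receivedBuf) == System.identityHashCode(sameBuf)));
    }
}
```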