vertx-sql-client
Event loop blocking warning from inside vertx-pg-client
Version
vertx-pg-client version: 4.2.7
Context
Hi developers,
I have a data analysis system that needs to read a lot of data in real time. I cap a single batch read at 200,000 rows, and after the analysis is complete I insert the results into the database in batches. The size of a batch insert is not fixed; it may be more than 100,000 rows, but it will not exceed 200,000. Looking at the logs, I found many warnings saying the event loop is blocked. I think the developers might be interested in this; maybe it is a bug that hasn't been discovered yet?
2022-04-27 16:52:38,539 WARN [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 5767 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
at java.base@.../jdk.internal.misc.Unsafe.setMemory0(Native Method)
at java.base@.../jdk.internal.misc.Unsafe.setMemory(Unsafe.java:742)
at java.base@.../jdk.internal.misc.Unsafe.setMemory(Unsafe.java:753)
at java.base@.../java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:130)
at java.base@.../java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:332)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:648)
at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:637)
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:213)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
at io.netty.buffer.PoolArena.reallocate(PoolArena.java:286)
at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:122)
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:305)
at io.netty.buffer.AbstractByteBuf.setCharSequence0(AbstractByteBuf.java:702)
at io.netty.buffer.AbstractByteBuf.writeCharSequence(AbstractByteBuf.java:1187)
at io.vertx.pgclient.impl.codec.DataTypeCodec.binaryEncodeBPCHAR(DataTypeCodec.java:969)
at io.vertx.pgclient.impl.codec.DataTypeCodec.encodeBinary(DataTypeCodec.java:220)
at io.vertx.pgclient.impl.codec.DataTypeCodec.binaryEncodeArray(DataTypeCodec.java:1637)
at io.vertx.pgclient.impl.codec.DataTypeCodec.encodeBinary(DataTypeCodec.java:223)
at io.vertx.pgclient.impl.codec.PgEncoder.writeBind(PgEncoder.java:427)
at io.vertx.pgclient.impl.codec.ExtendedQueryCommandCodec.encode(ExtendedQueryCommandCodec.java:52)
at io.vertx.pgclient.impl.codec.PgEncoder.write(PgEncoder.java:102)
at io.vertx.pgclient.impl.codec.PgEncoder.write(PgEncoder.java:133)
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$prepareCommand$4(SocketConnectionBase.java:264)
at io.vertx.sqlclient.impl.SocketConnectionBase$$Lambda$844/0x00000008010e4270.handle(Unknown Source)
at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:287)
at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:96)
at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:100)
at io.vertx.sqlclient.impl.SocketConnectionBase$$Lambda$837/0x00000008010c20e0.handle(Unknown Source)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:98)
at io.vertx.pgclient.impl.codec.PgEncoder$$Lambda$840/0x00000008010c2b60.handle(Unknown Source)
at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:139)
at io.vertx.pgclient.impl.codec.PrepareStatementCommandCodec.handleReadyForQuery(PrepareStatementCommandCodec.java:96)
at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:237)
at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:96)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base@.../java.lang.Thread.run(Thread.java:833)

public Future<List<FareHistoryCompress>> readByLastUpdateAt(SqlClient client, LocalDateTime lastUpdatedAt, int limit) {
    String sql = lastUpdatedAt == null ? """
            SELECT * FROM fare_history_compress
            ORDER BY updated_at, created_at
            LIMIT $1
            """ : """
            SELECT * FROM fare_history_compress
            WHERE updated_at >= $1
            ORDER BY updated_at, created_at
            LIMIT $2
            """;
    Tuple tuple = lastUpdatedAt == null ? Tuple.of(limit) : Tuple.of(lastUpdatedAt, limit);
    return client.preparedQuery(sql).mapping(this::mapping).execute(tuple).map(this::list);
}
public Future<Integer> upsert(SqlClient client, List<FareHistoryCompress> fares) {
    String sql = """
            INSERT INTO fare_history_compress(id,
                departure, arrival, carrier, flight_no, departure_date,
                departure_time, arrival_date, arrival_time, cabin,
                cabin_class, adult_price, adult_tax, child_price, child_tax,
                infant_price, infant_tax, total_price, inventory, passengers,
                channel, updated_at, created_at, compress_ids)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
            ON CONFLICT ON CONSTRAINT fare_history_compress_pkey DO UPDATE
            SET departure = $2, arrival = $3, carrier = $4, flight_no = $5, departure_date = $6,
                departure_time = $7, arrival_date = $8, arrival_time = $9, cabin = $10,
                cabin_class = $11, adult_price = $12, adult_tax = $13, child_price = $14, child_tax = $15,
                infant_price = $16, infant_tax = $17, total_price = $18, inventory = $19, passengers = $20,
                channel = $21, updated_at = $22, created_at = $23, compress_ids = $24
            """;
    List<Tuple> tuples = new ArrayList<>();
    for (FareHistoryCompress fare : fares) {
        List<Object> list = new ArrayList<>(24);
        list.add(fare.getId());
        list.add(fare.getDeparture());
        list.add(fare.getArrival());
        list.add(fare.getCarrier());
        list.add(fare.getFlightNo());
        list.add(fare.getDepartureDate());
        list.add(fare.getDepartureTime());
        list.add(fare.getArrivalDate());
        list.add(fare.getArrivalTime());
        list.add(fare.getCabin().toArray(new String[0]));
        list.add(fare.getCabinClass().toArray(new String[0]));
        list.add(fare.getAdultPrice());
        list.add(fare.getAdultTax());
        list.add(fare.getChildPrice());
        list.add(fare.getChildTax());
        list.add(fare.getInfantPrice());
        list.add(fare.getInfantTax());
        list.add(fare.getTotalPrice());
        list.add(fare.getInventory().toArray(new Integer[0]));
        list.add(fare.getPassengers());
        list.add(fare.getChannel());
        list.add(fare.getUpdatedAt());
        list.add(fare.getCreatedAt());
        list.add(fare.getCompressIds().toArray(new String[0]));
        tuples.add(Tuple.wrap(list));
    }
    return client.preparedQuery(sql).executeBatch(tuples).map(RowSet::rowCount);
}
It seems more related to GC, and to the fact that you put a lot of load on the GC since you are submitting very large batches.
Can you try using smaller batches?
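For illustration, here is a minimal sketch of what splitting the batch could look like; the chunks helper and its name are hypothetical, and upsert refers to the method posted above:

public static <T> List<List<T>> chunks(List<T> all, int size) {
    // Hypothetical helper: partition the list into sublists of at most
    // `size` elements, so each executeBatch call encodes a bounded
    // number of tuples instead of the whole batch at once.
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < all.size(); i += size) {
        result.add(all.subList(i, Math.min(i + size, all.size())));
    }
    return result;
}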
Hi @vietj,
Thank you for your reply. Can it be seen from the jvisualvm report whether the blocking is related to GC? CPU usage is not high, and JVM memory does not reach the -Xmx4G limit. The log shows the io.vertx.pgclient.impl.codec.PgEncoder class in the stack trace; is it possible that the thread blocks while encoding a large Tuple, or while waiting for Netty to allocate memory? Sorry, this is just my guess; I don't know much about the internals. You might be able to get useful information out of it, and maybe there is something you can optimize. In addition, everything works fine after I changed the batch size to 50,000. Thanks.
Regards, aom
I think the issue is that your case needs to allocate too much memory at the same time, because it is a single batch.
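Building on that explanation, here is a minimal sketch of running the chunks sequentially, assuming the hypothetical chunks helper above; only one chunk's tuples are then encoded and in flight at any time:

public Future<Integer> upsertChunked(SqlClient client, List<FareHistoryCompress> fares, int chunkSize) {
    // Chain the per-chunk upserts with Future composition so the next
    // chunk is only encoded after the previous one completes, bounding
    // how much buffer memory is allocated at the same time.
    Future<Integer> total = Future.succeededFuture(0);
    for (List<FareHistoryCompress> chunk : chunks(fares, chunkSize)) {
        total = total.compose(sum -> upsert(client, chunk).map(count -> sum + count));
    }
    return total;
}

This is consistent with the observation above that reducing the batch size to 50,000 made the warnings disappear.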