vertx-sql-client

Event loop blocking warning from inside vertx-pg-client

Open calmlaw opened this issue 3 years ago • 3 comments

Version

vertx-pg-client version: 4.2.7

Context

Hi, developers

I have a data analysis system that reads a lot of data in real time. Each read is limited to 200,000 rows, and once the analysis is complete the results are inserted into the database in batches. The insert batch size is not fixed: it can be more than 100,000 rows, but it never exceeds 200,000. Looking at the logs, I found many warnings that the event loop is blocked. I thought the developers might be interested in this; maybe it is a bug that has not been discovered yet?

2022-04-27 16:52:38,539 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 5767 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
	at java.base/jdk.internal.misc.Unsafe.setMemory0(Native Method)
	at java.base/jdk.internal.misc.Unsafe.setMemory(Unsafe.java:742)
	at java.base/jdk.internal.misc.Unsafe.setMemory(Unsafe.java:753)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:130)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:332)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:648)
	at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:637)
	at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:213)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
	at io.netty.buffer.PoolArena.reallocate(PoolArena.java:286)
	at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:122)
	at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:305)
	at io.netty.buffer.AbstractByteBuf.setCharSequence0(AbstractByteBuf.java:702)
	at io.netty.buffer.AbstractByteBuf.writeCharSequence(AbstractByteBuf.java:1187)
	at io.vertx.pgclient.impl.codec.DataTypeCodec.binaryEncodeBPCHAR(DataTypeCodec.java:969)
	at io.vertx.pgclient.impl.codec.DataTypeCodec.encodeBinary(DataTypeCodec.java:220)
	at io.vertx.pgclient.impl.codec.DataTypeCodec.binaryEncodeArray(DataTypeCodec.java:1637)
	at io.vertx.pgclient.impl.codec.DataTypeCodec.encodeBinary(DataTypeCodec.java:223)
	at io.vertx.pgclient.impl.codec.PgEncoder.writeBind(PgEncoder.java:427)
	at io.vertx.pgclient.impl.codec.ExtendedQueryCommandCodec.encode(ExtendedQueryCommandCodec.java:52)
	at io.vertx.pgclient.impl.codec.PgEncoder.write(PgEncoder.java:102)
	at io.vertx.pgclient.impl.codec.PgEncoder.write(PgEncoder.java:133)
	at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$prepareCommand$4(SocketConnectionBase.java:264)
	at io.vertx.sqlclient.impl.SocketConnectionBase$$Lambda$844/0x00000008010e4270.handle(Unknown Source)
	at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
	at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:287)
	at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:96)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:100)
	at io.vertx.sqlclient.impl.SocketConnectionBase$$Lambda$837/0x00000008010c20e0.handle(Unknown Source)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:98)
	at io.vertx.pgclient.impl.codec.PgEncoder$$Lambda$840/0x00000008010c2b60.handle(Unknown Source)
	at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:139)
	at io.vertx.pgclient.impl.codec.PrepareStatementCommandCodec.handleReadyForQuery(PrepareStatementCommandCodec.java:96)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:237)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:96)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)


public Future<List<FareHistoryCompress>> readByLastUpdateAt(SqlClient client, LocalDateTime lastUpdatedAt, int limit) {
    String sql = lastUpdatedAt == null ? """
        SELECT * FROM fare_history_compress
        ORDER BY updated_at, created_at
        LIMIT $1
        """ : """
        SELECT * FROM fare_history_compress
        WHERE updated_at >= $1
        ORDER BY updated_at, created_at
        LIMIT $2
        """;
    Tuple tuple = lastUpdatedAt == null ? Tuple.of(limit) : Tuple.of(lastUpdatedAt, limit);
    return client.preparedQuery(sql).mapping(this::mapping).execute(tuple).map(this::list);
}

public Future<Integer> upsert(SqlClient client, List<FareHistoryCompress> fares) {
    String sql = """
        INSERT INTO fare_history_compress(id,
            departure,arrival,carrier,flight_no,departure_date,
            departure_time,arrival_date,arrival_time,cabin,
            cabin_class,adult_price,adult_tax,child_price,child_tax,
            infant_price,infant_tax,total_price,inventory,passengers,
            channel,updated_at,created_at,compress_ids)
        VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
        ON CONFLICT ON CONSTRAINT fare_history_compress_pkey DO UPDATE
        SET departure = $2,arrival = $3,carrier = $4,flight_no = $5,departure_date = $6,
            departure_time = $7,arrival_date = $8,arrival_time = $9,cabin = $10,
            cabin_class = $11,adult_price = $12,adult_tax = $13,child_price = $14,child_tax = $15,
            infant_price = $16,infant_tax = $17,total_price = $18,inventory = $19,passengers = $20,
            channel = $21,updated_at = $22,created_at=$23,compress_ids=$24
        """;
    List<Tuple> tuples = new ArrayList<>();
    for (FareHistoryCompress fare : fares) {
        List<Object> list = new ArrayList<>(24);
        list.add(fare.getId());
        list.add(fare.getDeparture());
        list.add(fare.getArrival());
        list.add(fare.getCarrier());
        list.add(fare.getFlightNo());
        list.add(fare.getDepartureDate());
        list.add(fare.getDepartureTime());
        list.add(fare.getArrivalDate());
        list.add(fare.getArrivalTime());
        list.add(fare.getCabin().toArray(new String[0]));
        list.add(fare.getCabinClass().toArray(new String[0]));
        list.add(fare.getAdultPrice());
        list.add(fare.getAdultTax());
        list.add(fare.getChildPrice());
        list.add(fare.getChildTax());
        list.add(fare.getInfantPrice());
        list.add(fare.getInfantTax());
        list.add(fare.getTotalPrice());
        list.add(fare.getInventory().toArray(new Integer[0]));
        list.add(fare.getPassengers());
        list.add(fare.getChannel());
        list.add(fare.getUpdatedAt());
        list.add(fare.getCreatedAt());
        list.add(fare.getCompressIds().toArray(new String[0]));

        tuples.add(Tuple.wrap(list));
    }
    return client.preparedQuery(sql).executeBatch(tuples).map(RowSet::rowCount);
}

calmlaw, Apr 27 '22 09:04

It seems more related to GC: submitting very large batches puts a lot of load on the GC.

Can you try using smaller batches?
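
For illustration, one way to get smaller batches is to split the list before handing it to the batch insert. This is a minimal sketch using only the JDK; the class name `BatchChunks` and the method `partition` are hypothetical and not part of vertx-sql-client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of vertx-sql-client.
final class BatchChunks {
    private BatchChunks() {}

    /** Splits items into consecutive lists of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // subList returns a view; copy it so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }
}
```

Each resulting chunk could then be passed to a method like the `upsert` shown in the issue, so that a single `executeBatch` call only has to encode one chunk's worth of tuples.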

vietj, Jun 01 '22 12:06

Hi @vietj,

Thank you for your reply. Can you tell from the jvisualvm report that the blocking is related to GC? CPU usage is not high, and JVM memory does not reach the -Xmx4G limit. The stack trace in the log includes the io.vertx.pgclient.impl.codec.PgEncoder class. Is it possible that the thread is blocked while encoding a large Tuple, or while waiting for Netty to allocate memory? Sorry, this is just a guess; I don't know much about it. You may be able to get useful information out of it, and maybe there is something you can optimize. In addition, everything works fine after I changed the batch size to 50,000. Thanks.

Regards, aom
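
As a rough sketch of how smaller batches could be written one after another, the following chains the chunks so that each one starts only after the previous one has completed. It uses the JDK's `CompletableFuture` as a stand-in so the example is self-contained; with Vert.x the same shape would use `Future.compose`. The names `SequentialBatches`, `writeAll` and `writeChunk` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper; CompletableFuture stands in for the Vert.x Future type.
final class SequentialBatches {
    private SequentialBatches() {}

    /**
     * Calls writeChunk for each chunk in order. A chunk is submitted only after
     * the previous one has completed, and the per-chunk counts are summed.
     */
    static <T> CompletableFuture<Integer> writeAll(
            List<List<T>> chunks,
            Function<List<T>, CompletableFuture<Integer>> writeChunk) {
        CompletableFuture<Integer> total = CompletableFuture.completedFuture(0);
        for (List<T> chunk : chunks) {
            // thenCompose defers the call to writeChunk until the running total is available
            total = total.thenCompose(sum -> writeChunk.apply(chunk).thenApply(n -> sum + n));
        }
        return total;
    }
}
```

Because only one chunk is in flight at a time, only one chunk's tuples need to be encoded at once, which is in line with the observation that a batch size of 50,000 works.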

calmlaw, Jun 02 '22 09:06

I think the issue is that your case needs to allocate too much memory at the same time because it is a batch.
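
To give a feel for the amount of memory involved: the stack trace shows the encoder's buffer being regrown (`ensureWritable0`, `reallocate`, `allocateHuge`) while bind parameters are written. The arithmetic below is only a back-of-the-envelope sketch. The figure of 500 encoded bytes per row is an assumption chosen for illustration, not a measurement from this issue, and the class `BatchSizeEstimate` is hypothetical.

```java
// Hypothetical estimate; the bytes-per-row figure passed in is an assumption, not measured data.
final class BatchSizeEstimate {
    private BatchSizeEstimate() {}

    /** Total encoded bytes for a batch, assuming every row encodes to bytesPerRow bytes. */
    static long encodedBytes(long rows, long bytesPerRow) {
        return Math.multiplyExact(rows, bytesPerRow);
    }

    /** Converts a byte count to mebibytes (1 MiB = 1,048,576 bytes). */
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }
}
```

Under that assumption, 200,000 rows come to 100,000,000 bytes (about 95 MiB), while 50,000 rows come to 25,000,000 bytes (about 24 MiB). The real figure depends on the actual column values, especially the array columns.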

vietj, Jun 03 '22 07:06