r2dbc-postgresql

Receive "PostgresqlNonTransientResourceException: [08P01] invalid message format" for in clause with too many bindings

Open cfogrady opened this issue 3 years ago • 5 comments

Bug Report

Versions

  • Driver: 0.9.2.RELEASE
  • Database: 11.1
  • Java: 11
  • OS: Ubuntu 20.04

Current Behavior

When running a simple query containing an in clause with a large number of parameters, an invalid message format exception is thrown.
QueryTesting > largeQuery() FAILED
    io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException: [08P01] invalid message format
        at app//io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:98)
        at app//io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
        at app//io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
        at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:169)
        at app//reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at app//io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
        at app//reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:119)
        at app//reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
        at app//reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
        at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
        at app//io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:671)
        at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:923)
        at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:797)
        at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:703)
        at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:119)
        at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at app//reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at app//reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at app//reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
        at app//reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
        at app//reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
        at app//reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at app//io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at app//io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at app//io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at app//io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at app//io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at app//io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at [email protected]/java.lang.Thread.run(Thread.java:829)

Table schema

None

Steps to reproduce

See the test class below for a working in clause and a failing in clause.
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicBoolean;

public class QueryTesting {

    private static final int maxConnections = 1000;
    private static final String POSTGRES_VERSION = "postgres:11.1";

    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
    public static PostgreSQLContainer CONTAINER;

    public static PostgresqlConnectionFactory connectionFactory;

    @BeforeAll
    static void setupDBContainer() {
        CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer(POSTGRES_VERSION)
                .withDatabaseName("test")
                .withSharedMemorySize(500L * 1000 * 1000)
                .withCommand("postgres", "-c", "fsync=off", "-c", "max_connections=" + maxConnections)
                .withReuse(true);
        if (isStarted.compareAndSet(false, true)) {
            CONTAINER.start();
            connectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                    .host(CONTAINER.getHost())
                    .database(CONTAINER.getDatabaseName())
                    .username(CONTAINER.getUsername())
                    .password(CONTAINER.getPassword())
                    .port(CONTAINER.getFirstMappedPort())
                    .build()
            );
        }
    }

    @Test
    void largeInClauseQuery() {
        int numberOfValues = 70_000;
        Mono<PostgresqlConnection> mono = connectionFactory.create();
        mono.flatMapMany(connection -> {
            StringBuilder builder = new StringBuilder("select 'working' where 1 in (");
            for(int i = 1; i < numberOfValues; i++) {
                builder.append("$").append(i);
                builder.append(", ");
            }
            builder.setLength(builder.length()-2);
            builder.append(")");
            var statement = connection.createStatement(builder.toString());
            for(int i = 1; i < numberOfValues; i++) {
                statement = statement.bind("$" + i, i);
            }
            return statement.execute();
        }).flatMap(this::getStringRowsFromResults).collectList().block();
    }

    @Test
    void smallInClauseQuery() {
        int numberOfValues = 50_000;
        Mono<PostgresqlConnection> mono = connectionFactory.create();
        mono.flatMapMany(connection -> {
            StringBuilder builder = new StringBuilder("select 'working' where 1 in (");
            for(int i = 1; i < numberOfValues; i++) {
                builder.append("$").append(i);
                builder.append(", ");
            }
            builder.setLength(builder.length()-2);
            builder.append(")");
            var statement = connection.createStatement(builder.toString());
            for(int i = 1; i < numberOfValues; i++) {
                statement = statement.bind("$" + i, i);
            }
            return statement.execute();
        }).flatMap(this::getStringRowsFromResults).collectList().block();
    }

    private Flux<String> getStringRowsFromResults(PostgresqlResult result) {
        return result.map((row, rowMetadata) -> row.get(0, String.class));
    }
}

Expected behavior/code

I expect the SQL to still execute, or failing that, a clearer error message about the bind parameter limit.

cfogrady avatar Sep 16 '22 16:09 cfogrady

Well, the limit is 65_536, so it does look like some Postgres limitation, and the message you see is the actual backend message. I don't think we should validate the parameter count on our side, and I don't think we should replace the backend message either.

Squiry avatar Sep 18 '22 18:09 Squiry
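The power-of-two limit comes from the wire protocol: in the extended-query protocol, the Bind message encodes the number of parameter values as an Int16, so a count above that range cannot be represented and wraps around on the wire, which is why the backend reports a malformed message rather than a useful limit error. A short, self-contained sketch of the wrap-around (plain Java, no driver involved):

```java
public class BindCountOverflow {

    public static void main(String[] args) {
        // The failing test binds 69,999 parameters (numberOfValues = 70_000,
        // loop runs from 1 to 69_999). The Bind message stores the parameter
        // count in a 16-bit field, so the value that actually reaches the
        // backend is truncated.
        int params = 69_999;           // what the client tries to send
        short onWire = (short) params; // what fits in the message's Int16 field
        System.out.println(onWire);    // prints 4463 -- not the count the backend expects
    }
}
```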

Thank you for the reply... I couldn't find a documented limit on the postgres side, but that makes sense. I should have realized it would be the 16-bit max int based on the range. I might argue that if this is a known limitation, it should be a driver-side check instead of the driver assuming infinite binds only to have the backend unable to parse the message because of it. But I could also see the argument for leaving it as is... My argument for detecting it driver-side is that it seems unlikely the backend can detect why it's unable to parse a message and produce a better error in this case... but I wouldn't argue that too strongly. Thanks again for the info!

cfogrady avatar Sep 18 '22 20:09 cfogrady

I couldn't find a documented limit on the postgres side

Yeah, I tried to find something too, but I only found some Stack Overflow posts. So I just tried some numbers near 65536 to be sure.

I might argue that if this is a known limitation that it be a driver side check instead of the driver assuming infinite binds only to have the backend unable to parse the message because of it.

Well, you're kind of right. The protocol expects the parameter count as a short (I missed that part in the first round), so we could check it somewhere near the io.r2dbc.postgresql.PostgresqlStatement constructor, since we parse the SQL now (I don't like that fact, by the way), and throw an exception without even sending the query to the backend.

By the way, you can use arrays instead of IN: it's just one parameter, and the SQL is the same for every array size.

Squiry avatar Sep 18 '22 20:09 Squiry
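A minimal sketch of the array approach, assuming a hypothetical `items` table with an integer `id` column: with `= ANY($1)` the SQL text and the parameter count stay constant no matter how many values you pass, so the Int16 limit never comes into play. The snippet only builds the SQL strings; the actual driver call is shown as a comment.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ArrayParamSketch {

    // With IN, the SQL text and the bind count both grow with the list:
    static String inClauseSql(int n) {
        return "SELECT id FROM items WHERE id IN ("
                + IntStream.rangeClosed(1, n)
                           .mapToObj(i -> "$" + i)
                           .collect(Collectors.joining(", "))
                + ")";
    }

    // With = ANY($1) the SQL is constant and there is exactly one bind:
    static final String ANY_SQL = "SELECT id FROM items WHERE id = ANY($1)";

    public static void main(String[] args) {
        System.out.println(inClauseSql(3)); // three placeholders for three values
        System.out.println(ANY_SQL);        // one placeholder for any number of values
        // With r2dbc-postgresql the single bind would look like (not executed here):
        // connection.createStatement(ANY_SQL)
        //         .bind("$1", new Integer[]{1, 2, 3})
        //         .execute();
    }
}
```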

Thanks, I didn't realize that was a possibility. Will definitely look into it for my use case.

cfogrady avatar Sep 18 '22 20:09 cfogrady

Yeah, Spring with NamedParameterJdbcTemplate kind of spoiled us all with that ? joining for a Java List.

Squiry avatar Sep 18 '22 21:09 Squiry

Thanks for the discussion. As this state is controlled entirely from the calling side, we cannot do anything about it, so closing the ticket.

mp911de avatar Oct 31 '22 13:10 mp911de