
ShardingSphere does not support the extended query protocol (binary)

Open · houper888 opened this issue

When I used Npgsql to read data from ShardingSphere, I found that Chinese text was never read completely: the values always came back truncated.

After investigating, I learned that Npgsql uses the extended query protocol with binary format. The problem is that when ShardingSphere writes text in binary format, it encodes the length as the number of characters in the string, ignoring the encoding the client specified. For example, the string 这是一个测试字符串 has 9 characters, and 9 is what Npgsql receives as the column's length. With a UTF-8 client encoding, however, each of these Chinese characters takes 3 bytes, so ShardingSphere should send a length of 27 followed by a 27-byte array.


The length written here should be the number of bytes the value occupies in the client's encoding, not the number of characters.
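A minimal sketch of the difference, assuming a UTF-8 client encoding. `ColumnLengthDemo` and `encodeTextColumn` are hypothetical names for illustration, not ShardingSphere's encoder. The point is that the Int32 prefix must be computed from the encoded byte array, not from `String.length()`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not ShardingSphere's actual encoder.
public final class ColumnLengthDemo {

    // Encodes one text column value as it appears inside a DataRow:
    // an Int32 byte count followed by the value in the client's encoding.
    static byte[] encodeTextColumn(String value, Charset clientCharset) {
        byte[] payload = value.getBytes(clientCharset);
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buffer.putInt(payload.length); // number of BYTES, not value.length()
        buffer.put(payload);
        return buffer.array();
    }

    public static void main(String[] args) {
        String value = "这是一个测试字符串";
        byte[] column = encodeTextColumn(value, StandardCharsets.UTF_8);
        System.out.println("characters:    " + value.length());
        System.out.println("length prefix: " + ByteBuffer.wrap(column).getInt());
    }
}
```

For this string the character count is 9 while the length prefix is 27; writing 9 makes the client stop reading after the first three characters.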

houper888 · Apr 02 '25 08:04

Will this bug be fixed in the next version? @RaigorJiang

houper888 · Apr 16 '25 02:04

Hi @houper888, I want to fix this problem, but I have no experience with Npgsql. Could you provide a demo project that reproduces it?

RaigorJiang · Apr 16 '25 07:04

See the DataRow message in https://www.postgresql.org/docs/current/protocol-message-formats.html: each column value is preceded by an Int32 giving its length in bytes (-1 indicates NULL).
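As a sketch of that layout (message type 'D', an Int32 message length that counts itself but not the type byte, an Int16 column count, then per column an Int32 byte length or -1 for NULL followed by the bytes), the following builds a DataRow from already-encoded column values. `DataRowSketch` is a hypothetical name, not ShardingSphere's packet class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a DataRow ('D') writer; not ShardingSphere's packet class.
public final class DataRowSketch {

    // Each element is a column value already encoded in the client's encoding,
    // or null for SQL NULL.
    static byte[] dataRow(List<byte[]> columns) {
        int payloadSize = 2; // Int16 column count
        for (byte[] column : columns) {
            payloadSize += 4 + (column == null ? 0 : column.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + payloadSize); // big-endian (network order)
        buf.put((byte) 'D');                  // message type
        buf.putInt(4 + payloadSize);          // length counts itself, not the type byte
        buf.putShort((short) columns.size()); // number of column values
        for (byte[] column : columns) {
            if (column == null) {
                buf.putInt(-1);               // -1 = NULL, no value bytes follow
            } else {
                buf.putInt(column.length);    // length in BYTES
                buf.put(column);
            }
        }
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] text = "这是一个测试字符串".getBytes(StandardCharsets.UTF_8);
        byte[] row = dataRow(Arrays.asList(text, null));
        System.out.println("DataRow size: " + row.length + " bytes");
    }
}
```

Taking `byte[]` rather than `String` keeps the encoding decision with the caller, which is where the client's encoding is known.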

ShenFeng312 · Aug 08 '25 06:08

Findings While Fixing the Bug

  1. Incorrect Read Methods
    PostgreSQLStringArrayBinaryProtocolValue, PostgreSQLStringBinaryProtocolValue, and PostgreSQLTextArrayBinaryProtocolValue all have incorrect read method implementations.
    This effectively means these arrays have never actually been used from the JDBC side.

The right way:

[screenshot]
  2. Binary Transfer Disabled by getMetaData
    In the PostgreSQL JDBC driver, if getMetaData is called on a prepared statement before it is executed, binary transfer is forcibly disabled.
    Because of an issue in ShardingSphere-Proxy's prepared statement implementation, this causes values to be serialized multiple times.
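For reference when checking the read side, here is a sketch of decoding a binary `text[]` value. It assumes PostgreSQL's binary array layout: Int32 number of dimensions, Int32 has-NULL flag, Int32 element type OID, then an (Int32 size, Int32 lower bound) pair per dimension, then every element in row-major order as an Int32 byte length (-1 for NULL) followed by that many bytes. This is not the fix from the upcoming PR; `BinaryTextArrayDecoder` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

// Hypothetical decoder; illustrates the binary array layout, not ShardingSphere's code.
public final class BinaryTextArrayDecoder {

    // Returns nested Object[] arrays (one level per dimension) holding Strings or null.
    static Object[] decode(byte[] data, Charset clientCharset) {
        ByteBuffer in = ByteBuffer.wrap(data); // network byte order = big-endian
        int ndim = in.getInt();
        in.getInt(); // flags: 1 if any element is NULL (not needed to decode)
        in.getInt(); // element type OID, e.g. 25 for text
        if (ndim == 0) {
            return new Object[0]; // empty array has no dimension headers
        }
        int[] dims = new int[ndim];
        for (int i = 0; i < ndim; i++) {
            dims[i] = in.getInt();
            in.getInt(); // lower bound, normally 1
        }
        return readDimension(in, dims, 0, clientCharset);
    }

    private static Object[] readDimension(ByteBuffer in, int[] dims, int depth, Charset cs) {
        Object[] result = new Object[dims[depth]];
        for (int i = 0; i < result.length; i++) {
            if (depth + 1 < dims.length) {
                result[i] = readDimension(in, dims, depth + 1, cs); // recurse into inner dimension
            } else {
                int len = in.getInt();
                if (len == -1) {
                    result[i] = null; // NULL element: no bytes follow
                } else {
                    byte[] bytes = new byte[len];
                    in.get(bytes);
                    result[i] = new String(bytes, cs); // length is in bytes; decode with client charset
                }
            }
        }
        return result;
    }
}
```

Reading elements with their own byte-length prefix is what makes multi-byte characters and arbitrary dimensions work without any special casing.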

cc @RaigorJiang @terrymanu

ShenFeng312 · Aug 11 '25 02:08

Assign this issue to me, please.

ShenFeng312 · Aug 11 '25 02:08


@ShenFeng312 Thanks for your help.

RaigorJiang · Aug 11 '25 10:08

Hello everyone. I’ve fixed the binary protocol issue and done some basic testing. I’ve also added support for special numeric values such as NaN, as well as several commonly used array types. In fact, arrays of all the binary types we currently handle can be supported, including arrays of arbitrary dimension.
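To illustrate what NaN support means on the wire, here is a sketch assuming PostgreSQL's binary `numeric` layout (Int16 ndigits, Int16 weight, Int16 sign, Int16 dscale, then ndigits base-10000 words), where NaN, +Infinity, and -Infinity are marked by the sign values 0xC000, 0xD000, and 0xF000 (the infinities exist only in PostgreSQL 14 and later). `NumericSpecialValues` is a hypothetical name, not the class from the fix.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating numeric special values in binary format.
public final class NumericSpecialValues {

    // Sign-field markers used by PostgreSQL's binary numeric format.
    static final short NUMERIC_NAN = (short) 0xC000;
    static final short NUMERIC_PINF = (short) 0xD000; // PostgreSQL 14+
    static final short NUMERIC_NINF = (short) 0xF000; // PostgreSQL 14+

    // Special values carry no digit words, so they are always 8 bytes.
    static byte[] encodeSpecial(double value) {
        short sign;
        if (Double.isNaN(value)) {
            sign = NUMERIC_NAN;
        } else if (value == Double.POSITIVE_INFINITY) {
            sign = NUMERIC_PINF;
        } else if (value == Double.NEGATIVE_INFINITY) {
            sign = NUMERIC_NINF;
        } else {
            throw new IllegalArgumentException("not a special value: " + value);
        }
        return ByteBuffer.allocate(8)
                .putShort((short) 0) // ndigits
                .putShort((short) 0) // weight
                .putShort(sign)
                .putShort((short) 0) // dscale
                .array();
    }

    // Returns NaN or +/-Infinity for a special binary numeric, or null for an ordinary number.
    static Double decodeSpecial(byte[] data) {
        short sign = ByteBuffer.wrap(data).getShort(4); // sign field starts at byte offset 4
        if (sign == NUMERIC_NAN) {
            return Double.NaN;
        }
        if (sign == NUMERIC_PINF) {
            return Double.POSITIVE_INFINITY;
        }
        if (sign == NUMERIC_NINF) {
            return Double.NEGATIVE_INFINITY;
        }
        return null;
    }
}
```

Since `BigDecimal` cannot represent these values, a decoder has to check the sign field before attempting to build a `BigDecimal` from the digit words.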

Of course, some problems remain. For example, with JDBC, getObject on a time column returns java.sql.Time by default, which holds only millisecond precision, while PostgreSQL's time type has microsecond precision. I think we can address this in the future.
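A sketch of where the precision goes, assuming the binary `time` format is an Int64 count of microseconds since midnight (integer datetimes, the only option since PostgreSQL 10). Decoding into `java.time.LocalTime` keeps all six fractional digits, whereas anything backed by milliseconds, as `java.sql.Time` is, can keep at most three. `TimePrecisionDemo` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.time.LocalTime;

// Hypothetical helper; shows the binary `time` layout and where millisecond types lose precision.
public final class TimePrecisionDemo {

    // Binary `time` (without time zone): Int64 microseconds since midnight.
    static LocalTime decodeTime(byte[] data) {
        long micros = ByteBuffer.wrap(data).getLong();
        return LocalTime.ofNanoOfDay(micros * 1_000L); // keeps all six fractional digits
    }

    static byte[] encodeTime(LocalTime time) {
        return ByteBuffer.allocate(8).putLong(time.toNanoOfDay() / 1_000L).array();
    }

    // java.sql.Time is backed by milliseconds, so this is the most it can hold.
    static long millisecondsOfDay(byte[] data) {
        return ByteBuffer.wrap(data).getLong() / 1_000L; // sub-millisecond digits are dropped
    }

    public static void main(String[] args) {
        LocalTime t = LocalTime.of(12, 34, 56, 123_456_000);
        byte[] wire = encodeTime(t);
        System.out.println(decodeTime(wire));                 // prints 12:34:56.123456
        System.out.println(millisecondsOfDay(wire) % 1_000L); // prints 123
    }
}
```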

However, I’m not sure how to do integration testing within the project. From what I’ve seen, the existing integration test framework doesn’t seem to meet these requirements, so I wrote a simple Java class locally that connects to ShardingSphere-Proxy for testing:

import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.protocol.util.codec.decoder.PgBinaryObj;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.protocol.util.codec.encoder.*;
import org.postgresql.core.Oid;
import org.postgresql.core.QueryExecutor;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.PgStatement;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Arrays;
import java.util.TimeZone;

public class ClientTest {

    // Serializes a long in big-endian (network) byte order: most significant byte first.
    public static byte[] longToBytesBE(long value) {
        byte[] bytes = new byte[8];
        for (int i = 0; i < 8; i++) {
            bytes[i] = (byte) (value >>> ((7 - i) * 8));
        }
        return bytes;
    }

    public static void main(String[] args) throws IOException {


        String url = "jdbc:postgresql://localhost:3307/test";

        String username = "root";
        String password = "root";
        Short[][] param1 = new Short[2][4];
        param1[0][0] = 1;
        param1[0][1] = 2;
        param1[1][0] = 3;
        Integer[][] param2 = new Integer[2][4];
        param2[0][0] = 1;
        param2[0][1] = 2;
        param2[1][0] = 3;
        Long[][] param3 = new Long[2][4];
        param3[0][0] = 1L;
        param3[0][1] = 2L;
        param3[1][0] = 3L;
        Float[][] param4 = new Float[2][4];
        param4[0][0] = 1.1F;
        param4[0][1] = 2.2F;
        param4[1][0] = Float.NaN;
        param4[1][1] = Float.POSITIVE_INFINITY;
        param4[1][2] = Float.NEGATIVE_INFINITY;
        Double[][] param5 = new Double[2][4];
        param5[0][0] = 1.1;
        param5[0][1] = 2.2;
        param5[1][0] = Double.NaN;
        param5[1][1] = Double.POSITIVE_INFINITY;
        param5[1][2] = Double.NEGATIVE_INFINITY;
        Number[][] param6 = new Number[2][4];
        param6[0][0] = new BigDecimal("1.1");
        param6[0][1] = new BigDecimal("2.2");
        param6[1][0] = Double.NaN;
        param6[1][1] = Double.POSITIVE_INFINITY;
        param6[1][2] = Double.NEGATIVE_INFINITY;
        Boolean[][] param7 = new Boolean[2][4];
        param7[0][0] = true;
        param7[0][1] = false;
        String[][] param8 = new String[2][4];
        param8[0][0] = "aaa";
        param8[0][1] = "bbbb";
        String[][] param9 = new String[2][4];
        param9[0][0] = "aaa";
        param9[0][1] = "bbbb";
        byte[][][] param10 = new byte[2][4][];
        param10[0][0] = new byte[10];
        param10[0][1] = "aaa".getBytes(StandardCharsets.UTF_8);


        String query = "SELECT\n" +
                "    ?::int2[]       AS int2_array,       -- smallint\n" +
                "   ?::int4[] AS int4_array,   -- integer\n" +
                "    ?::int8[] AS int8_array, -- bigint\n" +
                "    ?::float4[] AS float4_array, -- real\n" +
                "    ?::float8[] AS float8_array, -- double precision\n" +
                "    ?::numeric[] AS numeric_array, -- numeric / decimal\n" +
                "    ?::bool[] AS bool_array,       -- boolean\n" +
                "    ?::text[] AS text_array,      -- text\n" +
                "   ?::varchar[] AS varchar_array, -- varchar\n" +
                "    ?::bytea[] AS bytea_array, -- bytea\n" +
                "    ?::date[] AS date_array,      -- date\n" +
                "   ?::time[] AS time_array,          -- time without time zone\n" +
                "    ?::timestamp[] AS timestamp_array, -- timestamp\n" +
                "    ?::bool AS boolv, -- bool\n" +
                "    ?::bytea AS byteav, -- bytea\n" +
                "    ?::date AS datev, -- date\n" +
                "    ?::float8  AS float8v, -- float8\n" +
                "    ?::float4 AS float4v, -- float4\n" +
                "    ?::int2 AS int2v, -- int2\n" +
                "    ?::int4 AS int4v, -- int4\n" +
                "    ?::int8 AS int8v, -- int8\n" +
                "    ?::numeric AS numericv, -- numeric\n" +
                "    ?::text AS textv, -- text\n" +
                "    ?::varchar AS varcharv, -- varchar\n" +
                "    ?::time AS timev, -- time\n" +
                "    ?::timestamp AS timestampv -- timestamp\n";

        int[] arrayOids = new int[]{Oid.INT2_ARRAY, Oid.INT4_ARRAY, Oid.INT8_ARRAY, Oid.FLOAT4_ARRAY, Oid.FLOAT8_ARRAY, Oid.NUMERIC_ARRAY, Oid.BOOL_ARRAY, Oid.TEXT_ARRAY, Oid.VARCHAR_ARRAY, Oid.BYTEA_ARRAY};
        String[] arrayOidNames = new String[]{"int2[]", "int4[]", "int8[]", "float4[]", "float8[]", "numeric[]", "bool[]", "text[]", "varchar[]", "bytea[]"};
        byte[][] sendResult = new byte[26][];
        Object[] params = new Object[]{param1, param2, param3, param4, param5, param6, param7, param8, param9, param10};
        try (Connection connection = DriverManager.getConnection(url, username, password)) {
            PgConnection pgConnection = (PgConnection) connection;
            QueryExecutor queryExecutor = pgConnection.getQueryExecutor();
            queryExecutor.addBinarySendOid(Oid.DATE);
            queryExecutor.addBinarySendOid(Oid.DATE_ARRAY);
            queryExecutor.addBinarySendOid(Oid.TIME);
            queryExecutor.addBinarySendOid(Oid.TIME_ARRAY);
            queryExecutor.addBinarySendOid(Oid.BOOL_ARRAY);
            queryExecutor.addBinarySendOid(Oid.TIMESTAMP);
            queryExecutor.addBinarySendOid(Oid.TIMESTAMP_ARRAY);
            queryExecutor.addBinarySendOid(Oid.NUMERIC_ARRAY);
            queryExecutor.addBinarySendOid(Oid.VARCHAR);
            queryExecutor.addBinarySendOid(Oid.TEXT);
            queryExecutor.addBinarySendOid(Oid.BOOL);


            queryExecutor.addBinaryReceiveOid(Oid.DATE);
            queryExecutor.addBinaryReceiveOid(Oid.DATE_ARRAY);
            queryExecutor.addBinaryReceiveOid(Oid.TIME);
            queryExecutor.addBinaryReceiveOid(Oid.TIME_ARRAY);
            queryExecutor.addBinaryReceiveOid(Oid.BOOL_ARRAY);
            queryExecutor.addBinaryReceiveOid(Oid.TIMESTAMP);
            queryExecutor.addBinaryReceiveOid(Oid.TIMESTAMP_ARRAY);
            queryExecutor.addBinaryReceiveOid(Oid.NUMERIC_ARRAY);
            queryExecutor.addBinaryReceiveOid(Oid.VARCHAR);
            queryExecutor.addBinaryReceiveOid(Oid.TEXT);
            queryExecutor.addBinaryReceiveOid(Oid.BOOL);


            PreparedStatement preparedStatement = connection.prepareStatement(query);
            PgStatement pgStatement = (PgStatement) preparedStatement;
            pgStatement.setPrepareThreshold(-1);


            for (int i = 0; i < arrayOids.length; i++) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ArrayEncoding.getArrayEncoder(params[i]).toBinaryRepresentation((Object[]) params[i], arrayOids[i], baos);
                byte[] byteArray = baos.toByteArray();
                PgBinaryObj pgBinaryObj = new PgBinaryObj(byteArray);
                pgBinaryObj.setType(arrayOidNames[i]);
                sendResult[i] = byteArray;
                preparedStatement.setObject(i + 1, pgBinaryObj);
            }

            Date[][] dates = new Date[2][4];
            //1970-01-01
            dates[0][0] = new Date(-TimeZone.getDefault().getRawOffset());
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ArrayEncoding.getArrayEncoder(dates).toBinaryRepresentation(dates, Oid.DATE_ARRAY, baos);
            byte[] byteArray = baos.toByteArray();
            PgBinaryObj dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("date[]");
            sendResult[10] = byteArray;

            preparedStatement.setObject(11, dateBinary);

            baos.reset();
            Time[][] times = new Time[2][4];
            //time0
            times[0][0] = new Time(-TimeZone.getDefault().getRawOffset());
            ArrayEncoding.getArrayEncoder(times).toBinaryRepresentation(times, Oid.TIME_ARRAY, baos);
            byteArray = baos.toByteArray();
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("time[]");
            sendResult[11] = byteArray;

            preparedStatement.setObject(12, dateBinary);

            baos.reset();
            Timestamp[][] timestamps = new Timestamp[2][4];
            //timestamps0 1970-01-01
            timestamps[0][0] = new Timestamp(-TimeZone.getDefault().getRawOffset());
            ArrayEncoding.getArrayEncoder(timestamps).toBinaryRepresentation(timestamps, Oid.TIMESTAMP_ARRAY, baos);
            byteArray = baos.toByteArray();
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("timestamp[]");
            sendResult[12] = byteArray;

            preparedStatement.setObject(13, dateBinary);

            baos.reset();
            BooleanArrayEncoder.INSTANCE.write(true,baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("bool");
            sendResult[13] = byteArray;

            preparedStatement.setObject(14,dateBinary);


            byteArray = new byte[10];
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("bytea");
            sendResult[14] = byteArray;

            preparedStatement.setObject(15, dateBinary);

            baos.reset();
            DateArrayEncoder.INSTANCE.write(new Date(-TimeZone.getDefault().getRawOffset()),baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("date");
            sendResult[15] = byteArray;

            preparedStatement.setObject(16, dateBinary);

            baos.reset();
            Float8ArrayEncoder.INSTANCE.write(1.2,baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("float8");
            sendResult[16] = byteArray;

            preparedStatement.setObject(17, dateBinary);

            baos.reset();
            Float4ArrayEncoder.INSTANCE.write(1.2f,baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("float4");
            sendResult[17] = byteArray;

            preparedStatement.setObject(18, dateBinary);

            baos.reset();
            Int2ArrayEncoder.INSTANCE.write((short) 1,baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("int2");
            sendResult[18] = byteArray;

            preparedStatement.setObject(19, dateBinary);

            baos.reset();
            Int4ArrayEncoder.INSTANCE.write( 1,baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("int4");
            sendResult[19] = byteArray;

            preparedStatement.setObject(20, dateBinary);

            baos.reset();
            Int8ArrayEncoder.INSTANCE.write(1L,baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length);
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("int8");
            sendResult[20] = byteArray;

            preparedStatement.setObject(21, dateBinary);

            baos.reset();
            NumericArrayEncoder.INSTANCE.write(new BigDecimal("1.2"),baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("numeric");
            sendResult[21] = byteArray;

            preparedStatement.setObject(22, dateBinary);

            baos.reset();
            StringArrayEncoder.INSTANCE.write("testString",baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("text");
            sendResult[22] = byteArray;

            preparedStatement.setObject(23, dateBinary);

            baos.reset();
            StringArrayEncoder.INSTANCE.write("testVarchar",baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("varchar");
            sendResult[23] = byteArray;

            preparedStatement.setObject(24, dateBinary);

            baos.reset();
            TimeArrayEncoder.INSTANCE.write(new Time(-TimeZone.getDefault().getRawOffset()),baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("time");
            sendResult[24] = byteArray;

            preparedStatement.setObject(25, dateBinary);

            baos.reset();
            TimestampArrayEncoder.INSTANCE.write(new Timestamp(-TimeZone.getDefault().getRawOffset()),baos);
            byteArray = baos.toByteArray();
            byteArray = Arrays.copyOfRange(byteArray, 4, byteArray.length );
            dateBinary = new PgBinaryObj(byteArray);
            dateBinary.setType("timestamp");
            sendResult[25] = byteArray;

            preparedStatement.setObject(26, dateBinary);




            preparedStatement.execute();


            ResultSet resultSet = preparedStatement.getResultSet();
            resultSet.next();
            Object value = resultSet.getObject(6);
//            value.toString();
            value = resultSet.getObject(11);
            value = resultSet.getObject(12);
            value = resultSet.getObject(13);
            value = resultSet.getObject(14);
            // Compare the raw bytes of each returned column with the bytes that were sent.
            for (int col = 1; col <= resultSet.getMetaData().getColumnCount(); col++) {
                byte[] received = resultSet.getBytes(col);
                boolean equals = Arrays.equals(received, sendResult[col - 1]);
                System.out.println("col " + col + ": " + equals);
            }


        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

@RaigorJiang @terrymanu

ShenFeng312 · Aug 20 '25 05:08

I will submit a PR later.

ShenFeng312 · Aug 20 '25 05:08