
set_type_codec encoder to handle numpy/pandas null types not working with copy_records_to_table

wvolkov opened this issue on Jan 20, 2021 · 3 comments

  • asyncpg version: 0.21.0
  • PostgreSQL version: PostgreSQL 13.1 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?:
  • Python version: 3.7
  • Platform: Ubuntu 18.04
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: yes
  • If you built asyncpg locally, which version of Cython did you use?: -
  • Can the issue be reproduced under both asyncio and uvloop?: -

I am trying to set up asyncpg to use the copy_records_to_table method, passing a pandas DataFrame converted to a list of lists.

My table is simple:

create table sg.test("no" int4, dt timestamp)

As pandas/numpy have their own null types for different dtypes (pd.NaT for dates and np.nan for numerics), I tried to set up an appropriate encoder:

import asyncio
import asyncpg
import pandas as pd
import numpy as np


data = [[1, np.datetime64('NaT')]]
df = pd.DataFrame(data=data, columns=["no", "dt"])


def encoder(val):
    # note: type(np.nan) is float, so this branch also matches any float value
    if isinstance(val, (type(pd.NaT), type(np.nan))):
        return None  # this None is what triggers the TypeError below
    return bytes(val)


def decoder(val):
    # we do not need the decoder for this task
    return val


async def main(df: pd.DataFrame):

    conn: asyncpg.Connection = await asyncpg.connect('postgresql://connection_string')

    try:
        await conn.set_type_codec(
            'timestamp',
            encoder=encoder,
            decoder=decoder,
            schema='pg_catalog',
            format='binary'
        )
        res = await conn.copy_records_to_table(
            'test',
            records=df.values.tolist(),
            columns=df.columns.values.tolist(),
            schema_name='sg',
        )
        print(res)
    except ValueError as e:
        print(e.args)
    finally:
        await conn.close()

asyncio.get_event_loop().run_until_complete(main(df))

I am getting the following error:

....
  File "asyncpg/protocol/protocol.pyx", line 482, in copy_in
  File "asyncpg/protocol/protocol.pyx", line 429, in asyncpg.protocol.protocol.BaseProtocol.copy_in
  File "asyncpg/protocol/codecs/base.pyx", line 192, in asyncpg.protocol.protocol.Codec.encode
  File "asyncpg/protocol/codecs/base.pyx", line 178, in asyncpg.protocol.protocol.Codec.encode_in_python
  File "asyncpg/pgproto/./codecs/bytea.pyx", line 19, in asyncpg.pgproto.pgproto.bytea_encode
TypeError: a bytes-like object is required, not 'NoneType'

It is unclear to me what kind of bytes the encoder should return for a null value. I tried to investigate it backwards by selecting a null value from the original table with a custom decoder, but it seems that asyncpg simply bypasses the codec for null values altogether. So it is still unclear what kind of data should be passed.

I have already tried returning the following values from the encoder:

  • b''
  • b'\x01', which produces a different error: insufficient data left in message
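
A minimal sketch of the backwards check described above, assuming the conn from the earlier snippet (NULLs appear to be resolved by asyncpg before any codec runs, which is why the custom decoder never sees them):

row = await conn.fetchrow('select null::timestamp as dt')
print(row['dt'])  # prints None; the custom decoder above is never invoked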

wvolkov commented on Jan 20, 2021

One may add that if we remove the custom type codec and change the records argument a bit:

res = await conn.copy_records_to_table('test', records=[[1, None]], columns=df.columns.values.tolist(), schema_name='sg')

This works just fine, but it requires preprocessing the pandas DataFrame values to replace every np.nan and pd.NaT with None, which adds extra computation.

wvolkov commented on Jan 20, 2021

Well, this is because custom codecs weren't prepared to be filters, i.e. a non-NULL input value should produce a non-NULL encoding, but this is something we could allow, I suppose.

That said, a custom codec would not necessarily be much faster than a filtering generator wrapped around a dataframe. Have you measured the difference?
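
For example, a minimal sketch of such a filtering generator (null_safe_records is a hypothetical helper name; it assumes the conn and df from the snippets above):

import pandas as pd

def null_safe_records(df: pd.DataFrame):
    # Replace pandas/numpy nulls (np.nan, pd.NaT) with None on the fly,
    # one row at a time, instead of rewriting the whole frame up front.
    for row in df.itertuples(index=False, name=None):
        yield tuple(None if pd.isna(v) else v for v in row)

# copy_records_to_table accepts any iterable of row tuples:
res = await conn.copy_records_to_table(
    'test',
    records=null_safe_records(df),
    columns=df.columns.values.tolist(),
    schema_name='sg',
)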

elprans commented on Mar 8, 2021

My investigation showed that in order to replace np.nan and pd.NaT values with None, I have to cast all columns of the DataFrame to object first and then replace them with a where clause like this:

df = df.astype(object).where(pd.notnull(df), None)

This operation takes ~2 sec for a slice of 10k records with ~200 columns. I am building a migration app that copies a large table from one server to another with "multithreading", so each worker spends these additional 2 seconds before performing an insertion with copy_records_to_table.
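
If the astype(object).where(...) pass is the bottleneck, one vectorized alternative may be worth benchmarking against it, per the measurement suggestion above (a sketch, not measured here):

arr = df.to_numpy(dtype=object)      # single conversion to an object array
arr[pd.isna(df).to_numpy()] = None   # boolean-mask assignment of None
records = arr.tolist()               # ready for copy_records_to_table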

wvolkov avatar Mar 21 '21 08:03 wvolkov