
BUG: can't adapt type 'dict' when using postgresql as cache

Open · jiangsong216 opened this issue · 4 comments

I'm using PyAirbyte to read data from Braintree. Everything works fine when I use the default cache, but a "can't adapt type 'dict'" error is raised when I switch the cache to PostgreSQL. A sketch of that kind of setup is shown below, followed by the full error message.
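A minimal sketch of that kind of setup, assuming PyAirbyte 0.10.x; braintree_config stands in for a config dict matching the source-braintree spec, the Postgres connection values are placeholders, and schema_name is left at its airbyte_raw default (which matches the table name in the SQL below):

import airbyte as ab
from airbyte.caches import PostgresCache

# Configure the Braintree source; braintree_config is assumed to be a dict
# that follows the source-braintree spec (credentials omitted here).
source = ab.get_source(
    "source-braintree",
    config=braintree_config,
    install_if_missing=True,
)
source.check()
source.select_all_streams()

# Swap the default DuckDB cache for a PostgreSQL cache.
cache = PostgresCache(
    host="localhost",           # placeholder connection details
    port=5432,
    username="airbyte",
    password="********",
    database="airbyte_cache",
    schema_name="airbyte_raw",  # default schema; matches the failing table
)

result = source.read(cache=cache)  # fails for streams with nested dict fields

The full error message: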

Failed `source-braintree` read operation at 14:15:02.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.ProgrammingError: can't adapt type 'dict'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airbyte/sources/base.py", line 724, in read
    cache.processor.process_airbyte_messages(
  File "/usr/local/lib/python3.9/site-packages/airbyte/_processors/base.py", line 200, in process_airbyte_messages
    self.write_all_stream_data(
  File "/usr/local/lib/python3.9/site-packages/airbyte/_processors/base.py", line 211, in write_all_stream_data
    self.write_stream_data(stream_name, write_strategy=write_strategy)
  File "/usr/local/lib/python3.9/site-packages/airbyte/_processors/sql/base.py", line 555, in write_stream_data
    temp_table_name = self._write_files_to_new_table(
  File "/usr/local/lib/python3.9/site-packages/airbyte/_processors/sql/base.py", line 680, in _write_files_to_new_table
    dataframe.to_sql(
  File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pandas/core/generic.py", line 3008, in to_sql
    return sql.to_sql(
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 788, in to_sql
    return pandas_sql.to_sql(
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 1958, in to_sql
    total_inserted = sql_engine.insert_records(
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 1507, in insert_records
    raise err
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 1498, in insert_records
    return table.insert(chunksize=chunksize, method=method)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 1059, in insert
    num_inserted = exec_insert(conn, keys, chunk_iter)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/sql.py", line 951, in _execute_insert
    result = conn.execute(self.table.insert(), data)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1385, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'
[SQL: INSERT INTO airbyte_raw.merchant_account_stream_01hwdc4tpmaayvv57g36s8ykay (business_details, currency_iso_code, funding_details, id, individual_details, status, _airbyte_extracted_at, _airbyte_meta, _airbyte_raw_id) VALUES (%(business_details)s,
%(currency_iso_code)s, %(funding_details)s, %(id)s, %(individual_details)s, %(status)s, %(_airbyte_extracted_at)s, %(_airbyte_meta)s, %(_airbyte_raw_id)s)]
[parameters: {'business_details': {'address_details': {}}, 'currency_iso_code': 'USD', 'funding_details': {}, 'id': 'ubix', 'individual_details': {'address_details': {}}, 'status': 'active', '_airbyte_extracted_at': datetime.datetime(2024, 4, 26, 14, 15, 0, 51000,
tzinfo=datetime.timezone.utc), '_airbyte_meta': '{}', '_airbyte_raw_id': '01HWDC4TPMNBHN1P84A83847QD'}]
(Background on this error at: https://sqlalche.me/e/14/f405)
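A note on where this comes from: the parameter dump above shows that nested fields such as business_details, funding_details, and individual_details are still plain Python dicts when pandas hands the row to psycopg2, and psycopg2 has no built-in adapter for dict (compare _airbyte_meta, which arrives as the string '{}' and is fine). Until the cache serializes these columns itself, one process-wide stopgap is to register psycopg2's Json adapter for dicts before calling read(); this is a workaround sketch, not part of the PyAirbyte API:

from psycopg2.extensions import register_adapter
from psycopg2.extras import Json

# Send any plain dict parameter as a JSON literal instead of failing;
# this affects every psycopg2 connection in the current process.
register_adapter(dict, Json)

With the adapter registered, dict values are serialized to JSON text, which Postgres accepts for json/jsonb columns as well as text columns holding serialized JSON.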

jiangsong216 · Apr 26 '24

@jiangsong216 - Thanks for reporting this. I've looked into this and have not yet found the root cause. I'll continue to look into it.

Can you share the PyAirbyte version you are using?

aaronsteers · Apr 27 '24

Version 0.10.4, thanks a lot.

jiangsong216 · Apr 29 '24

@jiangsong216 - I have not yet been able to locate the root cause here. Thank you for your patience, and let us know if you find any additional clues.

If it is any help, I see this issue is specifically with the stream merchant_account_stream. You might be able to exclude this stream in your selection criteria.
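That exclusion could look roughly like the following, reusing the source and cache objects from the setup sketch above; it simply selects every available stream except the failing one:

# Select all streams except the one that currently breaks the Postgres cache.
streams_to_sync = [
    name
    for name in source.get_available_streams()
    if name != "merchant_account_stream"
]
source.select_streams(streams_to_sync)

result = source.read(cache=cache)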

You could pull records for this stream alone via Source.get_records('merchant_account_stream'), but this bypasses the cache, so you'd need to serialize the records yourself until we have a real fix.
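A rough sketch of that manual path, writing the stream to a JSONL file as one possible serialization (the output path is arbitrary, and default=str is only there to cope with datetime values):

import json

# Pull the problematic stream directly; this bypasses the cache entirely,
# so the records have to be persisted by hand.
with open("merchant_account_stream.jsonl", "w") as f:
    for record in source.get_records("merchant_account_stream"):
        f.write(json.dumps(dict(record), default=str) + "\n")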

aaronsteers · May 07 '24

Hi @aaronsteers @bindipankhudi, what is the progress on this issue? I encountered this problem again with the Jira source, so it seems to be a common issue.

jiangsong216 · Jun 28 '24