Postgres connector: temporary file size exceeds temp_file_limit
Bug Description
The Postgres connector fails to sync after processing a few documents in my database.
To Reproduce
Traceback (most recent call last):
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 482, in _prepare_and_execute
self._rows = await prepared_stmt.fetch(*parameters)
File "/app/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
File "/app/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
File "/app/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncpg.exceptions.ConfigurationLimitExceededError: temporary file size exceeds temp_file_limit (1021877kB)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1964, in _exec_single_context
self.dialect.do_execute(
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 747, in do_execute
cursor.execute(statement, parameters)
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in execute
self._adapt_connection.await_(
File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 102, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 160, in greenlet_spawn
value = await result
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 494, in _prepare_and_execute
self._handle_exception(error)
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 444, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 717, in _handle_exception
raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.Error: <class 'asyncpg.exceptions.ConfigurationLimitExceededError'>: temporary file size exceeds temp_file_limit (1021877kB)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/connectors/es/sink.py", line 407, in run
await self.get_docs(generator)
File "/app/connectors/es/sink.py", line 458, in get_docs
async for count, doc in aenumerate(generator):
File "/app/connectors/utils.py", line 818, in aenumerate
async for elem in asequence:
File "/app/connectors/logger.py", line 247, in __anext__
return await self.gen.__anext__()
File "/app/connectors/es/sink.py", line 441, in _decorate_with_metrics_span
async for doc in generator:
File "/app/connectors/sync_job_runner.py", line 411, in prepare_docs
async for doc, lazy_download, operation in self.generator():
File "/app/connectors/sync_job_runner.py", line 447, in generator
async for doc, lazy_download in self.data_provider.get_docs(
File "/app/connectors/sources/postgresql.py", line 677, in get_docs
async for row in self.fetch_documents_from_table(
File "/app/connectors/sources/postgresql.py", line 525, in fetch_documents_from_table
async for doc in docs_generator:
File "/app/connectors/sources/postgresql.py", line 600, in _yield_all_docs_from_tables
async for row in self.yield_rows_for_query(
File "/app/connectors/sources/postgresql.py", line 644, in yield_rows_for_query
async for row in streamer:
File "/app/connectors/sources/postgresql.py", line 309, in data_streamer
async for data in fetch(
File "/app/connectors/sources/generic_database.py", line 110, in fetch
async for result in _execute():
File "/app/connectors/utils.py", line 556, in wrapped
raise e
File "/app/connectors/utils.py", line 551, in wrapped
async for item in func(*args, **kwargs):
File "/app/connectors/sources/generic_database.py", line 90, in _execute
cursor = await cursor_func()
File "/app/connectors/sources/postgresql.py", line 201, in get_cursor
cursor = await connection.execute(text(query))
File "/app/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py", line 594, in execute
result = await greenlet_spawn(
File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 165, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1414, in execute
return meth(
File "/app/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 489, in _execute_on_connection
return connection._execute_clauseelement(
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1638, in _execute_clauseelement
ret = self._execute_context(
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1842, in _execute_context
return self._exec_single_context(
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1983, in _exec_single_context
self._handle_dbapi_exception(
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2325, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1964, in _exec_single_context
self.dialect.do_execute(
File "/app/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 747, in do_execute
cursor.execute(statement, parameters)
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in execute
self._adapt_connection.await_(
File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 102, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 160, in greenlet_spawn
value = await result
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 494, in _prepare_and_execute
self._handle_exception(error)
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 444, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 717, in _handle_exception
raise translated_error from error
sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.ConfigurationLimitExceededError'>: temporary file size exceeds temp_file_limit (1021877kB)
[SQL: SELECT * FROM "public"."document_snippets" ORDER BY "id","source_document_id" LIMIT 1000 OFFSET 2000]
Expected behavior
It should not fail. If an individual row cannot be uploaded, the connector should drop that row and keep going.
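For illustration only, here is a minimal sketch of that skip-and-continue behavior (the function and its parameters are hypothetical and not the connector's actual code; in this particular failure the error happens at the query level rather than per row):

import logging

logger = logging.getLogger(__name__)

# Hypothetical sketch of the expected behavior, not the connector's actual code:
# a bad row is logged and dropped instead of aborting the whole sync.
async def sync_rows(fetch_row_ids, fetch_row, upload):
    for row_id in await fetch_row_ids():
        try:
            doc = await fetch_row(row_id)
            await upload(doc)
        except Exception as exc:  # a real fix would catch specific DB/transport errors
            logger.warning("Dropping row %s that could not be synced: %s", row_id, exc)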
Environment
Running the connector on Docker.
I believe this is actually a symptom of the primary key columns being ordered incorrectly.
This table has a compound primary key:
CREATE TABLE document_snippets (
    source_document_id text,
    id text,
    PRIMARY KEY (source_document_id, id)
);
But in the error, the columns are reversed (ORDER BY id, source_document_id), which leads to a very expensive query.
The primary key columns are sorted here, which is what produces the inefficient order.
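To make the effect of that sorting concrete, here is a hypothetical sketch (the function name and parameters are made up for illustration; this is not the connector's actual code) of how sorting the key columns changes the generated query:

# Hypothetical illustration of the problem, not the connector's actual code:
# if the primary key columns are sorted alphabetically before building the
# ORDER BY clause, the compound index on (source_document_id, id) can no
# longer drive the pagination query.
def build_paging_query(table, primary_key_columns, limit, offset, sort_keys=True):
    columns = sorted(primary_key_columns) if sort_keys else list(primary_key_columns)
    order_by = ",".join(f'"{col}"' for col in columns)
    return (
        f'SELECT * FROM "public"."{table}" '
        f'ORDER BY {order_by} LIMIT {limit} OFFSET {offset}'
    )

pkey = ["source_document_id", "id"]  # declared order of the compound primary key
# Sorted alphabetically -> ORDER BY "id","source_document_id": forces a full sort
# of the table, which spills to temp files.
print(build_paging_query("document_snippets", pkey, 1000, 2000))
# Declared order -> ORDER BY "source_document_id","id": served by an index scan
# on document_snippets_pkey.
print(build_paging_query("document_snippets", pkey, 1000, 2000, sort_keys=False))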
Hi @Dig-Doug, have you tested whether removing the sorting makes it work for you?
Yes, removing the sort allows the connector to start syncing.
For reference, here is a comparison of the query costs with the keys misordered versus in their declared order; the misordered version has to sort the whole table, which is what spills to disk and hits temp_file_limit:
EXPLAIN SELECT * FROM document_snippets ORDER BY id, source_document_id LIMIT 1000 OFFSET 2000;

Limit  (cost=2440052.09..2440168.76 rows=1000 width=1405)
  ->  Gather Merge  (cost=2439818.74..2679752.32 rows=2056430 width=1405)
        Workers Planned: 2
        ->  Sort  (cost=2438818.72..2441389.25 rows=1028215 width=1405)
              Sort Key: id, source_document_id
              ->  Parallel Seq Scan on document_snippets  (cost=0.00..448904.15 rows=1028215 width=1405)

EXPLAIN SELECT * FROM document_snippets ORDER BY source_document_id, id LIMIT 1000 OFFSET 2000;

Limit  (cost=5294.09..7940.85 rows=1000 width=1405)
  ->  Index Scan using document_snippets_pkey on document_snippets  (cost=0.56..6531468.94 rows=2467717 width=1405)
If you want, you can contribute the change you've made back to us. If not, we'll address it, though probably a bit later.
Would you like to contribute?