Dagster uses an implausible number of PostgreSQL connections
Dagster version
1.6.3
What's the issue?
Hello,
We've encountered a significant issue with Dagster's interaction with PostgreSQL, leading to an exhaustion of the available connection slots. Despite increasing the connection limit from 100 to 1000, we are still facing the following error:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "X.X.X.X", port 5432 failed: FATAL: remaining connection slots are reserved for non-replication superuser connections
This problem seems to arise when running Dagster jobs with multiple steps. However, the number of steps executed is nowhere near enough to justify 1000 connections, suggesting an issue with how Dagster, and specifically dagster/concurrency_key, manages database connections. My hypothesis is that there is a problem in how connections are created or reused that becomes apparent under the load of multi-step jobs.
Can someone please look into this? Any guidance on diagnosing or resolving this issue would be greatly appreciated.
Thank you for your time.
What did you expect to happen?
No response
How to reproduce?
No response
Deployment type
None
Deployment details
No response
Additional information
No response
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
Are you able to query pg_stat_activity when this is happening? That may help point to a specific problematic interaction if there is a clear pattern.
From skimming the code, investigating a fix in https://github.com/dagster-io/dagster/pull/20367.
@alangenfeld I will try next time it happens, but it's not totally clear when that will be. I can see spikes in active connections over the last 30 days in my logs, which is good because I will be able to confirm whether your fix works.
Experienced the same issue on 1.7.3. Tried to start using the concurrency limit across runs via dagster/concurrency_key, but the runs started crashing with errors:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "hostname" (ip), port 5432 failed: FATAL: too many connections for role "dagster"
It happened again in 1.7.1, so the fix wasn't effective. @alangenfeld, what do you want from pg_stat_activity?
what do you want from pg_stat_activity?

The query column would probably be the most useful, but I'm fine with *.
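For reference, a minimal sketch of such a check, assuming psycopg2 is available and using a placeholder DSN for the Dagster storage database (swap in the real host and credentials):

import psycopg2

# Placeholder connection string; point this at the Dagster storage database.
DSN = "postgresql://dagster:password@localhost:5432/dagster"

conn = psycopg2.connect(DSN)
cur = conn.cursor()
# Group the open connections by state and (truncated) query text
# to see whether a single statement dominates the pool.
cur.execute(
    """
    SELECT state, left(query, 80) AS query, count(*) AS connections
    FROM pg_stat_activity
    WHERE datname = current_database()
    GROUP BY state, left(query, 80)
    ORDER BY count(*) DESC
    """
)
for state, query, connections in cur.fetchall():
    print(connections, state, query)
cur.close()
conn.close()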
@alangenfeld Sorry, I should have looked into what pg_stat_activity is before. This won't work in our scenario: these errors are somewhat unpredictable and depend on the load on our Dagster production instance, and by the time I log into that instance the activity will likely be gone already. For historical analysis, I could set up audit logs; would that be helpful?
@alangenfeld this happens quite often nowadays, I guess because we have more concurrency. I will be more than happy to help if you let me know what information would be useful for fixing this issue. 1000 connections for 12 concurrent runs is definitely a bug, and it's also adding strain on our PostgreSQL instance because it has to manage so many connections. I had to double our vCPU count to make Dagit usable, but we are still hitting 100% CPU usage consistently.
import time

from dagster import (
    DynamicOut,
    DynamicOutput,
    graph_asset,
    op,
)


@op(out=DynamicOut())
def create_range(context):
    for i in range(100):
        yield DynamicOutput(mapping_key=str(i), value=str(i))


@op(
    tags={"dagster/concurrency_key": "serial"},
)
def sleep(context, value):
    time.sleep(2)
    return


@graph_asset(
    group_name="test",
)
def break_dagster():
    return create_range().map(sleep)
If you use this script and run 5-10 of those concurrently against a Postgres instance with the default 100 connections, you will see the issue. Each of those runs opens dozens of connections seemingly at random. Make sure to set the serial concurrency key's limit to 1 in the UI beforehand.
Facing the same issue. I'm running Dagster 1.7.4 with the K8sRunLauncher. I tried a backfill with 6 concurrent runs, where each job run has 7 steps. Steps were randomly failing with psycopg2.OperationalError: connection to server at "XXX" (XXX), port 5432 failed: FATAL: remaining connection slots are reserved for non-replication superuser connections (full error at the end of this message).
Having a look at the database, the following query was called almost 4000 times over the 15 minutes that the backfill lasted, which is pretty crazy.
INSERT INTO event_logs (run_id, event, dagster_event_type, timestamp, step_key, asset_key, partition)
VALUES ($1, $2, $3, $4::timestamp, $5, $6, $7)
RETURNING event_logs.run_id, event_logs.id
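As an aside, one way to confirm per-statement call counts like this, assuming the pg_stat_statements extension is installed and preloaded on the instance (it is not enabled by default), is a sketch along these lines:

import psycopg2

# Placeholder connection string; assumes the pg_stat_statements extension is enabled.
DSN = "postgresql://dagster:password@localhost:5432/dagster"

conn = psycopg2.connect(DSN)
cur = conn.cursor()
# Top statements by number of calls since the statistics were last reset.
cur.execute(
    """
    SELECT calls, left(query, 80) AS query
    FROM pg_stat_statements
    ORDER BY calls DESC
    LIMIT 10
    """
)
for calls, query in cur.fetchall():
    print(calls, query)
cur.close()
conn.close()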
These were all the queries run during the backfill where things failed:
psycopg2.OperationalError: connection to server at "XXX" (XXX), port 5432 failed: FATAL: remaining connection slots are reserved for non-replication superuser connections
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 145, in __init__
self._dbapi_connection = engine.raw_connection()
^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 3282, in raw_connection
return self.pool.connect()
^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 449, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/impl.py", line 282, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
self.__connect()
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 900, in __connect
with util.safe_reraise():
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
raise exc_value.with_traceback(exc_tb)
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 896, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/create.py", line 643, in connect
return dialect.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 616, in connect
return self.loaded_dbapi.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/miniconda/lib/python3.12/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@alangenfeld Is the info above useful for tracking down the problem?
The INSERT INTO event_logs... statement corresponds to the events emitted from a run. In addition to the structured events emitted by the framework, context.log calls will also result in event inserts. These inserted events are what you see when you load the UI for a specific run. How many of those 7 steps are running concurrently across the 6 active runs will affect the total number of concurrent connections being used to store events.
The default multiprocess executor sets the number of concurrent subprocesses it allows according to the number of CPUs reported by multiprocessing.cpu_count(). Each subprocess makes its own connection to the database. See https://docs.dagster.io/concepts/ops-jobs-graphs/job-execution#controlling-job-execution for more details on controlling this concurrency explicitly.
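For illustration, a minimal sketch of capping that executor-level concurrency on a job (the job and op names below are placeholders, not taken from this thread):

from dagster import job, multiprocess_executor, op


@op
def do_work():
    ...


# Cap the multiprocess executor at 4 concurrent subprocesses instead of the
# multiprocessing.cpu_count() default; this also caps how many per-subprocess
# database connections a single run opens at once.
@job(executor_def=multiprocess_executor.configured({"max_concurrent": 4}))
def capped_job():
    do_work()

The linked docs page also covers setting this limit through run config.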
@alangenfeld thanks for taking a look at this issue again. My problem may be different from @efcaguab's, since the environment where I'm experiencing this issue already has max concurrency set to 12, which doesn't explain why 1000+ connections would be required. Were you able to test the script I posted above?
Testing your script locally on master, with Postgres running in Docker and 12 concurrent runs, I am seeing 10-15 concurrent connections.
I do see something unexpected in pg_stat_activity (our check for whether the concurrency_slots table exists firing repeatedly), so I will send out a fix for that.
@alangenfeld exciting news, I will wait for that fix to land and will report back what happens
See this old and still-valid report of the same issue: https://github.com/dagster-io/dagster/issues/8466
We have pgbouncer in front, and it can handle the huge number of connections being opened, though the connection spam itself has not gone away. (This must already be fixed in the cloud version, to be honest? Maybe one could look there for inspiration.)
Can replicate this as of dagster-webserver and dagster 1.7.8.
We're also experiencing this issue. A single materialization limited to 16 concurrent runs generates well over 2k Postgres connections. We're trying to get pgbouncer or RDS proxies in place to help address the issue.
We began seeing this when migrating our k8s jobs to use k8s Pipes for asset materialization. Prior to migrating, we would simply launch jobs without tying into any asset functionality and had no issues scaling the number of jobs we ran. Upon migrating to Pipes and utilizing assets, we quickly ran into the Postgres connection issue.
Are there any updates on this issue? We are also running into the same problem. We suddenly got thousands of connections to the Postgres database after deploying the Dagster daemon. Our (temporary) solution was to shut down the daemon deployment and start jobs via GraphQL; however, this means we cannot use any sensors or schedules.