
Dagster uses an implausible number of PostgreSQL connections

Open chrishiste opened this issue 11 months ago • 19 comments

Dagster version

1.6.3

What's the issue?

Hello,

We've encountered a significant issue with Dagster's interaction with PostgreSQL, leading to an exhaustion of the available connection slots. Despite increasing the connection limit from 100 to 1000, we are still facing the following error:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "X.X.X.X", port 5432 failed: FATAL: remaining connection slots are reserved for non-replication superuser connections

This problem seems to arise when running Dagster jobs with multiple steps. However, the number of steps executed is nowhere near enough to justify 1000 connections, which suggests an issue with how Dagster, and specifically its handling of dagster/concurrency_key, manages database connections. My hypothesis is that connections are not being created or reused correctly, and that this becomes apparent under the load of multi-step jobs.

Can someone please look into this? Any guidance on diagnosing or resolving this issue would be greatly appreciated.

Thank you for your time.

What did you expect to happen?

No response

How to reproduce?

No response

Deployment type

None

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

chrishiste avatar Mar 07 '24 12:03 chrishiste

Are you able to query pg_stat_activity when this is happening? That may help point to a specific problematic interaction if there is a clear pattern.
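
For reference, a rough sketch (not an official tool) of the kind of snapshot that would help, using psycopg2 directly against the Dagster storage database; the DSN below is a placeholder:

import psycopg2

DSN = "postgresql://dagster:password@X.X.X.X:5432/dagster"  # placeholder

conn = psycopg2.connect(DSN)
try:
    with conn.cursor() as cur:
        # Group the current backends by state and query text to spot a pattern.
        cur.execute(
            """
            SELECT state, query, count(*) AS n
            FROM pg_stat_activity
            WHERE datname = current_database()
            GROUP BY state, query
            ORDER BY n DESC
            """
        )
        for state, query, n in cur.fetchall():
            print(n, state, (query or "")[:120])
finally:
    conn.close()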

Investigating https://github.com/dagster-io/dagster/pull/20367 from skimming the code

alangenfeld avatar Mar 08 '24 17:03 alangenfeld

@alangenfeld I will try next time it happens, but it's not totally clear when that will be. I can see spikes in active connections over the last 30 days in my logs, which is good because it will let me confirm whether your fix works.

chrishiste avatar Mar 08 '24 18:03 chrishiste

Experienced the same issue on 1.7.3. I tried to start using cross-run concurrency limits via dagster/concurrency_key, but the runs started crashing with errors like:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "hostname" (ip), port 5432 failed: FATAL:  too many connections for role "dagster"

ignaski avatar May 06 '24 14:05 ignaski

It happened again in 1.7.1, so the fix wasn't effective. @alangenfeld, what do you want from pg_stat_activity?

chrishiste avatar May 06 '24 15:05 chrishiste

what do you want from pg_stat_activity?

The query column would probably be the most useful, but * is fine.

alangenfeld avatar May 06 '24 15:05 alangenfeld

@alangenfeld Sorry, I should have looked into what pg_stat_activity is before. This won't work in our scenario: these errors are somewhat unpredictable and depend on the load on our Dagster production instance, so by the time I log into that instance the activity will likely be gone already. For historical analysis, I can set up audit logs; would that be helpful?

chrishiste avatar May 06 '24 16:05 chrishiste

@alangenfeld this happens quite often nowadays, I guess because we have more concurrency. I will be more than happy to help if you let me know what would be useful for fixing this issue. 1000 connections for 12 concurrent runs is definitely a bug, and it's also adding strain on our PostgreSQL instance because it has to manage so many connections. I had to double our vCPU count to make Dagit usable, but we're still hitting 100% CPU usage consistently.

chrishiste avatar May 09 '24 16:05 chrishiste

import time

from dagster import (
    DynamicOut,
    DynamicOutput,
    graph_asset,
    op,
)


@op(out=DynamicOut())
def create_range(context):
    # Fan out 100 dynamic outputs, one per mapped step.
    for i in range(100):
        yield DynamicOutput(mapping_key=str(i), value=str(i))


@op(
    tags={"dagster/concurrency_key": "serial"},
)
def sleep(context, value):
    # value is unused; it only wires the mapped dependency. The "serial"
    # concurrency key limits how many of these steps run at once across runs.
    time.sleep(2)
    return


@graph_asset(
    group_name="test",
)
def break_dagster():
    # Map the sleep op over the 100 dynamic outputs.
    return create_range().map(sleep)

If you use this script and run 5-10 of those concurrently against a Postgres instance that has the default 100 connections, you will see the issue. Each of those runs opens dozens of connections at seemingly random times.
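
One way to kick off several concurrent runs of this repro is via the GraphQL client; a minimal sketch, assuming an asset job named break_dagster_job that materializes the asset above and a webserver on localhost:3000 (both names are assumptions, not from the original script):

from dagster_graphql import DagsterGraphQLClient

# Assumes the Dagster webserver is reachable at localhost:3000.
client = DagsterGraphQLClient("localhost", port_number=3000)

for i in range(10):
    # break_dagster_job is a hypothetical asset job wrapping the graph_asset above.
    run_id = client.submit_job_execution("break_dagster_job")
    print(f"launched run {i}: {run_id}")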

chrishiste avatar May 09 '24 16:05 chrishiste

Make sure to set the limit for the serial concurrency key to 1 in the UI beforehand.

chrishiste avatar May 09 '24 16:05 chrishiste

Facing the same issue; I'm running Dagster 1.7.4 with the K8sRunLauncher.

I tried a backfill with 6 concurrent runs. Each job run has 7 steps. Steps were randomly failing with psycopg2.OperationalError: connection to server at "XXX" (XXX), port 5432 failed: FATAL: remaining connection slots are reserved for non-replication superuser connections (full error at the end of this message).

Having a look at the DB, the following query was called almost 4,000 times over the 15 minutes that the backfill lasted, which is pretty crazy.

INSERT INTO
  event_logs (run_id,
    event,
    dagster_event_type,
    timestamp,
    step_key,
    asset_key,
    PARTITION)
VALUES
  ($1, $2, $3, $4::timestamp, $5, $6, $7) RETURNING event_logs.run_id,
  event_logs.id
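
For anyone wanting to reproduce this kind of count, cumulative per-query call totals can be read from pg_stat_statements when that extension is enabled; a rough sketch (the DSN is a placeholder):

import psycopg2

DSN = "postgresql://dagster:password@X.X.X.X:5432/dagster"  # placeholder

conn = psycopg2.connect(DSN)
try:
    with conn.cursor() as cur:
        # Top queries by cumulative call count (requires pg_stat_statements).
        cur.execute(
            """
            SELECT calls, query
            FROM pg_stat_statements
            ORDER BY calls DESC
            LIMIT 20
            """
        )
        for calls, query in cur.fetchall():
            print(calls, query[:120])
finally:
    conn.close()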

These were all the queries run during the backfill while things were failing:

[screenshot of the queries run during the backfill]
psycopg2.OperationalError: connection to server at "XXX" (XXX), port 5432 failed: FATAL:  remaining connection slots are reserved for non-replication superuser connections


  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 145, in __init__
    self._dbapi_connection = engine.raw_connection()
                             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 3282, in raw_connection
    return self.pool.connect()
           ^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/impl.py", line 282, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
    self.__connect()
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 900, in __connect
    with util.safe_reraise():
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 896, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/create.py", line 643, in connect
    return dialect.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 616, in connect
    return self.loaded_dbapi.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/lib/python3.12/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

efcaguab avatar May 12 '24 23:05 efcaguab

@alangenfeld Is the info above useful for tracking down the problem?

efcaguab avatar May 21 '24 06:05 efcaguab

The INSERT INTO event_logs... statement corresponds to the events emitted from a run. In addition to the structured events emitted by the framework, context.log calls also result in event inserts. These inserted events are what you see when you load the UI for a specific run. How many of those 7 steps are running concurrently across the 6 active runs will affect the total number of concurrent connections being used to store events.
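
As a small illustration (not code from this thread), every context.log call in an op body becomes its own event_logs row, on top of the structured step events:

from dagster import op


@op
def chatty_op(context):
    # Each of these log calls is persisted as a separate event_logs row,
    # in addition to the STEP_START / STEP_SUCCESS events Dagster emits itself.
    for i in range(100):
        context.log.info(f"processed item {i}")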

The default multiprocess executor sets the number of concurrent subprocesses it allows based on the number of CPUs reported by multiprocessing.cpu_count(). Each subprocess makes its own connection to the database. See https://docs.dagster.io/concepts/ops-jobs-graphs/job-execution#controlling-job-execution for more details on controlling this concurrency explicitly.
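
For example, a minimal sketch of capping the per-run subprocess concurrency through run config; the job name and limit here are illustrative, not from the thread:

from dagster import define_asset_job

# Hypothetical asset job materializing the break_dagster asset from the repro,
# with the multiprocess executor capped at 4 concurrent subprocesses per run.
break_dagster_job = define_asset_job(
    name="break_dagster_job",
    selection="break_dagster",
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4,
                }
            }
        }
    },
)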

alangenfeld avatar May 24 '24 19:05 alangenfeld

@alangenfeld thanks for taking a look at this issue again. My problem may be different from @efcaguab's, since the environment where I'm experiencing this issue already has max concurrency set to 12, which doesn't explain why 1000+ connections would be required. Were you able to test the script I posted above?

chrishiste avatar May 24 '24 20:05 chrishiste

Testing your script locally on master, with Postgres running in Docker and 12 concurrent runs, I am getting 10 to 15 concurrent connections.

I do see something unexpected in pg_stat_activity (our check for whether the concurrency_slots table exists firing repeatedly), so I will send out a fix for that.

alangenfeld avatar May 24 '24 20:05 alangenfeld

@alangenfeld exciting news! I will wait for that fix to land and report back on what happens.

chrishiste avatar May 24 '24 21:05 chrishiste

See this old and still valid report of the issue: https://github.com/dagster-io/dagster/issues/8466

We have pgbouncer in front, and it can absorb the crazy number of connections being spammed, though the spam itself has not gone away. (This must be fixed in the cloud version, to be honest; maybe one can look there for inspiration.)

Can replicate as of dagster-webserver and dagster 1.7.8.

emirkmo avatar May 31 '24 16:05 emirkmo

We're also experiencing this issue. A single materialization limited to 16 concurrent runs generates well over 2,000 Postgres connections. We're trying to get pgbouncer or an RDS proxy in place to help address the issue.

We began seeing this issue when migrating our k8s jobs to use k8s Pipes for asset materialization. Before migrating, we would simply launch jobs without tying into any asset functionality, and we had no issues scaling the number of jobs we ran. After migrating to Pipes and using assets, we quickly ran into the Postgres connection issue.

caleboverman avatar Jun 11 '24 17:06 caleboverman

Are there any updates on this issue? We are also running into it. We suddenly got thousands of connections to the Postgres DB after deploying the Dagster daemon. Our (temporary) solution was to shut down the daemon deployment and use GraphQL to start jobs, but this means we cannot use any sensors or schedules.

leo-schmidt avatar Oct 04 '24 11:10 leo-schmidt