Galaxy loses track of Pulsar jobs

Open natefoo opened this issue 1 year ago • 6 comments

This happens often enough that I have a script I run against Pulsar endpoints. It does the following:

  1. Check for non-terminal jobs in the DB
  2. Check whether Pulsar knows about those jobs (jobid file exists in its active-jobs/preprocessing-jobs dirs)
  3. If jobs exist in 1 but not 2, remove the job dir on Pulsar and reset the job state to new for reexecution
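
The three steps above can be sketched roughly as follows. This is a minimal illustration, not the actual script: `TERMINAL_STATES`, `JobRecord`, `pulsar_known_job_ids`, and `find_lost_jobs` are hypothetical names, the set of terminal states is an assumption, and the directories to scan are passed in by the caller rather than derived from any real Pulsar layout.

```python
from dataclasses import dataclass
from pathlib import Path

# Assumption: states treated as terminal. Check this against your Galaxy version.
TERMINAL_STATES = {"ok", "error", "deleted"}


@dataclass
class JobRecord:
    """Hypothetical stand-in for a row from Galaxy's job table."""
    job_id: str
    state: str


def pulsar_known_job_ids(job_dirs):
    """Collect job IDs from the names of files found in the given directories.

    The caller supplies the directories (for example the active-jobs and
    preprocessing-jobs dirs); directories that do not exist are skipped.
    """
    known = set()
    for job_dir in job_dirs:
        path = Path(job_dir)
        if path.is_dir():
            known.update(p.name for p in path.iterdir() if p.is_file())
    return known


def find_lost_jobs(db_jobs, known_ids):
    """Return IDs of jobs that are non-terminal in the DB but unknown to Pulsar."""
    known = set(known_ids)
    return [
        job.job_id
        for job in db_jobs
        if job.state not in TERMINAL_STATES and job.job_id not in known
    ]
```

The IDs returned by `find_lost_jobs` are the candidates for step 3. Removing the remote job directory and resetting the state is deliberately left out, since that part is destructive and site-specific.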

This is an unfortunate resolution: in most cases the job finished properly on the Pulsar side and Galaxy just failed to update the state, so you end up rerunning the entire execution when all you really want is to rerun (at most) the postprocessing, or possibly just the final status update.

There may be many causes, but I've finally got at least one concrete example. In this case I've had multiple jobs fail over different time periods with a database timeout in the AMQP consumer thread. I am not sure what causes this timeout. There is nothing incriminating in the DB server logs/stats or Galaxy host logs/stats other than increased load and network consumption on the Galaxy server during this period, and even that is nothing drastic (load at 3 instead of 1, network at 100 MB/s instead of 20 MB/s).

The message is received and ack'd:

pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,633 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Acknowledging UUID af159766-e58f-11ee-a906-fa163ed650e8 on queue status_update_ack
pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,633 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] [publish:af2121ee-e58f-11ee-9d76-005056bc743e] Begin publishing to key pulsar_vgp_jetstream2__status_update_ack
pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,635 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] [publish:af2121ee-e58f-11ee-9d76-005056bc743e] Have producer for publishing to key pulsar_vgp_jetstream2__status_update_ack
pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,635 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] [publish:af2121ee-e58f-11ee-9d76-005056bc743e] Published to key pulsar_vgp_jetstream2__status_update_ack
pulsar.client.manager DEBUG 2024-03-18 20:26:17,636 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Handling asynchronous status update from remote Pulsar.

The AMQP heartbeat thread dies 10 minutes later:

pulsar.client.amqp_exchange ERROR 2024-03-18 20:36:18,112 [pN:main_vgp_handler0,p:3997641,tN:consume-heartbeat-pulsar_vgp_jetstream2__status_update] Problem with heartbeat, leaving heartbeat method in problematic state!
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/pulsar/client/amqp_exchange.py", line 208, in heartbeat
    connection.heartbeat_check()
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/connection.py", line 328, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 222, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/amqp/connection.py", line 776, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed
Exception in thread consume-heartbeat-pulsar_vgp_jetstream2__status_update:
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/deps/_conda/envs/[email protected]/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 72, in run
    reraise(*_capture_exception())
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sentry_sdk/_compat.py", line 115, in reraise
    raise value
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 70, in run
    return old_run_func(self, *a, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/deps/_conda/envs/[email protected]/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/pulsar/client/amqp_exchange.py", line 208, in heartbeat
    connection.heartbeat_check()
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/connection.py", line 328, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 222, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/amqp/connection.py", line 776, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed

This is followed another 6 minutes later by the DB timeout:

amqp.exceptions.ConnectionForced: Too many heartbeats missed
galaxy.jobs.runners.pulsar ERROR 2024-03-18 20:42:10,040 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Failed to update Pulsar job status for job_id (56400785/56400785)
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: could not receive data from server: Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/galaxy/lib/galaxy/jobs/runners/pulsar.py", line 984, in __async_update
    job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/galaxy/lib/galaxy/jobs/handler.py", line 289, in job_pair_for_id
    job = self.sa_session.query(model.Job).get(id)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in get
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 468, in warned
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 947, in get
    return self._get_impl(ident, loading.load_on_pk_identity)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 951, in _get_impl
    return self.session._get_impl(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl
    return db_load_fn(
           ^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not receive data from server: Connection timed out
[SQL: SELECT job.id AS job_id, job.create_time AS job_create_time, job.update_time AS job_update_time, job.history_id AS job_history_id, job.library_folder_id AS job_library_folder_id, job.tool_id AS job_tool_id, job.tool_version AS job_tool_version, job.galaxy_version AS job_galaxy_version, job.dynamic_tool_id AS job_dynamic_tool_id, job.state AS job_s>
FROM history_dataset_collection_association, job_to_output_dataset_collection
WHERE job.id = job_to_output_dataset_collection.job_id AND history_dataset_collection_association.id = job_to_output_dataset_collection.dataset_collection_id AND history_dataset_collection_association.deleted = true) AS anon_1, EXISTS (SELECT history_dataset_association.id
FROM history_dataset_association, job_to_output_dataset
WHERE job.id = job_to_output_dataset.job_id AND history_dataset_association.id = job_to_output_dataset.dataset_id AND history_dataset_association.deleted = true) AS anon_2
FROM job
WHERE job.id = %(pk_1)s]
[parameters: {'pk_1': '56400785'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
pulsar.client.manager ERROR 2024-03-18 20:42:10,102 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Failure processing job status update message.

Ideally I'll fix whatever is causing the timeout, but it would also be great if we didn't lose track of the one-and-only terminal status update. Two thoughts:

  1. Originally we (I?) chose to ack the status update before running the callback, since the callback could block the consumer thread for an extended amount of time. This arguably goes against the spirit of the ack in the first place, since the message has not actually been "handled" at that point beyond being decoded.
  2. #9911 would be a solution to this.
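
The ordering question in the first point can be illustrated with a small simulation. This is only a sketch under stated assumptions: `FakeMessage`, `consume_ack_first`, `consume_ack_last`, and `was_acked_after_failure` are hypothetical names, and `ack()` stands in for whatever acknowledgement mechanism is actually in use; none of this is Pulsar's or kombu's real API.

```python
class FakeMessage:
    """Hypothetical stand-in for a status update message; only tracks the ack."""

    def __init__(self, body):
        self.body = body
        self.acked = False

    def ack(self):
        self.acked = True


def consume_ack_first(message, callback):
    """Ack as soon as the message is decoded, then run the callback."""
    message.ack()
    callback(message.body)


def consume_ack_last(message, callback):
    """Run the callback first and ack only if it returned without raising."""
    callback(message.body)
    message.ack()


def failing_callback(body):
    """Simulates the DB timeout seen in the status update callback."""
    raise RuntimeError("could not receive data from server: Connection timed out")


def was_acked_after_failure(consume):
    """Return True if the message ended up acked even though handling failed."""
    message = FakeMessage({"status": "complete"})
    try:
        consume(message, failing_callback)
    except RuntimeError:
        pass
    return message.acked
```

With `consume_ack_first` the sender believes the update was delivered even though the callback failed, which matches the lost update described here. With `consume_ack_last` the message stays unacknowledged and could be redelivered, at the cost of a slow callback blocking the consumer thread, which was the original concern.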

natefoo • Mar 19 '24 12:03

The AMQP heartbeat is now configurable (https://github.com/galaxyproject/pulsar/pull/357), although the heartbeat was not the cause of the error, and even if it had been, it would have been unlikely to cause the job loss.

Setting tcp_keepalives_idle on the DB server (https://github.com/galaxyproject/infrastructure-playbook/commit/c782c589557e95a51b2c7103e67e2dea3fc89f1f) appears to have fixed the timeout issue, which may have been caused by an intermediate firewall closing the connection, since the VGP instance has job handler threads with potentially long idle periods.
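
For comparison, keepalives can also be requested from the client side through libpq's connection parameters (`keepalives`, `keepalives_idle`, `keepalives_interval`, `keepalives_count`). The sketch below is not what the linked commit does; it only builds such a parameter set. The helper names and the numeric values are assumptions chosen for illustration.

```python
def libpq_keepalive_args(idle=60, interval=10, count=5):
    """Build libpq keepalive parameters (values here are illustrative only).

    idle:     seconds of inactivity before the first keepalive probe
    interval: seconds between unanswered probes
    count:    unanswered probes before the connection is considered dead
    """
    return {
        "keepalives": 1,
        "keepalives_idle": idle,
        "keepalives_interval": interval,
        "keepalives_count": count,
    }


def to_dsn_fragment(args):
    """Render the parameters as a space-separated libpq DSN fragment."""
    return " ".join(f"{key}={value}" for key, value in args.items())
```

A dict like this can be passed as keyword arguments to `psycopg2.connect` or as `connect_args` to SQLAlchemy's `create_engine`. The idle value would need to be shorter than whatever idle timeout the intermediate firewall enforces.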

natefoo • Apr 01 '24 14:04

This is good news. Together with the new database server, more efficient job queries, and generally decreased database load, I would think this shouldn't happen again. I'm not sure whether it is worth implementing an in-memory queue on the Pulsar job handler side that would let us retry changing the job state once the database has recovered from errors. Have you seen any other cases of lost jobs (or lost status updates) recently?
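
An in-memory retry queue of that kind might look roughly like the sketch below. Everything here is hypothetical: `StatusUpdateRetryQueue` and its methods do not exist in Galaxy or Pulsar, and `apply_update` is a placeholder for whatever function actually writes the job state to the database.

```python
from collections import deque


class StatusUpdateRetryQueue:
    """Holds status updates in memory and retries them until they succeed."""

    def __init__(self, apply_update, max_attempts=5):
        self._apply_update = apply_update  # callable(job_id, state); may raise
        self._max_attempts = max_attempts
        self._pending = deque()

    def submit(self, job_id, state):
        """Queue an update; nothing touches the database until drain()."""
        self._pending.append((job_id, state, 0))

    def drain(self):
        """Try each pending update once and return the updates given up on."""
        dropped = []
        for _ in range(len(self._pending)):
            job_id, state, attempts = self._pending.popleft()
            try:
                self._apply_update(job_id, state)
            except Exception:
                attempts += 1
                if attempts >= self._max_attempts:
                    dropped.append((job_id, state))
                else:
                    # Keep the update for the next drain() call.
                    self._pending.append((job_id, state, attempts))
        return dropped

    def __len__(self):
        return len(self._pending)
```

A handler would call `drain()` periodically, for example from its monitor loop. Because the queue lives only in memory, pending updates are lost if the handler process restarts, so this narrows the window for lost updates rather than closing it.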

mvdbeek • Apr 07 '24 12:04

Not on the VGP instance, but I do still get them on the Main instances. I need to fix log storage on the handler VMs to see what's going on though and haven't had a chance to do that yet.

natefoo • Apr 09 '24 18:04

Hi, is there any news about this issue? I'm getting a similar problem with usegalaxy.eu when sending test jobs to my Pulsar instance. Some of the jobs get stuck in the running state, although Pulsar claims that the status update message 'complete' was published and the output file contents are OK. The Galaxy version at usegalaxy.eu is 24.0.3.dev0 and my Pulsar version is 0.15.6.

astalosj • May 21 '24 05:05