
OperationTimedOut in Celery

Open • TvoroG opened this issue 11 years ago • 6 comments

Hello!

When I try to follow a user, the insert queries fail with the following error:

ERROR 2014-10-07 13:21:24,547 log 5092 -1216981312 Task stream_framework.tasks.fanout_operation_hi_priority[3312a605-9d8a-4ee7-9385-929f6d881b68] raised unexpected: OperationTimedOut('errors=errors=errors={}, last_host=localhost, last_host=None, last_host=None',)
Traceback (most recent call last):
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/tasks.py", line 17, in fanout_operation_hi_priority
    return fanout_operation(feed_manager, feed_class, user_ids, operation, operation_kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/local.py", line 167, in <lambda>
    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return orig(self, *args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/celery/app/task.py", line 420, in __call__
    return self.run(*args, **kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/tasks.py", line 11, in fanout_operation
    feed_manager.fanout(user_ids, feed_class, operation, operation_kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/feed_managers/base.py", line 346, in fanout
    operation(feed, **operation_kwargs)
  File "/vagrant/.env/lib/python2.7/site-packages/cqlengine/query.py", line 197, in __exit__
    self.execute()
  File "/vagrant/.env/lib/python2.7/site-packages/stream_framework/storage/cassandra/timeline_storage.py", line 30, in execute
    super(Batch, self).execute()
  File "/vagrant/.env/lib/python2.7/site-packages/cqlengine/query.py", line 185, in execute
    tmp = execute('\n'.join(query_list), parameters, self._consistency)
  File "/vagrant/.env/lib/python2.7/site-packages/cqlengine/connection.py", line 112, in execute
    result = session.execute(query, params)
  File "/vagrant/.env/lib/python2.7/site-packages/cassandra/cluster.py", line 1294, in execute
    result = future.result(timeout)
  File "/vagrant/.env/lib/python2.7/site-packages/cassandra/cluster.py", line 2790, in result
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
OperationTimedOut: errors=errors=errors={}, last_host=localhost, last_host=None, last_host=None

I found this solution: https://github.com/cqlengine/cqlengine/issues/237 but it's crude. How do you deal with this problem?

TvoroG, Oct 30 '14 11:10

Unfortunately, the solution in the link is pretty much the best way to deal with the python-driver's long-lived sessions combined with Celery's pre-fork worker pool. How often do you get the timeout? Did you already check the Cassandra logs?

tbarbugli, Oct 30 '14 11:10
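Why a long-lived session breaks under pre-fork: `fork()` copies the parent's memory, but only the thread that called `fork()` keeps running in the child. A minimal sketch that does not touch Cassandra at all; the sleeping thread is a hypothetical stand-in for the driver's I/O event-loop thread (an assumption about the driver's internals: its default connection classes run their event loop on a background thread). A session object inherited from the parent still looks valid in the child, but nothing services its sockets, so requests wait until they time out.

```python
import os
import threading
import time
import warnings


def start_background_io_thread():
    """Hypothetical stand-in for a driver's I/O event-loop thread."""
    thread = threading.Thread(target=time.sleep, args=(30,), daemon=True)
    thread.start()
    return thread


def thread_alive_in_child(thread):
    """Fork once and report whether `thread` is still running in the child."""
    read_fd, write_fd = os.pipe()
    with warnings.catch_warnings():
        # Python 3.12+ warns when forking a multi-threaded process, which is
        # exactly the hazard this sketch demonstrates.
        warnings.simplefilter("ignore", DeprecationWarning)
        pid = os.fork()
    if pid == 0:  # child: only the forking thread was copied
        os.close(read_fd)
        os.write(write_fd, b"1" if thread.is_alive() else b"0")
        os._exit(0)  # leave immediately, skipping inherited cleanup handlers
    os.close(write_fd)
    answer = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return answer == b"1"


io_thread = start_background_io_thread()
print("alive in parent:", io_thread.is_alive())
print("alive in child:", thread_alive_in_child(io_thread))
```

On Linux this prints `alive in parent: True` followed by `alive in child: False`, which is why the session has to be created inside each worker process after the fork.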

Almost every attempt to follow a user. I didn't find anything useful in the Cassandra logs. Thanks for the help! I just wanted to know whether there is a better solution.

TvoroG, Oct 30 '14 12:10

I think I'm getting this error. Unfortunately, the original link to the cqlengine issue (https://github.com/cqlengine/cqlengine/issues/237) is now dead. Does anybody remember the workaround it described?

picturedots, Oct 17 '16 19:10

Something like this:

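import threading

from django.conf import settings
from cassandra.cluster import Cluster
from celery.signals import worker_process_init, worker_process_shutdown

# Per-process storage: with the prefork pool each worker process runs its
# tasks in its main thread, so this ends up holding one session per worker.
thread_local = threading.local()

@worker_process_init.connect
def open_cassandra_session(*args, **kwargs):
    # Runs inside each freshly forked worker, so the driver's connections and
    # I/O threads belong to this process instead of being inherited.
    cluster = Cluster([settings.DATABASES["cassandra"]["HOST"]], protocol_version=3)
    thread_local.cassandra_cluster = cluster
    thread_local.cassandra_session = cluster.connect(settings.DATABASES["cassandra"]["NAME"])

@worker_process_shutdown.connect
def close_cassandra_session(*args, **kwargs):
    # Cluster.shutdown() also closes every Session created from it; shutting
    # down only the session would leave the cluster's connections open.
    cluster = getattr(thread_local, "cassandra_cluster", None)
    if cluster is not None:
        cluster.shutdown()
    thread_local.cassandra_cluster = None
    thread_local.cassandra_session = None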

We should probably add this to the library's documentation and hook it up to our own Cassandra connection setup() functions.

tbarbugli, Oct 18 '16 07:10

A blog post describing a similar issue and its solution can be found here: http://robertolopes.me/2015/07/operationtimedout-exception-raised-when-using-the-cassandra-driver-for-python-in-conjunction-with-celery-delayed-tasks/ Our tests also sometimes fail because of this: https://travis-ci.org/JelteF/Stream-Framework/jobs/168539883

JelteF, Oct 18 '16 08:10

I think we should add this to the docs and perhaps wrap it up in a helper function. If you use Stream-Framework with Cassandra and Celery, you will run into this problem for sure.

tbarbugli, Oct 18 '16 08:10
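One possible shape for such a helper, as a minimal sketch assuming Python 3.7+ (for `os.register_at_fork`). The class name `PerProcessResource` and its `factory` argument are hypothetical, not part of Stream-Framework or the Cassandra driver. It swaps Celery's `worker_process_init` signal for the standard library's fork hook, so the same object also behaves correctly outside Celery: whatever the parent created is forgotten in each forked child and rebuilt lazily on first use.

```python
import os
import threading


class PerProcessResource:
    """Hypothetical helper: one lazily created resource (e.g. a Cassandra
    session) per OS process, rebuilt in every forked child."""

    def __init__(self, factory):
        self._factory = factory  # zero-argument callable building the resource
        self._lock = threading.Lock()
        self._resource = None
        # Python 3.7+: runs in the child right after each os.fork(), which is
        # how Celery's prefork pool starts its worker processes. The hook keeps
        # a reference to this object, so create instances at module level.
        os.register_at_fork(after_in_child=self._forget_inherited_resource)

    def _forget_inherited_resource(self):
        # The parent's resource is unusable here (its I/O threads were not
        # copied by fork), so drop the reference without shutting it down:
        # the parent process still owns it. A fresh lock avoids inheriting
        # one that another thread happened to hold at fork time.
        self._lock = threading.Lock()
        self._resource = None

    def get(self):
        """Return this process's resource, creating it on first use."""
        with self._lock:
            if self._resource is None:
                self._resource = self._factory()
            return self._resource
```

In a Celery project one might create `sessions = PerProcessResource(lambda: Cluster(["127.0.0.1"]).connect("my_keyspace"))` at module level and call `sessions.get()` inside each task (host and keyspace are placeholders). This only covers sessions you create yourself; cqlengine's module-level connection, which Stream-Framework's Cassandra storage goes through, still needs to be set up again in each worker, for example from a `worker_process_init` handler like the snippet above.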