behave-django

How to run Channels 2 worker?

Open d-s-e opened this issue 7 years ago • 3 comments

Hi, my tests have worked fine so far with Channels 1, using the manual integration of behave. Now I'm trying to upgrade to Channels 2, and I don't know how to start the workers anymore. Until now I started some workers as threads in before_all, but with Channels 2 I only get the following error when I try to do that:

RuntimeError: There is no current event loop in thread 'Worker'

What can I do, any hints?

d-s-e • Jul 04 '18 09:07
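
The error quoted above can be reproduced with the standard library alone, which may help narrow things down: asyncio.get_event_loop() only finds a loop in a thread where one has been set. This is a minimal sketch that does not involve Channels at all; `probe` and `errors` are hypothetical names used only for illustration, and the thread name simply mirrors the one in the error message.

```python
import asyncio
import threading

errors = []


def probe():
    # Hypothetical helper: ask for the current event loop from inside a
    # thread that never created or set one.
    try:
        asyncio.get_event_loop()
    except RuntimeError as exc:
        errors.append(exc)


thread = threading.Thread(target=probe, name="Worker")
thread.start()
thread.join()

# One RuntimeError is collected from the "Worker" thread.
print(errors)
```

If the same call is made after asyncio.set_event_loop() in that thread, no exception is collected.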

@proofit404 helped to deal with the original issue #16 for Channels 1. He, and of course @andrewgodwin, may be able to shed some light on your problem. -- Artem? Andrew? ping! :bell:

In the meantime, can you please provide code samples of your setup to help us understand the specifics of your situation?

bittner • Jul 05 '18 08:07

This is the relevant part of environment.py that was working with Channels 1:

import threading

from channels.worker import Worker

WORKER_COUNT = 2


class WorkerThread(threading.Thread):
    """Runs one Channels 1 worker in a background thread."""

    def __init__(self, channel_layer):
        super().__init__(name="Worker")
        self.channel_layer = channel_layer

    def run(self):
        # signal_handlers=False: the worker does not run in the main thread
        worker = Worker(channel_layer=self.channel_layer, signal_handlers=False)
        worker.ready()
        worker.run()


def before_all(context):
    # Daemon threads, so they don't keep the test run alive at exit
    for _ in range(WORKER_COUNT):
        worker = WorkerThread(context.channel_layer)
        worker.daemon = True
        worker.start()

I tried to get it working with Channels 2. This is the current code that throws the error:

import threading

from channels.worker import Worker
from channels.routing import get_default_application

# One list of channel names per worker thread
WORKERS = [
    ["trigger", "ctrl", "device", "script"],
    ["trigger", "ctrl", "device", "script"],
]


class WorkerThread(threading.Thread):
    def __init__(self, channel_layer, channels):
        super().__init__(name="Worker")
        self.channel_layer = channel_layer
        self.channels = channels

    def run(self):
        # The RuntimeError quoted above is raised from this thread
        worker = Worker(get_default_application(), self.channels, self.channel_layer)
        worker.run()


def before_all(context):
    for worker_channels in WORKERS:
        worker = WorkerThread(context.channel_layer, worker_channels)
        worker.daemon = True
        worker.start()

d-s-e • Jul 05 '18 12:07

I made some progress; the workers seem to run now with this code:

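import asyncio
import logging
import threading

from channels.worker import Worker
from channels.routing import get_default_application

logger = logging.getLogger(__name__)


class WorkerThread(threading.Thread):
    def __init__(self, channel_layer, channels):
        super().__init__(name="Worker")
        self.channel_layer = channel_layer
        self.channels = channels

    def run(self):
        # Give this thread its own event loop before creating the worker
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        logger.info(loop)
        worker = Worker(get_default_application(), self.channels, self.channel_layer)
        loop.run_until_complete(worker.run())
        loop.close()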

I don't know much about asyncio, so this may not be a good solution. I now also have the problem that some channel messages get lost and aren't handled, and I get occasional errors like "Two event loops are trying to receive() on one channel layer at once!".

d-s-e • Jul 09 '18 15:07
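
The per-thread event loop pattern from the last snippet can be sketched with the standard library alone. Every name here (`LoopThread`, `consume`, `results`) is hypothetical, and an asyncio.Queue stands in for whatever each worker receives from. The sketch also has each thread build its own queue inside its own loop rather than sharing one object across loops. Whether sharing a single channel layer instance between threads is what triggers the "Two event loops" message is an assumption, not something confirmed in this thread.

```python
import asyncio
import threading

results = {}


async def consume(name, queue):
    # Stand-in for a worker's receive loop: drain this thread's own queue.
    items = []
    while not queue.empty():
        items.append(await queue.get())
    results[name] = items


class LoopThread(threading.Thread):
    def __init__(self, name, messages):
        super().__init__(name=name)
        self.messages = messages

    def run(self):
        # Each thread creates, sets, runs, and closes its own event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.main())
        finally:
            loop.close()

    async def main(self):
        # The queue is created inside the loop that uses it, so no
        # loop-bound object is shared between threads.
        queue = asyncio.Queue()
        for message in self.messages:
            queue.put_nowait(message)
        await consume(self.name, queue)


threads = [LoopThread("worker-1", ["a", "b"]), LoopThread("worker-2", ["c"])]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(results)
```

The try/finally around run_until_complete() closes the loop even if the coroutine raises, which the snippet above does not do.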