
Question: how can I keep the main thread from exiting when running actors with multiprocQueueBase?

Open carlos-lm opened this issue 2 years ago • 9 comments

Hello, thanks for this lib, it's great. Right now I have a small system with some actors, and everything works as expected when I use simpleSystemBase, but now I want to run this truly asynchronously, so I'd like to use "multiprocQueueBase".

Basically, I have a normal Python script which starts a central actor (Dispatcher):

 self.actor_dispatcher = ActorSystem().createActor(Dispatcher)

This central actor creates new actors based on the number of items passed in a message:

for ticker in message.tickers:
    actor_mode = "simpleSystemBase" if self.test else "multiprocQueueBase"
    fetcher = ActorSystem(actor_mode).createActor(DataFetcher)

These actors (DataFetcher) must be async because they make requests to an API.

Is this architecture correct? Must the dispatcher use "multiprocQueueBase", or is it OK to start it with the default simpleSystemBase?

How can I keep my main process (the script that creates the dispatcher actor) from ending before my actors do? (This actor system must actually run forever.) Is a while True in the main script OK, or is there a better approach?

Also, will the logs from the actors be displayed in the running script? Right now (using a while True in the main script) I can't get any logs.

thank you so much!!!

carlos-lm avatar Sep 13 '21 19:09 carlos-lm

Hi @carlos-lm ,

Thanks for the appreciation: it's nice to know this library has been useful.

You will almost certainly want to use one of the multi-process bases; I would actually recommend the multiprocTCPBase (https://thespianpy.com/doc/using.html#hH-59ea5ca7-a167-4d51-bd1b-78119eab6df6) if you run into issues using the multiprocQueueBase. Each of the multiproc bases starts up a separate process to act as the admin, independent of the application that started it; the started actor system will persist until an ActorSystem(actor_mode).shutdown() call stops it. Note that the multiprocQueueBase does not support re-connection due to limitations in Python Queue functionality (https://thespianpy.com/doc/using.html#hH-59ea5ca7-a167-4d51-bd1b-78119eab6df6), so it cannot be shut down in this manner; it can only be shut down by the same process that started it, or by killing its processes externally.

Because a multiproc system has multiple processes, logging is coordinated centrally and written to files as specified by the logDefs argument used when starting the ActorSystem() (see https://thespianpy.com/doc/using.html#hH-f71b7bfa-c57b-4716-a2c7-ad83a2ed3582 and https://thespianpy.com/doc/using.html#hH-ce55494c-dd7a-4258-a1e8-b090c3bbb1e6).

Hopefully this will help for your needs.

kquick avatar Sep 14 '21 18:09 kquick

thank you for the detailed answer @kquick

I was checking the hellogoodbye sample, and it seems that switching between system bases is just a matter of passing "multiprocTCPBase" as the argument to ActorSystem...

I did that

 actor_mode = "simpleSystemBase" if self.test else "multiprocTCPBase"
 fetcher = ActorSystem(actor_mode,logDefs=logcfg).createActor(DataFetcher)

but I'm getting this error

2021-09-14 20:22:06,672 WARNING =>  Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known  [IPBase.py:16]
2021-09-14 20:22:11,675 WARNING =>  Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known  [IPBase.py:16]

Another question: I'm passing a complex object in my message. I have my doubts about this, but with simpleSystemBase it works perfectly. Could it be that serialization works fine for simpleSystemBase but not in other modes?

thank you

carlos-lm avatar Sep 15 '21 00:09 carlos-lm

That warning is described here: https://github.com/kquick/Thespian/issues/11

Thespian uses the Python pickle facility to pass objects. As long as your message can be pickled, Thespian can pass it, although your intuition is correct: the simpleSystemBase doesn't need to pass messages so it can get away with more permissive message types than any other base.
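One way to check this before switching bases is to run a candidate message through a pickle round trip yourself. The sketch below uses only the standard library; `DemoMessage` and `can_pickle` are hypothetical names invented for this illustration, and a `threading.Lock` stands in for a resource (such as a DB handle) that may not be picklable. It only tests `pickle` itself and makes no claim about how Thespian reports a failure.

```python
import pickle
import threading
from dataclasses import dataclass
from typing import Any


@dataclass
class DemoMessage:
    """Hypothetical message type used only for this check."""
    pair: str
    resource: Any = None


def can_pickle(obj) -> bool:
    """Return True if obj survives a pickle round trip, False otherwise."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception as err:  # pickle raises several different exception types
        print(f"not picklable: {err!r}")
        return False
    return True


# A message holding only plain data, then one holding a lock
# (a stand-in for a live handle or connection).
print(can_pickle(DemoMessage(pair="BTC/USDT")))
print(can_pickle(DemoMessage(pair="BTC/USDT", resource=threading.Lock())))
```

If a message fails this check, one option is to send only plain configuration values and let the receiving actor build the handle itself.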

kquick avatar Sep 15 '21 04:09 kquick

@kquick right now my code works OK when I run it with simpleSystemBase, but when I move to another system base the actors don't seem to work, and I don't get any error or message...

This is my code; this actor creates other actors and sends them the messages:

import logging

from bqbbdb.basedb import BaseDB
from thespian.actors import Actor, ActorSystem
from typing import List

from bqbbexchanges.exchange.actors.data_fetcher import DataFetcher
from bqbbexchanges.exchange.actors.message import Message, SetPropertiesMessage, GetHistoricalData, \
    ResponseDataFetch, RequestDataFetch
from bqbbexchanges.settings import Settings, settings
from bqbbexchanges.utils.has_key import has_key
from bqbbexchanges.utils.my_logging import logcfg
from bqbbexchanges.utils.types import OHLCVData


class Dispatcher(Actor):
    """Dispatcher
    this actor create new actors according to the tickers that needs to fetch, when the data_fetcher
    return a response, this handle the response, that is, save in the db, send new data to the data_fetcher,etc

    """
    db: BaseDB = None
    exchange = None
   # some code here....

    def receiveMessage(self, message: Message, sender):
        if isinstance(message, SetPropertiesMessage):
            logging.info("initializing dispatcher with required db and exchange")
            self.exchange = message.exchange
            self.timeframes_mapper = message.timeframes_mapper
            self.test = message.test
            # here I pass the exchange and DB handler to the actor in order that this can make request to get data and store it
            self.db = message.DB
            self.exchange_prefix = message.exchange_prefix
            # self.send(sender, f"Hello, World! {self.name}")
        elif isinstance(message, GetHistoricalData):
            logging.debug("start getting historical data")

            # THIS IS THE IMPORTANT CODE ==================================== <----------
            for ticker in message.tickers:
                actor_mode = "simpleSystemBase" if self.test else "multiprocTCPBase"
                fetcher = ActorSystem(actor_mode, logDefs=logcfg).createActor(DataFetcher)

                init_message = SetPropertiesMessage()
                init_message.exchange = self.exchange
                init_message.timeframes_mapper = self.timeframes_mapper

                self.send(fetcher, init_message)
                self.send(fetcher,
                          RequestDataFetch(ticker_and_date=ticker, wait_time=0))

        elif isinstance(message, ResponseDataFetch):
            self._handle_received_data(message, sender)
        else:
            logging.error("Dispatcher> UNKNOWN MESSAGE, returning properties")
            message = SetPropertiesMessage()
            message.DB = self.db
            message.exchange = self.exchange
            message.timeframes_mapper = self.timeframes_mapper
            message.stored_data = self.stored_data
            self.send(sender, message)

# dispatcher = ActorSystem().createActor(Dispatcher)
# print(ActorSystem().ask(dispatcher, 'hi', 1))

And these are the fetcher actors:

import logging
import time
from thespian.actors import Actor
from typing import List

from bqbbexchanges.exchange.actors.message import Message, RequestDataFetch, SetPropertiesMessage, ResponseDataFetch
from bqbbexchanges.settings import settings
from bqbbexchanges.utils.types import OHLCVData


class DataFetcher(Actor):
    """
        This actor only handle data fetcher for historical data,
        if there is a problem retrieving the data returns to the caller an empty list
    """

    exchange = None
    timeframes_mapper = {"1M": "1M", "1h": "1h",
                         "1m": "1m"}  # override this method allows you use another symbols for timeframe

    def __init__(self):
        super().__init__()
        print("initializing datafetcher")

    def _format_ohlcv_data(self, tick: list) -> OHLCVData:
        return {
            "time": str(tick[0]),
            "open": tick[1],
            "high": tick[2],
            "low": tick[3],
            "close": tick[4],
            "volume": tick[5]
        }

    def _fetch_ohlcv(self, pair, unix_date: int) -> List[OHLCVData]:
        try:
            if self.exchange is None or self.timeframes_mapper is None:
                logging.error("not initialized properties, please initialize first")
                return []

            if not self.exchange.has["fetchOHLCV"]:
                error = "This exchange doesn't have fetchOHLCV method implemented"
                logging.error(error)
                raise NotImplementedError(error)

            logging.debug(f"fetching data {pair} {unix_date}")
            data = self.exchange.fetch_ohlcv(pair,
                                             self.timeframes_mapper["1m"],
                                             since=unix_date,
                                             limit=settings.ACTOR_MAX_CANDLES_TO_RETRIEVE_BY_CALL)
            formated_data = []

            if data is None or len(data) == 0:
                return formated_data

            for tick in data:
                if len(tick) == 6:
                    ohlcv_tick = self._format_ohlcv_data(tick)
                    formated_data.append(ohlcv_tick)

            return formated_data
        except Exception as err:
            logging.error(f"ERROR IN ACTOR  DataFetcher> {err}")
            return []

    def receiveMessage(self, message: Message, sender):
        logging.debug(f"DataFetcher> Receive {message}")
        if isinstance(message, SetPropertiesMessage):
            logging.info("initializing exchange")
            self.exchange = message.exchange
        if isinstance(message, RequestDataFetch):
            logging.debug(f"fetcher: received to fetch: {message.ticker_and_date}")
            pair = message.ticker_and_date["pair"]
            time.sleep(message.wait_time or 0)
            formated_data = self._fetch_ohlcv(pair=pair,
                                              unix_date=message.ticker_and_date["init_date"])
            self.send(sender, ResponseDataFetch(pair=pair, data=formated_data, request=message))


from dataclasses import dataclass

from typing import List

from bqbbexchanges.utils.types import TickerAndDate, OHLCVData


@dataclass
class Message:
    pass


@dataclass
class SetPropertiesMessage(Message):
    DB = None
    exchange = None
    timeframes_mapper = None
    test = False
    stored_data = None
    exchange_prefix = None


@dataclass
class GetHistoricalData(Message):
    tickers: List[TickerAndDate]


@dataclass
class RequestDataFetch(Message):
    ticker_and_date: TickerAndDate
    wait_time: int


@dataclass
class ResponseDataFetch(Message):
    pair: str
    data: List[OHLCVData]
    request: RequestDataFetch

I don't get any error or warning except those address info warnings

2021-09-15 15:55:26,842 WARNING =>  Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known  [IPBase.py:16]
2021-09-15 15:55:31,848 WARNING =>  Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known  [IPBase.py:16]
2021-09-15 15:55:36,851 WARNING =>  Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known  [IPBase.py:16]
2021-09-15 15:55:41,857 WARNING =>  Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known  [IPBase.py:16]
initializing datafetcher
initializing datafetcher

My only suspicion is that serialization of the db handler or the ccxt instance could be failing, but I don't get any warning about it... If pickle can't pickle the object, would it give me a warning or error message?

carlos-lm avatar Sep 15 '21 20:09 carlos-lm

It's hard to say what your issue might be from the code fragments above, but I can make some observations about things that you should probably change:

The actor's receiveMessage handler should not block; the handler is called to handle a message received by an outer wrapper that is essentially (pseudocode):

while True:
    msg = process_asynchronous_sends_and_receives()
    self.receiveMessage(msg, sender)

If the self.receiveMessage blocks then no other activity can occur. This includes logging, since log messages are forwarded with the equivalent of a self.send() operation that is actually sent by the process_asynchronous_sends_and_receives portion above.
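As a rough illustration of why one blocking handler stalls everything behind it, here is a toy single-threaded model built only on the standard library. `ToyActor` and `run_loop` are hypothetical names made up for this sketch; it is not how Thespian is implemented, only a miniature of the loop described above.

```python
import time
from collections import deque


class ToyActor:
    """Hypothetical stand-in for an actor; not part of Thespian."""

    def __init__(self):
        self.handled = []  # (message, seconds since the loop started)

    def receive_message(self, msg, started):
        if msg == "slow":
            time.sleep(0.2)  # a blocking call inside the handler
        self.handled.append((msg, time.monotonic() - started))


def run_loop(actor, mailbox):
    """Deliver queued messages one at a time, as in the pseudocode loop."""
    started = time.monotonic()
    while mailbox:
        actor.receive_message(mailbox.popleft(), started)
    return actor.handled


handled = run_loop(ToyActor(), deque(["slow", "log record"]))
for msg, elapsed in handled:
    print(f"{msg!r} finished after {elapsed:.2f}s")
```

The second message cannot be handled until the sleep in the first one returns, which is the same reason log records appear to go missing while a real handler is blocked.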

You have time.sleep in your receiveMessage handler, which will block in this manner; I suggest using Wakeup Messages instead (https://thespianpy.com/doc/using.html#hH-9cc1acd4-7f35-44ac-94fc-0326dc87a7a5).

The self.exchange.fetch_ohlcv calls are probably blocking as well. You may not be able to do anything about them, but if it's possible to get the file handle they are working on, I'd recommend using Watched File Descriptors (https://thespianpy.com/doc/using.html#hH-94edef18-154e-4847-864f-027dff4f6e0a).

An actor should never call ActorSystem calls; those should only be made from code outside of an Actor. Change fetcher = ActorSystem(actor_mode, logDefs=logcfg).createActor(DataFetcher) to fetcher = self.createActor(DataFetcher)

The ActorSystem should be started already (probably in your __main__ routine). That's the place where actor_mode and logDefs should be set; once the ActorSystem is started, the logging and mode cannot be changed.

In DataFetcher, the __init__ routine will not accept or pass along any arguments. You should use the following idiom for accommodating any existing __init__ arguments:

def __init__(self, *args, **kw):
    super().__init__(*args, **kw)

kquick avatar Sep 16 '21 07:09 kquick

@kquick thank you so much for your answer and your patience :sweat_smile: This has been a bit difficult for me.

I have some doubts about your last reply, so I put together this basic example:

import logging
from dataclasses import dataclass
from typing import List, Union

from thespian.actors import *


@dataclass
class Message:
    pass


@dataclass
class Plus(Message):
    a: int
    b: int


@dataclass
class Substract(Message):
    a: int
    b: int


@dataclass
class Result(Message):
    response: int


@dataclass
class ListRequest(Message):
    op: List[Union[Plus, Substract]]


class ActorTwo(ActorTypeDispatcher):
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def receiveMessage(self, msg: Message, sender):
        logging.info(f"ACTORTWO: received {msg}")
        if isinstance(msg, Plus):
            self.send(sender, Result(response=msg.a + msg.b))

        elif isinstance(msg, Substract):
            self.send(sender, Result(response=msg.a - msg.b))


class ActorOne(ActorTypeDispatcher):
    results = []
    req_list = []
    requester = None  # this is the actor who send the original request

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def receiveMessage(self, msg: Union[ListRequest, Result], sender):
        logging.info(f"received message {msg}")
        if isinstance(msg, ListRequest):
            self.req_list = msg.op
            self.requester = sender
            for op in msg.op:
                actor = self.createActor(ActorTwo)
                self.send(actor, op)

        elif isinstance(msg, Result):
            self.results.append(msg.response)

            if len(self.results) == len(self.req_list):
                logging.info(f"ready to answer {self.results}")
                self.send(self.requester, self.results)

        else:
            logging.error(f"UNKNOWN MESSAGE {msg}")


class actorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' in logrecord.__dict__


class notActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' not in logrecord.__dict__


logcfg = {'version': 1,
          'formatters': {
              'normal': {'format': '%(levelname)-8s %(message)s'},
              'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
          'filters': {'isActorLog': {'()': actorLogFilter},
                      'notActorLog': {'()': notActorLogFilter}},
          'handlers': {'h1': {'class': 'logging.FileHandler',
                              'filename': 'example.log',
                              'formatter': 'normal',
                              'filters': ['notActorLog'],
                              'level': logging.INFO},
                       'h2': {'class': 'logging.FileHandler',
                              'filename': 'example.log',
                              'formatter': 'actor',
                              'filters': ['isActorLog'],
                              'level': logging.INFO}, },
          'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
          }

# main
if __name__ == '__main__':
    debug = True
    actor_mode = "simpleSystemBase" if debug else "multiprocTCPBase"
    actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
    print(ActorSystem().ask(actor_one, ListRequest(op=[Plus(a=10, b=12), Substract(a=5, b=2)])))

Here I could replicate the two problems:

  1. This works for simpleSystemBase (debug=True) but not for multiprocTCPBase... I get this error:

/Users/Admin/Downloads/proj/py/pruebaimport/venv/bin/python /Users/Admin/Downloads/proj/py/pruebaimport/main.py
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
Traceback (most recent call last):
  File "/Users/Admin/Downloads/proj/py/pruebaimport/main.py", line 109, in <module>
    actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/actors.py", line 704, in createActor
    return self._systemBase.newPrimaryActor(actorClass,
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/systemBase.py", line 195, in newPrimaryActor
    raise InvalidActorSpecification(actorClass,
thespian.actors.InvalidActorSpecification: Invalid Actor Specification: <class '__main__.ActorOne'> (module '__main__' has no attribute 'ActorOne')

Process finished with exit code 1

  2. Logging doesn't work for me; I can't get any logs. Maybe I'm doing something dumb here, I don't have much experience with Python :grimacing:

Thanks so much, I appreciate your patience with this. I hope this last post can resolve my doubts.

carlos-lm avatar Sep 20 '21 15:09 carlos-lm

When you start up an ActorSystem with one of the multiproc bases, that ActorSystem will fork and run as a separate process. That process will continue to run until it is killed, either via an OS level operation (e.g. kill) or by sending the ActorSystem a shutdown() request.

When the ActorSystem is forked, the memory of the process starting the ActorSystem is cloned, including the running code. Any subsequent modification to your code locally will not be seen by the still-running ActorSystem because it is still running the older version of code in its memory.

I believe this is the problem you are encountering. If you ensure that the previous ActorSystem is shut down and then run your program above, I believe you will get better results (for both the "ActorOne" unknown issue and the logging). Also note that the Thespian Loadable Sources is the recommended method to get around this code update issue, although that adds some additional complexity beyond the simple tests you are running at this point.

Please let me know if you still have difficulty after trying the above and ensuring the previous ActorSystem is shut down.

kquick avatar Sep 23 '21 04:09 kquick

Hi @kquick, sorry for the late response. I don't know if this is the right way, but I added an ActorSystem("multiprocTCPBase").shutdown() at the start of my script (I can't put it at the end because these actors must run forever...). I read this in the docs:

This however will stop all other Actors that are currently running in that system base. The alternative is to
dynamically load the new source code as described in Dynamic Source Loading.

However, that is not a problem for me because we want to run this actor system in a Docker container, isolated from other actors.

I ran this and it seems to work OK:

if __name__ == '__main__':
    debug = False
    ActorSystem("multiprocTCPBase").shutdown()
    actor_mode = "simpleSystemBase" if debug else "multiprocTCPBase"
    actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
    print(ActorSystem().ask(actor_one, ListRequest(op=[Plus(a=10, b=12), Substract(a=5, b=2)])))

WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
[22, 3]

but I'm afraid I'm not getting any log yet :grimacing:

Update: I'm getting the error below. If I kill the app on port 1900, this works again. That's fine for me because I can do it programmatically, but is this approach OK?

WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
Traceback (most recent call last):
  File "/Users/Admin/Downloads/proj/py/pruebaimport/main.py", line 101, in <module>
    ActorSystem("multiprocTCPBase").shutdown()
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/actors.py", line 637, in __init__
    systemBase = self._startupActorSys(
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/actors.py", line 678, in _startupActorSys
    systemBase = sbc(self, logDefs=logDefs)
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/multiprocTCPBase.py", line 28, in __init__
    super(ActorSystemBase, self).__init__(system, logDefs)
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/multiprocCommon.py", line 82, in __init__
    super(multiprocessCommon, self).__init__(system, logDefs)
  File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/systemBase.py", line 335, in __init__
    raise InvalidActorAddress(self.adminAddr,
thespian.actors.InvalidActorAddress: ActorAddr-(T|:1900) is not a valid or useable ActorSystem Admin

Process finished with exit code 1

thank you so much

carlos-lm avatar Sep 29 '21 00:09 carlos-lm

Killing the other app on port 1900 should open it up to use by Thespian, but that assumes the other app is not needed (and if so, it might be better to remove it from whatever startup script is running). Alternatively you can specify an alternate port for Thespian to use by setting the Admin Port capability to a different value when you start the ActorSystem (https://thespianpy.com/doc/using.html#hH-9d33a877-b4f0-4012-9510-442d81b0837c).

kquick avatar Oct 06 '21 20:10 kquick