Thespian
Question: how can I keep the main thread from exiting when running actors with multiprocQueueBase?
Hello, thanks for this lib, it's great. Right now I have a small system with some actors, and everything works as expected when I use simpleSystemBase, but now I want to run this truly async, so I'd like to use "multiprocQueueBase".
Basically I have a normal Python script which starts a central actor (Dispatcher):
self.actor_dispatcher = ActorSystem().createActor(Dispatcher)
This central actor creates new actors based on the number of items passed in a message:
for ticker in message.tickers:
actor_mode = "simpleSystemBase" if self.test else "multiprocQueueBase"
fetcher = ActorSystem(actor_mode).createActor(DataFetcher)
These actors (DataFetcher) must be async because they make requests to an API.
Is this architecture correct? Must the dispatcher use "multiprocQueueBase", or is it OK to start it with the default simpleSystemBase?
How can I keep my main process (the script that creates the dispatcher actor) from ending before my actors do? (Actually, this actor system must run forever.) Is a while True in the main script OK, or is there a better approach?
Also, will the logs from the actors be displayed in the running script? Right now (using a while True in the main script) I can't get any logs.
thank you so much!!!
Hi @carlos-lm ,
Thanks for the appreciation: it's nice to know this library has been useful.
You will almost certainly want to use one of the multi-process bases; I would actually recommend the multiprocTCPBase (https://thespianpy.com/doc/using.html#hH-59ea5ca7-a167-4d51-bd1b-78119eab6df6) if you run into issues using the multiprocQueueBase. Each of the multiproc bases starts up a separate process to act as the admin, independent of the application that started it; the started actor system will persist until an ActorSystem(actor_mode).shutdown() stops it. Note that the multiprocQueueBase does not support re-connection, so it cannot be shut down in this manner due to limitations in Python Queue functionality (https://thespianpy.com/doc/using.html#hH-59ea5ca7-a167-4d51-bd1b-78119eab6df6); it can only be shut down by the same process that started it, or by external process kills.
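For the multiprocQueueBase case described above, where only the starting process can shut the system down, the main script has to stay alive and do the shutdown itself. A minimal sketch of that pattern follows; `run_until_stopped` and `stop_event` are illustrative names, not part of Thespian, and `system` is assumed to be the object returned by `ActorSystem("multiprocQueueBase")`.

```python
import threading


def run_until_stopped(system, stop_event, poll_seconds=1.0):
    """Block the calling thread until stop_event is set, then shut the system down.

    `system` is assumed to expose a shutdown() method, as ActorSystem does.
    """
    try:
        # Event.wait() returns False on timeout, so this loop wakes up
        # every poll_seconds and exits once the event has been set.
        while not stop_event.wait(poll_seconds):
            pass
    finally:
        # Runs on a normal exit and on KeyboardInterrupt alike.
        system.shutdown()


# Hypothetical usage in the main script (not executed here):
#   from thespian.actors import ActorSystem
#   system = ActorSystem("multiprocQueueBase")
#   dispatcher = system.createActor(Dispatcher)
#   run_until_stopped(system, threading.Event())
```

Compared with a busy `while True` loop, waiting on an event does not spin the CPU and gives a signal handler or another thread a clean way to request the shutdown.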
Because a multiproc system has multiple processes, logging is coordinated centrally and written to files as specified by the logDefs argument used when starting the ActorSystem() (see https://thespianpy.com/doc/using.html#hH-f71b7bfa-c57b-4716-a2c7-ad83a2ed3582 and https://thespianpy.com/doc/using.html#hH-ce55494c-dd7a-4258-a1e8-b090c3bbb1e6).
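Assuming logDefs uses the same dictionary schema as the standard library's `logging.config.dictConfig` (the logcfg dictionaries later in this thread follow that layout), a configuration can be checked on its own before it is handed to ActorSystem. The sketch below writes to a temporary file; the path and the handler name are illustrative.

```python
import logging
import logging.config
import os
import tempfile

# Illustrative location; a real deployment would pick a stable path.
log_path = os.path.join(tempfile.mkdtemp(), "actors.log")

logcfg = {
    "version": 1,
    "formatters": {"normal": {"format": "%(levelname)-8s %(message)s"}},
    "handlers": {
        "file": {
            "class": "logging.FileHandler",
            "filename": log_path,
            "formatter": "normal",
            "level": logging.INFO,
        }
    },
    # The empty name configures the root logger.
    "loggers": {"": {"handlers": ["file"], "level": logging.DEBUG}},
}

# Applying the dictionary locally raises immediately if it is malformed.
logging.config.dictConfig(logcfg)
logging.getLogger().info("config check")

with open(log_path) as fh:
    print(fh.read())
```

If this local check writes the expected line, the same dictionary could then be passed as `logDefs=logcfg` when the ActorSystem is first started.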
Hopefully this will help for your needs.
thank you for the detailed answer @kquick
I was checking the hellogoodbye sample, and it seems that switching between system bases is just a matter of passing "multiprocTCPBase" as the argument to ActorSystem...
I did that:
actor_mode = "simpleSystemBase" if self.test else "multiprocTCPBase"
fetcher = ActorSystem(actor_mode,logDefs=logcfg).createActor(DataFetcher)
but I'm getting this error
2021-09-14 20:22:06,672 WARNING => Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known [IPBase.py:16]
2021-09-14 20:22:11,675 WARNING => Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known [IPBase.py:16]
Another question: I'm passing a complex object in my message. I have my doubts about this, but with simpleSystemBase it works perfectly. Could it be that serialization works fine for simpleSystemBase but not in other modes?
thank you
That warning is described here: https://github.com/kquick/Thespian/issues/11
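The warning text itself reports that the machine's own hostname could not be resolved. A quick check of that, independent of Thespian and using only the standard library, might look like the sketch below; `hostname_resolves` is an illustrative helper name.

```python
import socket


def hostname_resolves(name: str) -> bool:
    """Return True if `name` can be resolved to an IPv4 address."""
    try:
        socket.getaddrinfo(name, None, socket.AF_INET)
        return True
    except socket.gaierror:
        # The same exception class that appears in the warning above.
        return False


local_name = socket.gethostname()
print(local_name, hostname_resolves(local_name))
```

A `False` result for the local hostname would be consistent with the `socket.gaierror` shown in the log lines.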
Thespian uses the Python pickle facility to pass objects. As long as your message can be pickled, Thespian can pass it, although your intuition is correct: the simpleSystemBase doesn't need to pass messages so it can get away with more permissive message types than any other base.
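Since Thespian relies on pickle, one way to vet a message before sending it across a multiproc base is to try a pickle round trip locally. This is a sketch using only the standard library; `is_picklable` is an illustrative helper, and the lock object merely stands in for anything that wraps an OS resource. Whether a particular db handler or exchange client pickles has to be tested on the real object.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle dump/load round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        # pickle raises different exception types depending on the object,
        # so this sketch treats any failure as "not picklable".
        return False


plain_message = {"pair": "BTC/USDT", "init_date": 0}  # stand-in for a simple message
resource_holder = threading.Lock()                    # stand-in for a live handle

print(is_picklable(plain_message))
print(is_picklable(resource_holder))
```

Running a check like this on each attribute of a message is a cheap way to find which field, if any, cannot cross a process boundary.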
@kquick right now my code works OK when I run it with simpleSystemBase, but when I move to another system base the actors don't seem to work, and I don't get any error or message...
This is my code; this actor creates other actors and sends them the message:
import logging
from bqbbdb.basedb import BaseDB
from thespian.actors import Actor, ActorSystem
from typing import List
from bqbbexchanges.exchange.actors.data_fetcher import DataFetcher
from bqbbexchanges.exchange.actors.message import Message, SetPropertiesMessage, GetHistoricalData, \
    ResponseDataFetch, RequestDataFetch
from bqbbexchanges.settings import Settings, settings
from bqbbexchanges.utils.has_key import has_key
from bqbbexchanges.utils.my_logging import logcfg
from bqbbexchanges.utils.types import OHLCVData
class Dispatcher(Actor):
    """Dispatcher
    this actor create new actors according to the tickers that needs to fetch, when the data_fetcher
    return a response, this handle the response, that is, save in the db, send new data to the data_fetcher,etc
    """
    db: BaseDB = None
    exchange = None

    # some code here....

    def receiveMessage(self, message: Message, sender):
        if isinstance(message, SetPropertiesMessage):
            logging.info("initializing dispatcher with required db and exchange")
            self.exchange = message.exchange
            self.timeframes_mapper = message.timeframes_mapper
            self.test = message.test
            # here I pass the exchange and DB handler to the actor in order that this can make request to get data and store it
            self.db = message.DB
            self.exchange_prefix = message.exchange_prefix
            # self.send(sender, f"Hello, World! {self.name}")
        elif isinstance(message, GetHistoricalData):
            logging.debug("start getting historical data")
            # THIS IS THE IMPORTANT CODE ==================================== <----------
            for ticker in message.tickers:
                actor_mode = "simpleSystemBase" if self.test else "multiprocTCPBase"
                fetcher = ActorSystem(actor_mode, logDefs=logcfg).createActor(DataFetcher)
                init_message = SetPropertiesMessage()
                init_message.exchange = self.exchange
                init_message.timeframes_mapper = self.timeframes_mapper
                self.send(fetcher, init_message)
                self.send(fetcher,
                          RequestDataFetch(ticker_and_date=ticker, wait_time=0))
        elif isinstance(message, ResponseDataFetch):
            self._handle_received_data(message, sender)
        else:
            logging.error("Dispatcher> UNKNOWN MESSAGE, returning properties")
            message = SetPropertiesMessage()
            message.DB = self.db
            message.exchange = self.exchange
            message.timeframes_mapper = self.timeframes_mapper
            message.stored_data = self.stored_data
            self.send(sender, message)


# dispatcher = ActorSystem().createActor(Dispatcher)
# print(ActorSystem().ask(dispatcher, 'hi', 1))
and these are the fetcher actors:
import logging
import time
from thespian.actors import Actor
from typing import List
from bqbbexchanges.exchange.actors.message import Message, RequestDataFetch, SetPropertiesMessage, ResponseDataFetch
from bqbbexchanges.settings import settings
from bqbbexchanges.utils.types import OHLCVData
class DataFetcher(Actor):
    """
    This actor only handle data fetcher for historical data,
    if there is a problem retrieving the data returns to the caller an empty list
    """
    exchange = None
    timeframes_mapper = {"1M": "1M", "1h": "1h",
                         "1m": "1m"}  # override this method allows you use another symbols for timeframe

    def __init__(self):
        super().__init__()
        print("initializing datafetcher")

    def _format_ohlcv_data(self, tick: list) -> OHLCVData:
        return {
            "time": str(tick[0]),
            "open": tick[1],
            "high": tick[2],
            "low": tick[3],
            "close": tick[4],
            "volume": tick[5]
        }

    def _fetch_ohlcv(self, pair, unix_date: int) -> List[OHLCVData]:
        try:
            if self.exchange is None or self.timeframes_mapper is None:
                logging.error("not initialized properties, please initialize first")
            if not self.exchange.has["fetchOHLCV"]:
                error = "This exchange doesn't have fetchOHLCV method implemented"
                logging.error(error)
                raise NotImplementedError(error)
            logging.debug(f"fetching data {pair} {unix_date}")
            data = self.exchange.fetch_ohlcv(pair,
                                             self.timeframes_mapper["1m"],
                                             since=unix_date,
                                             limit=settings.ACTOR_MAX_CANDLES_TO_RETRIEVE_BY_CALL)
            formated_data = []
            if len(data) == 0 or data is None:
                return formated_data
            for tick in data:
                if len(tick) == 6:
                    ohlcv_tick = self._format_ohlcv_data(tick)
                    formated_data.append(ohlcv_tick)
            return formated_data
        except Exception as err:
            logging.error(f"ERROR IN ACTOR DataFetcher> {err}")
            return []

    def receiveMessage(self, message: Message, sender):
        logging.debug(f"DataFetcher> Receive {message}")
        if isinstance(message, SetPropertiesMessage):
            logging.info("initializing exchange")
            self.exchange = message.exchange
        if isinstance(message, RequestDataFetch):
            logging.debug(f"fetcher: received to fetch: {message.ticker_and_date}")
            pair = message.ticker_and_date["pair"]
            time.sleep(message.wait_time or 0)
            formated_data = self._fetch_ohlcv(pair=pair,
                                              unix_date=message.ticker_and_date["init_date"])
            self.send(sender, ResponseDataFetch(pair=pair, data=formated_data, request=message))
from dataclasses import dataclass
from typing import List
from bqbbexchanges.utils.types import TickerAndDate, OHLCVData
@dataclass
class Message:
    pass


@dataclass
class SetPropertiesMessage(Message):
    DB = None
    exchange = None
    timeframes_mapper = None
    test = False
    stored_data = None
    exchange_prefix = None


@dataclass
class GetHistoricalData(Message):
    tickers: List[TickerAndDate]


@dataclass
class RequestDataFetch(Message):
    ticker_and_date: TickerAndDate
    wait_time: int


@dataclass
class ResponseDataFetch(Message):
    pair: str
    data: List[OHLCVData]
    request: RequestDataFetch
I don't get any error or warning except those address info warnings
2021-09-15 15:55:26,842 WARNING => Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known [IPBase.py:16]
2021-09-15 15:55:31,848 WARNING => Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known [IPBase.py:16]
2021-09-15 15:55:36,851 WARNING => Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known [IPBase.py:16]
2021-09-15 15:55:41,857 WARNING => Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known [IPBase.py:16]
initializing datafetcher
initializing datafetcher
My only suspicion is that serialization of the db handler or the ccxt instance could be failing, but I don't get any warning about it... if pickle can't pickle the object, would I get a warning or error message?
It's hard to say what your issue might be from the code fragments above, but I can make some observations about things that you should probably change:
The actor's receiveMessage handler should not block; the handler is called to handle a message received by an outer wrapper that is essentially (pseudocode):
    while True:
        msg = process_asynchronous_sends_and_receives()
        self.receiveMessage(msg, sender)
If self.receiveMessage blocks, then no other activity can occur. This includes logging, since log messages are forwarded with the equivalent of a self.send() operation that is actually sent by the process_asynchronous_sends_and_receives portion above.
You have time.sleep in your receiveMessage handler, which will block in this manner; I suggest using Wakeup Messages instead (https://thespianpy.com/doc/using.html#hH-9cc1acd4-7f35-44ac-94fc-0326dc87a7a5).
The self.exchange.fetch_ohlcv calls are probably blocking as well. You may not be able to do anything about them, but if it's possible to get the file handle they are working on, I'd recommend using Watched File Descriptors (https://thespianpy.com/doc/using.html#hH-94edef18-154e-4847-864f-027dff4f6e0a).
An actor should never make ActorSystem calls; those should only be made from code outside of an Actor. Change fetcher = ActorSystem(actor_mode, logDefs=logcfg).createActor(DataFetcher) to fetcher = self.createActor(DataFetcher).
The ActorSystem should be started already (probably in your __main__ routine). That's the place where the actor_mode and the logDefs should be set; once the ActorSystem is started, the logging and mode cannot be changed.
In DataFetcher, the __init__ routine will not accept or pass along any arguments. You should use the following idiom for accommodating any existing __init__ arguments:
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
@kquick thank you so much for your answer and your patience :sweat_smile: This has been a bit difficult for me.
I have some doubts about your last reply, so I put together this basic example:
import logging
from dataclasses import dataclass
from typing import List, Union
from thespian.actors import *
@dataclass
class Message:
    pass


@dataclass
class Plus(Message):
    a: int
    b: int


@dataclass
class Substract(Message):
    a: int
    b: int


@dataclass
class Result(Message):
    response: int


@dataclass
class ListRequest(Message):
    op: List[Union[Plus, Substract]]


class ActorTwo(ActorTypeDispatcher):
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def receiveMessage(self, msg: Message, sender):
        logging.info(f"ACTORTWO: received {msg}")
        if isinstance(msg, Plus):
            self.send(sender, Result(response=msg.a + msg.b))
        elif isinstance(msg, Substract):
            self.send(sender, Result(response=msg.a - msg.b))


class ActorOne(ActorTypeDispatcher):
    results = []
    req_list = []
    requester = None  # this is the actor who send the original request

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def receiveMessage(self, msg: Union[ListRequest, Result], sender):
        logging.info(f"received message {msg}")
        if isinstance(msg, ListRequest):
            self.req_list = msg.op
            self.requester = sender
            for op in msg.op:
                actor = self.createActor(ActorTwo)
                self.send(actor, op)
        elif isinstance(msg, Result):
            self.results.append(msg.response)
            if len(self.results) == len(self.req_list):
                logging.info(f"ready to answer {self.results}")
                self.send(self.requester, self.results)
        else:
            logging.error(f"UNKNOWN MESSAGE {msg}")
class actorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' in logrecord.__dict__


class notActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' not in logrecord.__dict__


logcfg = {'version': 1,
          'formatters': {
              'normal': {'format': '%(levelname)-8s %(message)s'},
              'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
          'filters': {'isActorLog': {'()': actorLogFilter},
                      'notActorLog': {'()': notActorLogFilter}},
          'handlers': {'h1': {'class': 'logging.FileHandler',
                              'filename': 'example.log',
                              'formatter': 'normal',
                              'filters': ['notActorLog'],
                              'level': logging.INFO},
                       'h2': {'class': 'logging.FileHandler',
                              'filename': 'example.log',
                              'formatter': 'actor',
                              'filters': ['isActorLog'],
                              'level': logging.INFO}, },
          'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
          }
# main
if __name__ == '__main__':
    debug = True
    actor_mode = "simpleSystemBase" if debug else "multiprocTCPBase"
    actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
    print(ActorSystem().ask(actor_one, ListRequest(op=[Plus(a=10, b=12), Substract(a=5, b=2)])))
Here I could replicate the two problems:
- This works for simpleSystemBase (debug=True) but not for multiprocTCPBase... I get this error:
/Users/Admin/Downloads/proj/py/pruebaimport/venv/bin/python /Users/Admin/Downloads/proj/py/pruebaimport/main.py
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
Traceback (most recent call last):
File "/Users/Admin/Downloads/proj/py/pruebaimport/main.py", line 109, in <module>
actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/actors.py", line 704, in createActor
return self._systemBase.newPrimaryActor(actorClass,
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/systemBase.py", line 195, in newPrimaryActor
raise InvalidActorSpecification(actorClass,
thespian.actors.InvalidActorSpecification: Invalid Actor Specification: <class '__main__.ActorOne'> (module '__main__' has no attribute 'ActorOne')
Process finished with exit code 1
- Logging doesn't work for me; I can't get any logs. Maybe I'm doing something dumb here, I don't have much experience with Python :grimacing:
Thanks so much, I appreciate your patience with this; I hope this last post can resolve my doubts.
When you startup an ActorSystem with one of the multiproc bases, that ActorSystem will fork and run as a separate process. That process will continue to run until it is killed, either via an OS level operation (e.g. kill) or by sending the ActorSystem a shutdown() request.
When the ActorSystem
is forked, the memory of the process starting the ActorSystem
is cloned, including the running code. Any subsequent modification to your code locally will not be seen by the still-running ActorSystem
because it is still running the older version of code in its memory.
I believe this is the problem you are encountering. If you ensure that the previous ActorSystem is shutdown and then run your program above, I believe you will get better results (for both the "ActorOne" unknown issue and the logging). Also note that the Thespian Loadable Sources is the recommended method to get around this code update issue, although that adds some additional complexity beyond the simple tests you are running at this point.
Please let me know if you still have difficulty after trying the above and ensuring the previous ActorSystem is shutdown.
Hi @kquick, sorry for the late response. I don't know if this is the right way, but I added an ActorSystem("multiprocTCPBase").shutdown() call at the start of my script (I can't put it at the end because these actors must run forever...). I read this in the docs:
"This however will stop all other Actors that are currently running in that system base. The alternative is to dynamically load the new source code as described in Dynamic Source Loading."
However, that is not a problem for me, because we want to run this actor system in a Docker container, isolated from other actors.
I ran this and it seems to work OK:
```py
if __name__ == '__main__':
    debug = False
    ActorSystem("multiprocTCPBase").shutdown()
    actor_mode = "simpleSystemBase" if debug else "multiprocTCPBase"
    actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
    print(ActorSystem().ask(actor_one, ListRequest(op=[Plus(a=10, b=12), Substract(a=5, b=2)])))
```
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
[22, 3]
but I'm afraid I'm not getting any log yet :grimacing:
Update: I'm getting this error. If I kill the app on port 1900 this works again; for me that's OK because I can do it programmatically, but is this approach OK?
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
WARNING:root:Unable to get address info for address Administrators-MacBook-Pro.local (AddressFamily.AF_INET, SocketKind.SOCK_DGRAM, 17, AddressInfo.AI_PASSIVE): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known
Traceback (most recent call last):
File "/Users/Admin/Downloads/proj/py/pruebaimport/main.py", line 101, in <module>
ActorSystem("multiprocTCPBase").shutdown()
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/actors.py", line 637, in __init__
systemBase = self._startupActorSys(
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/actors.py", line 678, in _startupActorSys
systemBase = sbc(self, logDefs=logDefs)
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/multiprocTCPBase.py", line 28, in __init__
super(ActorSystemBase, self).__init__(system, logDefs)
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/multiprocCommon.py", line 82, in __init__
super(multiprocessCommon, self).__init__(system, logDefs)
File "/Users/Admin/Downloads/proj/py/pruebaimport/venv/lib/python3.8/site-packages/thespian/system/systemBase.py", line 335, in __init__
raise InvalidActorAddress(self.adminAddr,
thespian.actors.InvalidActorAddress: ActorAddr-(T|:1900) is not a valid or useable ActorSystem Admin
Process finished with exit code 1
thank you so much
Killing the other app on port 1900 should open it up to use by Thespian, but that assumes the other app is not needed (and if so, it might be better to remove it from whatever startup script is running). Alternatively you can specify an alternate port for Thespian to use by setting the Admin Port capability to a different value when you start the ActorSystem (https://thespianpy.com/doc/using.html#hH-9d33a877-b4f0-4012-9510-442d81b0837c).
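As a rough sketch of that second option, the script could probe a few ports and pass the first free one as the Admin Port capability. The helper names and the candidate ports below are illustrative, and the probe only checks TCP on the loopback address, which is an assumption about how the conflict shows up.

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently be bound to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
            return True
        except OSError:
            return False


def choose_admin_port(candidates=(1900, 1901, 1902)) -> int:
    """Pick the first candidate port that is free; the candidates are arbitrary."""
    for port in candidates:
        if port_is_free(port):
            return port
    raise RuntimeError("no candidate port is free")


def admin_capabilities(port: int) -> dict:
    """Build the capabilities dict carrying the Admin Port setting."""
    return {"Admin Port": port}


# Hypothetical usage (not executed here):
#   from thespian.actors import ActorSystem
#   caps = admin_capabilities(choose_admin_port())
#   system = ActorSystem("multiprocTCPBase", capabilities=caps)
```

There is a small window between probing a port and Thespian binding it, so this is a convenience check rather than a guarantee.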