watchtower
Watchtower + Multiprocessing
The logger does not write to CloudWatch when I use multiprocessing. To check whether this was my configuration, I dropped the watchtower handler and used a file handler instead. That logged perfectly; however, when I switched back to the watchtower handler, only the messages before and after `outputs = pool.map(worker, inputs)` came through.
Any idea how to fix this? Setting `use_queues` to `True` didn't help.
Sample code:
```python
import logging
from multiprocessing import Pool

import watchtower


def worker(var):
    # `logger` is the global created under __main__; this only works with the
    # fork start method, where each worker inherits a copy of it (and its handlers).
    logger.debug("Incoming variable: %s", var)
    logger.debug("Outgoing variable: %s", var + 1)
    return var + 1


def main():
    inputs = list(range(1000))
    logger.debug("Starting run now!")
    pool = Pool(processes=3)
    outputs = pool.map(worker, inputs)
    pool.close()
    pool.join()
    logger.debug("Just finished run")


if __name__ == "__main__":
    logger = logging.getLogger("multi")
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("test.log")
    fh.setLevel(logging.DEBUG)
    logger.addHandler(fh)

    wt_project_handler = watchtower.CloudWatchLogHandler(stream_name="test",
                                                          use_queues=True)
    wt_project_handler.setLevel(logging.DEBUG)
    logger.addHandler(wt_project_handler)

    main()
```
This now works when I set `use_queues` to `False`. However, I now get the following two errors:
```
TypeError: ('__init__() takes exactly 3 arguments (2 given)',
<class 'botocore.exceptions.ClientError'>, (u'An error occurred (InvalidSequenceTokenException)
when calling the PutLogEvents operation: The given sequenceToken is invalid. The next expected
sequenceToken is: 49566986361376648647772148512500932485923247621329154002',))
```
and
```
An error occurred (ThrottlingException) when calling the PutLogEvents operation (reached max retries: 4): Rate exceeded
```
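One way to see where the `InvalidSequenceTokenException` can come from: with the `fork` start method, every pool worker gets its own copy of the parent's handler objects, so any state a handler keeps in memory (for watchtower, presumably the next upload sequence token for the stream) is updated independently in each process and never seen by the others. This sketch uses a hypothetical `CountingHandler` whose plain counter stands in for that state; it requests the `fork` start method explicitly, which assumes a POSIX system.

```python
import logging
import multiprocessing as mp


class CountingHandler(logging.Handler):
    """Hypothetical handler with in-memory state, standing in for
    per-stream bookkeeping such as an upload sequence token."""

    def __init__(self):
        super().__init__()
        self.sent = 0

    def emit(self, record):
        self.sent += 1  # only this process's copy of the handler changes


handler = CountingHandler()
logger = logging.getLogger("multi-demo")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.propagate = False


def worker(var):
    logger.debug("Incoming variable: %s", var)
    return handler.sent  # the count as seen by this worker's private copy


def run(n=30, processes=3):
    ctx = mp.get_context("fork")  # POSIX-only; children inherit `handler`
    with ctx.Pool(processes) as pool:
        counts = pool.map(worker, range(n))
    return counts, handler.sent
```

Calling `run()` returns per-task counts that are all at least 1 inside the workers, while the parent's `handler.sent` is still 0: the same kind of divergence that leaves several processes uploading to one stream with tokens the others have already used.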
I solved this issue by using https://github.com/jruere/multiprocessing-logging, which was spun out of this Stack Overflow question: http://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python.
All it took was importing `multiprocessing_logging` and then calling `multiprocessing_logging.install_mp_handler(logger)`.
Thanks, I probably need to add docs on how to deal with this, so reopening this issue to keep track of that.
Just experienced this issue, thanks for the fix @Audace!
@kislyuk it may be worth updating the docs to include a reference to that library. I'm working with django-rq, and any logging within a worker process was not making it into the watchtower batch. I'd assume other worker libraries like Celery would also experience this issue. Similar to @Audace's answer, I updated my Django app's `ready()` function, which guarantees that `install_mp_handler()` is called after logging is set up and before any logs are sent:
```python
from django.apps import AppConfig


class MyAppConfig(AppConfig):
    name = 'MyApp'

    def ready(self):
        import logging

        import multiprocessing_logging

        multiprocessing_logging.install_mp_handler(logging.getLogger("my_apps_primary_logger"))
```
Just wanted to add that although this seemed solved, I still encounter situations where all logging stops reaching CloudWatch suddenly, but continues to successfully log to local log files. I do not see any obvious causes, and after restarting my workers everything is back to normal as if nothing was wrong. If anyone has any thoughts, please feel free to share, and I will update if I come across a solution. @Audace not sure if you saw anything similar after implementing your solution?
@kislyuk Is this the suggested way to deal with multi-process logging for this library?
The suggested way to use logging in multiprocessing pools is to share nothing. Use one logger per worker process (or thread) and initialize the logger after forking. A shared logger will not work correctly with multiprocessing due to the stateful nature of the logger and race conditions that will arise between different copies of the logger in the different processes.
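A minimal sketch of the share-nothing approach, assuming a `Pool` is used: the `initializer` argument runs once in every worker right after it starts, so each worker can build its own handler there instead of reusing an inherited one. A per-process `FileHandler` stands in for the CloudWatch handler so the sketch runs without AWS credentials; `init_worker_logging`, `run` and the temporary log directory are hypothetical, and `fork` is requested explicitly (POSIX only) to mirror the original example.

```python
import logging
import multiprocessing as mp
import os
import tempfile


def init_worker_logging(log_dir):
    """Runs once in each worker process, right after it starts.

    Every worker builds its own handler, so no handler state is shared
    between processes.
    """
    logger = logging.getLogger("multi")
    logger.handlers[:] = []  # discard any handlers inherited through fork
    # Stand-in for a per-process CloudWatch handler: one file per PID.
    path = os.path.join(log_dir, "worker-%d.log" % os.getpid())
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter("%(asctime)s %(processName)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False


def worker(var):
    logger = logging.getLogger("multi")
    logger.debug("Incoming variable: %s", var)
    logger.debug("Outgoing variable: %s", var + 1)
    return var + 1


def run(n=100, processes=3):
    log_dir = tempfile.mkdtemp(prefix="mp-logs-")  # hypothetical location
    ctx = mp.get_context("fork")  # POSIX-only, mirrors the original example
    pool = ctx.Pool(processes, initializer=init_worker_logging, initargs=(log_dir,))
    outputs = pool.map(worker, range(n))
    pool.close()
    pool.join()  # let every worker exit normally
    return outputs, log_dir


if __name__ == "__main__":
    outputs, log_dir = run()
    print(sorted(os.listdir(log_dir)))
```

With watchtower, the handler in `init_worker_logging` would become something like `watchtower.CloudWatchLogHandler(stream_name="worker-%d" % os.getpid())`, using the `stream_name` argument from the original example, so each process writes to its own stream.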
@kislyuk So, I'm not using a multiprocessing pool, just manually creating all the child processes I want.
If I pass a separate logger instance to each child process, does each logger instance log to a separate file/stream? Is there a way to collate everything based on some metric (like timestamp)?
@redixhumayun that is for you to configure. Check the project documentation for how to configure the log stream names. Check the cloudwatch documentation for how to collate logs.
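If the per-process streams are exported and each one is already in time order, collating them locally is a k-way merge. A minimal sketch, assuming each stream is a list of hypothetical `(timestamp_ms, message)` pairs; the sample events below are made up for illustration.

```python
import heapq


def collate(*streams):
    """Merge per-process streams into a single timeline.

    Each stream is assumed to be a list of (timestamp_ms, message) pairs
    already sorted by timestamp, as one process would write them.
    heapq.merge does a lazy k-way merge instead of re-sorting everything.
    """
    return list(heapq.merge(*streams, key=lambda event: event[0]))


# Hypothetical events from two worker streams.
worker_1 = [(1000, "worker-1: Incoming variable: 0"), (3000, "worker-1: Outgoing variable: 1")]
worker_2 = [(2000, "worker-2: Incoming variable: 1"), (4000, "worker-2: Outgoing variable: 2")]

for timestamp, message in collate(worker_1, worker_2):
    print(timestamp, message)
```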
I wanted to post my solution here, just in case it is useful to anyone else.
The Python Logging Cookbook has some great examples of how to log to a single file from multiple processes; a sample of that can be found here.
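The sample linked above gives each worker process a `QueueHandler`, which puts log records onto a shared queue; a separate listener process takes records off the queue and passes them to the real handlers. Since it is a queue, records are handled in the order in which they arrive.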
Integrating with watchtower is as easy as adding another handler at the output end of the queue.
You can use the sample code from the link above and just modify the `listener_configurer` function as below:
```python
import logging
import logging.handlers
import watchtower


def listener_configurer():
    # Runs only in the listener process, so only one CloudWatch handler exists.
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    watchtower_handler = watchtower.CloudWatchLogHandler()
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    watchtower_handler.setFormatter(f)
    root.addHandler(h)
    root.addHandler(watchtower_handler)
```
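The same pattern can also be sketched with `logging.handlers.QueueListener` (Python 3.2+), which drains the queue in a thread of the parent process instead of a dedicated listener process. A hypothetical `ListHandler` stands in for `watchtower.CloudWatchLogHandler` so the sketch runs offline; `fork` is requested explicitly, which assumes a POSIX system.

```python
import logging
import logging.handlers
import multiprocessing as mp


class ListHandler(logging.Handler):
    """Hypothetical stand-in for watchtower.CloudWatchLogHandler."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def init_worker(queue):
    # Each worker's only handler is a QueueHandler that ships records to the
    # parent; no CloudWatch client is created in the child.
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)


def worker(var):
    logging.getLogger("multi").debug("Incoming variable: %s", var)
    return var + 1


def run(n=20, processes=3):
    ctx = mp.get_context("fork")  # POSIX-only start method
    queue = ctx.Queue()
    sink = ListHandler()
    sink.setFormatter(logging.Formatter("%(processName)s %(name)s %(message)s"))
    # Fork the workers before starting the listener thread.
    pool = ctx.Pool(processes, initializer=init_worker, initargs=(queue,))
    listener = logging.handlers.QueueListener(queue, sink)
    listener.start()
    try:
        outputs = pool.map(worker, range(n))
        pool.close()
        pool.join()  # exiting workers flush what they put on the queue
    finally:
        listener.stop()  # handles everything queued before returning
    return outputs, sink.messages


if __name__ == "__main__":
    outputs, messages = run()
    print("\n".join(messages))
```

Only the parent holds the real handler, so only one process talks to CloudWatch. Records from any one worker keep their order, while records from different workers are interleaved in arrival order.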