watchtower icon indicating copy to clipboard operation
watchtower copied to clipboard

Watchtower + Multiprocessing

Open Audace opened this issue 7 years ago • 10 comments

The logger is not successfully writing to CloudWatch when using multiprocessing. I tested to see whether this was my configuration by dropping a watchtower handler and using a file handler. This logged perfectly, however, when switching back to the watchtower handler only messages before and after outputs = pool.map(worker, inputs) worked.

Any idea how to fix this? Setting use_queues to True didn't help.

Sample code:

import watchtower
import logging
from multiprocessing import Lock, Process, Queue, current_process, Manager, Pool

def worker(var):
    logger.debug("Incoming variable: %s" % var)
    logger.debug("Outgoing variable: %s" % (var+1))
    return var+1

def main():
    inputs = []
    for i in xrange(1000):
        inputs.append(i)

    logger.debug("Starting run now!")
    pool = Pool(processes=3)
    outputs = pool.map(worker, inputs)
    pool.close()
    pool.join()
    logger.debug("Just finished run")

if __name__ == "__main__":
    logger = logging.getLogger("multi")
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("test.log")
    fh.setLevel(logging.DEBUG)
    logger.addHandler(fh)

    wt_project_handler = watchtower.CloudWatchLogHandler(stream_name="test",
                                                         use_queues=True)
    wt_project_handler.setLevel(logging.DEBUG)
    logger.addHandler(wt_project_handler)

    main()

Audace avatar Nov 24 '16 02:11 Audace

This now works when I set use_queues to False. However, I now get the following two errors:

TypeError: ('__init__() takes exactly 3 arguments (2 given)',
<class 'botocore.exceptions.ClientError'>, (u'An error occurred (InvalidSequenceTokenException)
when calling the PutLogEvents operation: The given sequenceToken is invalid. The next expected
sequenceToken is: 49566986361376648647772148512500932485923247621329154002',))

and

An error occurred (ThrottlingException) when calling the PutLogEvents operation (reached max
retries: 4): Rate exceeded

Audace avatar Nov 24 '16 03:11 Audace

Solved this issue by using this repo: https://github.com/jruere/multiprocessing-logging, which was spun out of this post: http://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python.

All it resulted in was importing multiprocessing_logging and then adding multiprocessing_logging.install_mp_handler(logger)

Audace avatar Nov 25 '16 15:11 Audace

Thanks, I probably need to add docs on how to deal with this, so reopening this issue to keep track of that.

kislyuk avatar Mar 26 '17 19:03 kislyuk

Just experienced this issue, thanks for the fix @Audace!

@kislyuk it may be worth updating the docs to include a reference to that library. I'm working with django rq and any logging within a worker process was not making it into the watchtower batch. I'd assume other worker libraries like Celery would also experience this issue. Similar to @Audace 's answer, I updated my django app's ready() function (guaranteeing the install_mp_handler() is called after logging is setup and before any logs are sent)

from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'MyApp'

def ready(self):
    import multiprocessing_logging
    import logging
    multiprocessing_logging.install_mp_handler(logging.getLogger("my_apps_primary_logger"))

spetoolio avatar Nov 04 '18 17:11 spetoolio

Just wanted to add that although this seemed solved, I still encounter situations where all logging stops reaching CloudWatch suddenly, but continues to successfully log to local log files. I do not see any obvious causes, and after restarting my workers everything is back to normal as if nothing was wrong. If anyone has any thoughts, please feel free to share, and I will update if I come across a solution. @Audace not sure if you saw anything similar after implementing your solution?

spetoolio avatar Dec 03 '18 20:12 spetoolio

@kislyuk Is this the suggested way to deal with multi-process logging for this library?

redixhumayun avatar Dec 29 '20 17:12 redixhumayun

The suggested way to use logging in multiprocessing pools is to share nothing. Use one logger per worker process (or thread) and initialize the logger after forking. A shared logger will not work correctly with multiprocessing due to the stateful nature of the logger and race conditions that will arise between different copies of the logger in the different processes.

kislyuk avatar Jan 01 '21 17:01 kislyuk

@kislyuk So, I'm not using a multiprocessing pool, just manually creating all the child processes I want.

If I pass a separate logger instance to each child process, does each logger instance log to a separate file / stream? Is there a way to collate everything based on based on some metric(like timestamp)?

redixhumayun avatar Jan 02 '21 07:01 redixhumayun

@redixhumayun that is for you to configure. Check the project documentation for how to configure the log stream names. Check the cloudwatch documentation for how to collate logs.

kislyuk avatar Jan 02 '21 15:01 kislyuk

I wanted to post my solution here, just in case it is useful to anyone else.

The Python Logging Cookbook has some great examples of how to log to a single file form multiple processes, a sample of that can be found here.

The sample linked above makes use of the QueueHandler class that accepts logs from multiple processes and outputs them somewhere else. Since it is a queue, log order is maintained.

Integrating with WatchTower is as easy as adding an additional handler at the output end of the queue.

You can use the sample code from the link above and just modify the listener_configurer function as below

def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    watchtower_handler = watchtower.CloudWatchLogHandler()
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    watchtower_handler.setFormatter(f)
    root.addHandler(h)
    root.addHandler(watchtower_handler)

redixhumayun avatar Feb 09 '21 12:02 redixhumayun