
splunk_handler doesn't transmit messages from multiprocessing.pool processes

Open · GraemeMeyerGT opened this issue on Aug 04, 2023 · 0 comments

This one is curious to me: splunk_handler seems to fail to transmit log events created by multiprocessing.Pool worker processes. I have constructed a small application that uses a multiprocessing pool to demonstrate the issue.

The app starts a processing pool with 1/2/n processes and maps worker_function over a simple list of integers. Each worker adds 10 to its input number and returns the result.

All log messages are created and written successfully to STDOUT and to the log file, but Splunk only receives the ones created by the main process (or at least never receives any created by worker_function). This holds true for larger and smaller numbers of processes (including 1), and also for the much larger application I'm actually trying to build. No events from the pool processes are ever transmitted, and no events from the main process ever seem to be lost.

During debugging I can see the log events from the pool processes being added to self.queue, but at some point, for a reason I have not yet been able to determine, they disappear and only the events from the main process remain; the application then continues as normal.
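
If the handler's queue is just ordinary in-process state, that would explain it: each pool worker inherits (or re-creates) its own copy of the queue, and whatever the workers enqueue is invisible to the flushing thread back in the parent. A minimal sketch of what I mean, using a plain queue.Queue and nothing splunk_handler-specific:

# queue_copy_demo.py - illustration only, not splunk_handler code
import queue
from multiprocessing import Pool

q = queue.Queue()  # thread-safe, but confined to a single process

def worker(n):
    q.put(n)           # lands in the *child's* copy of q
    return q.qsize()   # the child sees its own items...

if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(worker, [1, 2, 3, 4]))  # e.g. [1, 2, 1, 2]
    print(q.qsize())   # ...but the parent's queue is still empty: 0

If splunk_handler's flushing only ever happens in the parent process (I haven't confirmed this), that would match exactly what I'm seeing.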

This may be related to PR #25.

# main.py
import multiprocessing  # needed for current_process() in worker_function
from multiprocessing import Pool

from modules.constants import *
from modules.logging import *

def worker_function(input_number):
    logger.debug(f"Entering function: worker_function() as {multiprocessing.current_process().name}")

    logger.info(f"Number before: {input_number}")
    output_number = input_number + 10
    logger.info(f"Number after: {output_number}")

    return output_number
    
def run_in_parallel(list_of_chunked_data, worker, number_of_processes=3):
    logger.debug("Entering function: run_in_parallel()")
    logger.info(f"Starting parallel processing with {number_of_processes} processes.")
    
    with Pool(number_of_processes) as pool:
        # pool.map already returns the results as a list
        pool_results = pool.map(worker, list_of_chunked_data)
    return pool_results

def main():
    logger.info(f"Starting application.")

    list_of_numbers=[1,2,3,4]

    results = run_in_parallel(list_of_numbers, worker_function, 3)
    
    logger.info(f"Results: {results}")

    logger.info(f"Ending application.")


if __name__ == '__main__':
    main()

# logging.py
import logging
import multiprocessing
import sys

from modules.constants import *
from splunk_handler import SplunkHandler

###
# Variables and configurations
###
DEFAULT_LOGGING_FORMAT = '[%(asctime)s] [%(levelname)s] [%(name)s] [%(funcName)s]: %(message)s'
DEFAULT_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
DEFAULT_LOG_FILE = "/var/log/my_application/my_application.log"

default_formatter = logging.Formatter(
    fmt = DEFAULT_LOGGING_FORMAT,
    datefmt = DEFAULT_DATE_FORMAT
)

###
# Logger(s)
###

# Custom logger for all application logs
logger = logging.getLogger("my_application")
logger.setLevel(logging.DEBUG)

###
# Log Handler(s)
### 

# STDOUT: Outputs application trace logs to stdout
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(default_formatter)

# File: Outputs application trace logs to the log file
file_handler = logging.FileHandler(
    filename = DEFAULT_LOG_FILE,
    mode = "a"
)
file_handler.setFormatter(default_formatter)

# Splunk: Outputs application trace logs to Splunk
splunk_trace_handler = SplunkHandler(
    host = SPLUNK_HTTP_INPUT_FQDN,
    port = 443,
    token = SPLUNK_HEC_TOKEN,
    index = "my_application",
    sourcetype = "trace",
    debug = True
)
splunk_trace_handler.setFormatter(default_formatter)

logger.addHandler(splunk_trace_handler)
logger.addHandler(stream_handler)
logger.addHandler(file_handler)
