Allow metrics registry to be passed through PrometheusMiddleware
Registers the Prometheus metrics with a CollectorRegistry, as described in the prometheus-client docs: https://github.com/prometheus/client_python/blob/master/README.md#multiprocess-mode-eg-gunicorn
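For reference, the collection pattern from those docs that this builds on looks roughly like this (a minimal sketch, not taskiq code):

from prometheus_client import CollectorRegistry, generate_latest, multiprocess

# Aggregate the metric files written by every worker process
# (found via PROMETHEUS_MULTIPROC_DIR) into a single registry.
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)

# Render the combined metrics in the Prometheus text format.
print(generate_latest(registry).decode())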
I also noticed that the documentation explicitly says not to overwrite PROMETHEUS_MULTIPROC_DIR, which the middleware currently does. It seems to be working for me locally, but I wonder if it could be a problem?
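If it does turn out to be a problem, one non-destructive option would be to set the variable only when the caller hasn't (a sketch of the idea, not what the middleware currently does; the path is made up):

import os

# Respect a PROMETHEUS_MULTIPROC_DIR the operator already configured and only
# fall back to a default when it is unset; the path here is illustrative.
os.environ.setdefault("PROMETHEUS_MULTIPROC_DIR", "/tmp/taskiq_prometheus")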
@s3rius Yes, that makes sense. In fact, your comment made me realize my application does have some metrics in another registry which don't make sense to collect/expose from the primary application. I now need to collect/expose them from the workers.
I tried to make the change; however, I quickly ran into the same "Duplicated timeseries in CollectorRegistry" error when the worker application starts up. I tried all sorts of things but couldn't quite figure out how to get it to work.
I tried registering the "application" registry with prometheus_client.multiprocess in the application, inside the middleware, and even with a custom HTTP server that collects into a fresh registry on each request, as the prometheus-client docs recommend. I could never get it to work quite right -- I either got the "duplicated timeseries" error or duplicate metrics in the output.
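For context, the per-request pattern from the prometheus-client docs that I was trying to adapt looks roughly like this (the docs' WSGI example, sketched from memory, not taskiq code):

from prometheus_client import CollectorRegistry, make_wsgi_app, multiprocess


def metrics_app(environ, start_response):
    # Build a fresh registry per scrape so the multiprocess metric files
    # are re-read on every request, as the prometheus-client docs recommend.
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    return make_wsgi_app(registry)(environ, start_response)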
Also, this solution relies on the fact that taskiq uses multiprocessing to run multiple workers by default. However, it is possible that a user of the library will run with the --workers 1 CLI flag. Multiprocess mode limits the flexibility of the prometheus client, as its docs point out. What should be done in this case?
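One shape an answer could take (purely a sketch of an idea, not the middleware's current behavior) is to fall back to the default registry when multiprocess mode isn't configured:

import os

from prometheus_client import REGISTRY, CollectorRegistry, multiprocess

if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
    # Multiple worker processes: aggregate their metric files.
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
else:
    # Single process (--workers 1): the default global registry suffices.
    registry = REGISTRY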
Can you please show me which metrics your application exports along with taskiq's metrics?
I ask because I can run a custom counter metric without any issue:
import asyncio

from taskiq import PrometheusMiddleware
from prometheus_client import Counter
from taskiq_redis import ListQueueBroker

my_custom_counter = Counter("my_custom_ccc", "Made to test #176")

broker = ListQueueBroker("redis://localhost").with_middlewares(PrometheusMiddleware())


@broker.task()
async def mytask(val: int):
    my_custom_counter.inc(val)


async def main():
    await broker.startup()
    for i in range(10):
        await mytask.kiq(i)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
taskiq worker br:broker -w 4
python br.py
This code runs fine, even with multiple workers.
@s3rius Interesting, I did some more debugging with your example. I was able to run it just fine on its own. What I noticed, however, is that the error does happen when I run taskiq worker with the --fs-discover flag.
I think what is happening is that my tasks.py file imports the broker, and this causes some sort of circular dependency when the metrics are registered. What I tried was to avoid creating the PrometheusMiddleware in the global scope and instead add it in the startup callback:
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    sentry.init_sentry()
    # broker.add_middlewares([PrometheusMiddleware()])
    prom_middleware = PrometheusMiddleware()
    prom_middleware.set_broker(broker)
    broker.middlewares.append(prom_middleware)
This worked! As you can see, I also tried adding it with a call to add_middlewares, but that always gives me the same error. I'm not sure why, since the middleware's __init__ does the same if check and it returns True there. I'll need to look into it more.
In any case, I am happy to update my PR to allow a registry to be passed through __init__. The only problem I'm having is what to do about the imports: I see that the imports are checked in __init__, but I need a couple of them in the global namespace to get a proper type annotation and default parameter for the metrics registry.
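Concretely, I mean something like this (an illustrative sketch, not the exact PR code): type the parameter without a hard runtime import, and keep the checked import inside __init__:

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only evaluated by type checkers, so prometheus_client stays optional.
    from prometheus_client import CollectorRegistry


class PrometheusMiddleware:  # illustrative shape, not the actual class body
    def __init__(self, metrics_registry: "Optional[CollectorRegistry]" = None) -> None:
        # The runtime import stays checked inside __init__, as it is today.
        from prometheus_client import REGISTRY

        self.registry = metrics_registry if metrics_registry is not None else REGISTRY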
I have pushed my change and left a comment on the part I'm not so sure about. Thanks for the help!
I have another question. Do you use any other brokers? Maybe you start multiple brokers and they all try to add the same metrics?
No, I only have one broker.
# src.nebula.broker
from uuid import uuid4
import os

from taskiq import InMemoryBroker, TaskiqEvents, TaskiqState, PrometheusMiddleware
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from nebula.config import REDIS_URL
from nebula.utils import sentry
from nebula.utils.taskiq_middleware import SentryException

three_days_in_seconds = 3 * 24 * 60 * 60
redis_async_result = RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=three_days_in_seconds)


def task_id_generator() -> str:
    # TODO: interpolate with function call to taskiq default id generator
    # (the implementations are currently the same)
    # https://github.com/taskiq-python/taskiq/blob/cd4104fdfc4353aa6d9e0faafc4bae4a7afa3c09/taskiq/abc/broker.py#L50
    return f"nebula_tasks:{uuid4().hex}"


broker = (
    ListQueueBroker(url=REDIS_URL, queue_name="nebula_tasks")
    .with_result_backend(redis_async_result)
    .with_id_generator(task_id_generator)
    .with_middlewares(SentryException())
)

env = os.environ.get("NEBULA_ENVIRONMENT")
if env and env == "test":
    broker = InMemoryBroker()


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    sentry.init_sentry()
    # broker.add_middlewares([PrometheusMiddleware()])
    prom_middleware = PrometheusMiddleware()
    prom_middleware.set_broker(broker)
    broker.middlewares.append(prom_middleware)
# src.nebula.core.tasks
from nebula.broker import broker
from nebula.core import lics


@broker.task
async def process_live_stream(replay_ready):
    return await lics.process_live_stream(replay_ready)


@broker.task
async def answer_question(suggest_auto_response):
    return await lics.answer_question(suggest_auto_response)
I am running the worker with: pipenv run taskiq worker --fs-discover --workers 2 src.nebula.broker:broker
@nickdichev-firework, sorry, I was on vacation. I ran your example without any issue; I just removed the Sentry middleware.
import os
from uuid import uuid4

from prometheus_client import Gauge
from taskiq import InMemoryBroker, PrometheusMiddleware
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

my_gauge = Gauge("a", "test")

three_days_in_seconds = 3 * 24 * 60 * 60
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost",
    result_ex_time=three_days_in_seconds,
)


def task_id_generator() -> str:
    # TODO: interpolate with function call to taskiq default id generator
    # (the implementations are currently the same)
    # https://github.com/taskiq-python/taskiq/blob/cd4104fdfc4353aa6d9e0faafc4bae4a7afa3c09/taskiq/abc/broker.py#L50
    return f"nebula_tasks:{uuid4().hex}"


broker = (
    ListQueueBroker(url="redis://localhost", queue_name="nebula_tasks")
    .with_result_backend(redis_async_result)
    .with_id_generator(task_id_generator)
    .with_middlewares(PrometheusMiddleware())
)

env = os.environ.get("NEBULA_ENVIRONMENT")
if env and env == "test":
    broker = InMemoryBroker()
I modified it a little bit, but it does the same thing. Can you please try to create a new project and run this file with taskiq worker?