nuclio
nuclio copied to clipboard
MaxWorkers setting for rabbit-mq trigger not working as expected
My function seems to only ever have one worker.
Here is my function.yaml
spec:
description: rabbit_test
runtime: "python:3.6"
handler: main:handler
eventTimeout: 30s
env:
- name: PYTHONPATH
value: /usr/bin/python3
build:
baseImage: nuclio
triggers:
Infer:
maxWorkers: 5
kind: "rabbit-mq"
url: "amqp://user:password@rabbitmq"
attributes:
exchangeName: ""
queueName: "hello"
platform:
attributes:
restartPolicy:
name: always
maximumRetryCount: 3
network: nuclio_app-tier
My Dockerfile
ARG NUCLIO_LABEL=1.5.0
ARG NUCLIO_ARCH=amd64
ARG NUCLIO_BASE_IMAGE=ubuntu:18.04
ARG NUCLIO_ONBUILD_IMAGE=quay.io/nuclio/handler-builder-python-onbuild:${NUCLIO_LABEL}-${NUCLIO_ARCH}
# Supplies processor uhttpc, used for healthcheck
FROM nuclio/uhttpc:0.0.1-amd64 as uhttpc
# Supplies processor binary, wrapper
FROM ${NUCLIO_ONBUILD_IMAGE} as processor
# From the base image
FROM ${NUCLIO_BASE_IMAGE}
# Readiness probe
HEALTHCHECK --interval=1s --timeout=3s CMD /usr/local/bin/uhttpc --url http://127.0.0.1:8082/ready || exit 1
# USER CONTENT
WORKDIR /opt/nuclio
RUN apt-get update && apt-get install -y curl apt-utils \
libgirepository1.0-dev libcairo2-dev gcc libpq-dev libssl-dev \
python3-pip python3.7-dev python3.7 python3.7-venv \
git python3-setuptools
RUN ln -sfn /usr/bin/python3.7 /usr/bin/python3
RUN ln -s /usr/bin/pip3 /usr/bin/pip
RUN python3 -m pip install --user --upgrade pip
RUN pip install pyyaml pika
RUN pip install --upgrade keyrings.alt
RUN pip install -r requirements.txt
COPY . /opt/nuclio
COPY ./function.yaml /etc/nuclio/config/processor/processor.yaml
# END OF USER CONTENT
# Copy required objects from the suppliers
COPY --from=processor /home/nuclio/bin/processor /usr/local/bin/processor
COPY --from=processor /home/nuclio/bin/py /opt/nuclio/
COPY --from=uhttpc /home/nuclio/bin/uhttpc /usr/local/bin/uhttpc
RUN pip install nuclio-sdk msgpack --no-index --find-links /opt/nuclio/whl
# Run processor with configuration and platform configuration
CMD [ "processor" ]
I build the docker image and the run the following command to deploy.
nuctl deploy rabbit_test --run-image nuclio --path . --platform local
In addition to the maxWorker issues, I would like to pass some runtime arguments to the Docker coainter when deploying the function. Is this possible to do?
I can do it if follow the steps in this documentation https://nuclio.io/docs/latest/reference/runtimes/python/python-reference/. The issue is, after starting the container with this command docker run --name my-function --rm -d -p 8090:8080 my-function:latest
when I run nuctl get function --platform local
my function is not shown.
Hey @bigred94 , you may omit the
COPY . /opt/nuclio
COPY ./function.yaml /etc/nuclio/config/processor/processor.yaml
part from your docker, to allow nuclio safely volumize the processor.yaml and to copy user contens/iles to opt/nuclio
.
also, keep in mind, most of the run commands (pip install and etc) can be achieved via UI / function.yaml so you dont really have to specify that over a docker file.
regarding rabbit <> max workers, can you attach the initialization logs of your function? I would like to confirm it A. read your configuration successfully B. starts workers successfully
nuctl register your function in a local storage as a local "db", and since you run it manually your function is not listed there.
@liranbg Thanks for the response.
Here are my initialization logs
Read configuration {"config": {"metadata":{"name":"rabbit_test","namespace":"adam-test","annotations":{"framework":"pytorch","name":"rabbit_test","spec":"[\n { \"id\": 0, \"name\": \"person\" },\n { \"id\": 1, \"name\": \"car\" },\n { \"id\": 2, \"name\": \"motorcycle\" },\n { \"id\": 3, \"name\": \"bus\" },\n { \"id\": 4, \"name\": \"truck\" }\n]\n","type":"detector"}},"spec":{"description":"rabbit_test","handler":"main:handler","runtime":"python:3.7","env":[{"name":"PYTHONPATH","value":"/usr/bin/python3"}],"resources":{},"triggers":{"train":{"class":"","kind":"rabbit-mq","maxWorkers":50,"url":"amqp://user:password@rabbitmq","attributes":{"exchangeName":"","queueName":"hello"}}},"build":{},"platform":{"attributes":{"network":"nuclio_app-tier","restartPolicy":{"maximumRetryCount":3,"name":"always"}}},"eventTimeout":"30s"},"PlatformConfig":null}, "platformConfig": {"kind":"local","webAdmin":{"enabled":true,"listenAddress":":8081"},"healthCheck":{"enabled":true,"listenAddress":":8082"},"logger":{"sinks":{"stdout":{"kind":"stdout"}},"system":[{"level":"debug","sink":"stdout"}],"functions":[{"level":"debug","sink":"stdout"}]},"metrics":{},"scaleToZero":{},"autoScale":{}}}
20.10.14 19:44:35.560 abbit_mq.w0.python.logger (D) Creating listener socket {"path": "/tmp/nuclio-rpc-bu3la8rnaq8000d6tgrg.sock"}
20.10.14 19:44:35.560 abbit_mq.w0.python.logger (D) Using Python wrapper script path {"path": "/opt/nuclio/_nuclio_wrapper.py"}
20.10.14 19:44:35.560 abbit_mq.w0.python.logger (D) Using Python handler {"handler": "main:handler"}
20.10.14 19:44:35.561 abbit_mq.w0.python.logger (D) Using Python executable {"path": "/usr/bin/python3"}
20.10.14 19:44:35.561 abbit_mq.w0.python.logger (D) Setting PYTHONPATH {"value": "PYTHONPATH=/opt/nuclio"}
20.10.14 19:44:35.561 abbit_mq.w0.python.logger (D) Running wrapper {"command": "/usr/bin/python3 -u /opt/nuclio/_nuclio_wrapper.py --handler main:handler --socket-path /tmp/nuclio-rpc-bu3la8rnaq8000d6tgrg.sock --platform-kind local --namespace adam-test --worker-id 0 --trigger-name train"}
20.10.14 19:44:36.986 abbit_mq.w0.python.logger (I) Wrapper connected
Python> 2020-10-14 19:44:36,986 [info] Replacing logger output
20.10.14 19:44:36.986 abbit_mq.w0.python.logger (D) Waiting for start
INFO:nuclio_sdk:Replacing logger output
INFO:nuclio_sdk:Init context... 0%
20.10.14 19:44:36.987 abbit_mq.w0.python.logger (I) Init context... 0%
INFO:root:Files downloaded from bucket: ['models/5eebcb318492baedbf4eaa50-net0.pth']
INFO:root:Files retrieved from cache: []
INFO:root:{'net': 'DLASeg', 'net_kwargs': {'n_classes': 5, 'use_deform_conv': False}, 'pil_transform': 'ResizePreserveAspect', 'pil_transform_kwargs': {'new_h': 512, 'new_w': 512}, 'torch_transform': 'PixelTransform', 'torch_transform_kwargs': {'mean': [0.47, 0.447, 0.408], 'std': [0.278, 0.274, 0.289], 'bgr': True}, 'class_strs': ['person', 'car', 'truck', 'bus', 'motorcycle'], 'bucket_name': 'LOCAL.cache', 'net_paths': ['models/5eebcb318492baedbf4eaa50-net0.pth']}
INFO:root:Loading from bucket LOCAL.cache
INFO:root:Loading DLASeg from /tmp/tmppygdevpm
INFO:nuclio_sdk:Init context...100%
20.10.14 19:44:40.097 abbit_mq.w0.python.logger (I) Init context...100%
20.10.14 19:44:40.097 abbit_mq.w0.python.logger (D) Started
20.10.14 19:44:40.097 processor (D) Creating default HTTP event source {"configuration": {"class":"sync","kind":"http","maxWorkers":1,"url":":8080","workerAllocatorName":"defaultHTTPWorkerAllocator"}}
20.10.14 19:44:40.097 processor.http (D) Creating worker pool {"num": 1}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Creating listener socket {"path": "/tmp/nuclio-rpc-bu3laa3naq8000d6tgs0.sock"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Using Python wrapper script path {"path": "/opt/nuclio/_nuclio_wrapper.py"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Using Python handler {"handler": "main:handler"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Using Python executable {"path": "/usr/bin/python3"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Setting PYTHONPATH {"value": "PYTHONPATH=/opt/nuclio"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Running wrapper {"command": "/usr/bin/python3 -u /opt/nuclio/_nuclio_wrapper.py --handler main:handler --socket-path /tmp/nuclio-rpc-bu3laa3naq8000d6tgs0.sock --platform-kind local --namespace adam-test --worker-id 0 --trigger-name defaultHTTPWorkerAllocator"}
20.10.14 19:44:41.237 sor.http.w0.python.logger (I) Wrapper connected
20.10.14 19:44:41.237 sor.http.w0.python.logger (D) Waiting for start
Python> 2020-10-14 19:44:41,237 [info] Replacing logger output
INFO:nuclio_sdk:Replacing logger output
20.10.14 19:44:41.237 sor.http.w0.python.logger (I) Init context... 0%
INFO:nuclio_sdk:Init context... 0%
INFO:root:Files downloaded from bucket: []
INFO:root:Files retrieved from cache: ['models/5eebcb318492baedbf4eaa50-net0.pth']
INFO:root:{'net': 'DLASeg', 'net_kwargs': {'n_classes': 5, 'use_deform_conv': False}, 'pil_transform': 'ResizePreserveAspect', 'pil_transform_kwargs': {'new_h': 512, 'new_w': 512}, 'torch_transform': 'PixelTransform', 'torch_transform_kwargs': {'mean': [0.47, 0.447, 0.408], 'std': [0.278, 0.274, 0.289], 'bgr': True}, 'class_strs': ['person', 'car', 'truck', 'bus', 'motorcycle'], 'bucket_name': 'LOCAL.cache', 'net_paths': ['models/5eebcb318492baedbf4eaa50-net0.pth']}
INFO:root:Loading from bucket LOCAL.cache
INFO:root:Loading DLASeg from /tmp/tmp9oriv6um
INFO:nuclio_sdk:Init context...100%
20.10.14 19:44:41.940 sor.http.w0.python.logger (I) Init context...100%
20.10.14 19:44:41.940 sor.http.w0.python.logger (D) Started
20.10.14 19:44:41.940 processor (I) Starting event timeout watcher {"timeout": "30s"}
20.10.14 19:44:41.940 .webadmin.server.triggers (D) Registered custom route {"pattern": "/{id}/stats", "method": "GET"}
20.10.14 19:44:41.940 processor.webadmin.server (D) Registered resource {"name": "triggers"}
20.10.14 19:44:41.940 processor (W) No metric sinks configured, metrics will not be published
20.10.14 19:44:41.941 processor (D) Starting
20.10.14 19:44:41.941 cessor.healthcheck.server (I) Listening {"listenAddress": ":8082"}
20.10.14 19:44:41.941 processor.rabbit_mq.train (I) Starting {"brokerUrl": "amqp://user:password@rabbitmq"}
20.10.14 19:44:41.941 processor.rabbit_mq.train (I) Creating broker resources {"brokerUrl": "amqp://user:password@rabbitmq", "exchangeName": "", "queueName": "hello", "topics": []}
20.10.14 19:44:41.947 processor.rabbit_mq.train (D) Connected to broker {"brokerUrl": "amqp://user:password@rabbitmq"}
20.10.14 19:44:41.949 processor.rabbit_mq.train (D) Created broker channel
20.10.14 19:44:41.949 processor.rabbit_mq.train (D) Starting consumption from queue {"queueName": "hello"}
20.10.14 19:44:41.949 processor.http (I) Starting {"listenAddress": ":8080", "readBufferSize": 4096}
20.10.14 19:44:41.949 processor.webadmin.server (I) Listening {"listenAddress": ":8081"}
20.10.14 19:45:11.906 processor.timeout (D) Checking trigger workers {"triggerName": 1}
20.10.14 19:45:11.906 processor.timeout (D) Checking worker {"triggerName": 1, "widx": 0}
20.10.14 19:45:11.906 processor.timeout (D) Checking trigger workers {"triggerName": 0}
20.10.14 19:45:11.906 processor.timeout (D) Checking worker {"triggerName": 0, "widx": 0}
20.10.14 19:45:41.873 processor.timeout (D) Checking trigger workers {"triggerName": 1}
20.10.14 19:45:41.873 processor.timeout (D) Checking worker {"triggerName": 1, "widx": 0}
20.10.14 19:45:41.873 processor.timeout (D) Checking trigger workers {"triggerName": 0}
20.10.14 19:45:41.873 processor.timeout (D) Checking worker {"triggerName": 0, "widx": 0}
20.10.14 19:46:10.694 abbit_mq.w0.python.logger (D) Processing event {"name": "rabbit_test", "version": 0, "eventID": "0add9039-1d1f-4f6d-8094-d71704322cd0"}
20.10.14 19:46:10.694 abbit_mq.w0.python.logger (D) Sending event to wrapper {"size": 2267637}
INFO:nuclio_sdk:Event object info: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'decode_body', 'fields', 'from_json', 'get_header', 'headers', 'id', 'method', 'path', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
20.10.14 19:46:10.707 abbit_mq.w0.python.logger (I) Event object info: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'decode_body', 'fields', 'from_json', 'get_header', 'headers', 'id', 'method', 'path', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
INFO:nuclio_sdk:Trigger info kind: rabbitMq
20.10.14 19:46:10.710 abbit_mq.w0.python.logger (I) Trigger info kind: rabbitMq
INFO:nuclio_sdk:Trigger info klass: async
In regards to the function registering, is there a way to manually register it after manually running the function? If not, is there a way to deploy the function using nuctl and either pass arguments to init_context or pass run arguments to the docker container that is created by nuctl?
Nuctl would do the registration for you, it is preferably than do it manually because these scenario are not tested and I wouldnt recommend it. not to mention you lose most of nuclio features / benefits when running in different platform (kube vs platform).
I would recommend using nuctl
while
- build your image (
nuctl build --path <dir> --image <image-name>
) - deploy function using the previously built image (
nuctl deploy my-awesome-function --run-image <image-name> --file <function.yaml>
)
Now, when you deploy your function, you can add env
to your function.yaml and parse it on init_context
and use it as "run arguments"
e.g.:
def handler(context, event):
db.connect(context.user_data.db_url)
...
def init_context(context):
setattr(context.user_data, 'db_url', os.environ.get('DB_URL'))
@liranbg Thanks for help. Passing environment variables and setting the context will work just fine! Any more insight on how to get multiple workers for the rabbitmq trigger?