
MaxWorkers setting for rabbit-mq trigger not working as expected

Open bigred94 opened this issue 4 years ago • 4 comments

My function seems to only ever have one worker.

Here is my function.yaml

spec:
  description: rabbit_test
  runtime: "python:3.6"
  handler: main:handler
  eventTimeout: 30s
  env:
    - name: PYTHONPATH
      value: /usr/bin/python3

  build:
    baseImage: nuclio

  triggers:
    Infer:
      maxWorkers: 5
      kind: "rabbit-mq"
      url: "amqp://user:password@rabbitmq"
      attributes:
        exchangeName: ""
        queueName: "hello"


  platform:
    attributes:
      restartPolicy:
        name: always
        maximumRetryCount: 3
      network: nuclio_app-tier

My Dockerfile

ARG NUCLIO_LABEL=1.5.0
ARG NUCLIO_ARCH=amd64
ARG NUCLIO_BASE_IMAGE=ubuntu:18.04
ARG NUCLIO_ONBUILD_IMAGE=quay.io/nuclio/handler-builder-python-onbuild:${NUCLIO_LABEL}-${NUCLIO_ARCH}

# Supplies processor uhttpc, used for healthcheck
FROM nuclio/uhttpc:0.0.1-amd64 as uhttpc

# Supplies processor binary, wrapper
FROM ${NUCLIO_ONBUILD_IMAGE} as processor

# From the base image
FROM ${NUCLIO_BASE_IMAGE}


# Readiness probe
HEALTHCHECK --interval=1s --timeout=3s CMD /usr/local/bin/uhttpc --url http://127.0.0.1:8082/ready || exit 1

# USER CONTENT
WORKDIR /opt/nuclio

RUN apt-get update && apt-get install -y curl apt-utils \
    libgirepository1.0-dev libcairo2-dev gcc libpq-dev libssl-dev \
    python3-pip python3.7-dev python3.7 python3.7-venv \
    git python3-setuptools

RUN ln -sfn /usr/bin/python3.7 /usr/bin/python3
RUN ln -s /usr/bin/pip3 /usr/bin/pip
RUN python3 -m pip install --user --upgrade pip
RUN pip install pyyaml pika
RUN pip install --upgrade keyrings.alt
RUN pip install -r requirements.txt

COPY . /opt/nuclio
COPY ./function.yaml /etc/nuclio/config/processor/processor.yaml

# END OF USER CONTENT

# Copy required objects from the suppliers
COPY --from=processor /home/nuclio/bin/processor /usr/local/bin/processor
COPY --from=processor /home/nuclio/bin/py /opt/nuclio/
COPY --from=uhttpc /home/nuclio/bin/uhttpc /usr/local/bin/uhttpc
RUN pip install nuclio-sdk msgpack --no-index --find-links /opt/nuclio/whl

# Run processor with configuration and platform configuration
CMD [ "processor" ]

I build the Docker image and then run the following command to deploy: nuctl deploy rabbit_test --run-image nuclio --path . --platform local

In addition to the maxWorkers issue, I would like to pass some runtime arguments to the Docker container when deploying the function. Is this possible?

I can do it if I follow the steps in this documentation: https://nuclio.io/docs/latest/reference/runtimes/python/python-reference/. The issue is that after starting the container with docker run --name my-function --rm -d -p 8090:8080 my-function:latest, when I run nuctl get function --platform local my function is not shown.

bigred94 avatar Oct 14 '20 21:10 bigred94

Hey @bigred94 , you may omit the

COPY . /opt/nuclio
COPY ./function.yaml /etc/nuclio/config/processor/processor.yaml

part from your Dockerfile, to allow nuclio to safely mount the processor.yaml as a volume and to copy the user contents/files to /opt/nuclio.

Also, keep in mind that most of the RUN commands (pip install, etc.) can be achieved via the UI / function.yaml, so you don't really have to specify them in a Dockerfile.
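As an illustrative sketch of that suggestion, the pip installs from the Dockerfile could move into the function.yaml build section via `commands` (package names taken from your Dockerfile; exact behavior may vary by nuclio version):

```yaml
spec:
  build:
    # Each command runs during the nuclio image build,
    # replacing the equivalent RUN lines in the Dockerfile
    commands:
      - "pip install pyyaml pika"
      - "pip install --upgrade keyrings.alt"
```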

Regarding rabbit <> max workers, can you attach the initialization logs of your function? I would like to confirm that it A. reads your configuration successfully and B. starts the workers successfully.

nuctl registers your function in local storage as a local "db", and since you ran it manually your function is not listed there.

liranbg avatar Oct 15 '20 21:10 liranbg

@liranbg Thanks for the response.

Here are my initialization logs

Read configuration {"config": {"metadata":{"name":"rabbit_test","namespace":"adam-test","annotations":{"framework":"pytorch","name":"rabbit_test","spec":"[\n  { \"id\": 0, \"name\": \"person\" },\n  { \"id\": 1, \"name\": \"car\" },\n  { \"id\": 2, \"name\": \"motorcycle\" },\n  { \"id\": 3, \"name\": \"bus\" },\n  { \"id\": 4, \"name\": \"truck\" }\n]\n","type":"detector"}},"spec":{"description":"rabbit_test","handler":"main:handler","runtime":"python:3.7","env":[{"name":"PYTHONPATH","value":"/usr/bin/python3"}],"resources":{},"triggers":{"train":{"class":"","kind":"rabbit-mq","maxWorkers":50,"url":"amqp://user:password@rabbitmq","attributes":{"exchangeName":"","queueName":"hello"}}},"build":{},"platform":{"attributes":{"network":"nuclio_app-tier","restartPolicy":{"maximumRetryCount":3,"name":"always"}}},"eventTimeout":"30s"},"PlatformConfig":null}, "platformConfig": {"kind":"local","webAdmin":{"enabled":true,"listenAddress":":8081"},"healthCheck":{"enabled":true,"listenAddress":":8082"},"logger":{"sinks":{"stdout":{"kind":"stdout"}},"system":[{"level":"debug","sink":"stdout"}],"functions":[{"level":"debug","sink":"stdout"}]},"metrics":{},"scaleToZero":{},"autoScale":{}}}
20.10.14 19:44:35.560 abbit_mq.w0.python.logger (D) Creating listener socket {"path": "/tmp/nuclio-rpc-bu3la8rnaq8000d6tgrg.sock"}
20.10.14 19:44:35.560 abbit_mq.w0.python.logger (D) Using Python wrapper script path {"path": "/opt/nuclio/_nuclio_wrapper.py"}
20.10.14 19:44:35.560 abbit_mq.w0.python.logger (D) Using Python handler {"handler": "main:handler"}
20.10.14 19:44:35.561 abbit_mq.w0.python.logger (D) Using Python executable {"path": "/usr/bin/python3"}
20.10.14 19:44:35.561 abbit_mq.w0.python.logger (D) Setting PYTHONPATH {"value": "PYTHONPATH=/opt/nuclio"}
20.10.14 19:44:35.561 abbit_mq.w0.python.logger (D) Running wrapper {"command": "/usr/bin/python3 -u /opt/nuclio/_nuclio_wrapper.py --handler main:handler --socket-path /tmp/nuclio-rpc-bu3la8rnaq8000d6tgrg.sock --platform-kind local --namespace adam-test --worker-id 0 --trigger-name train"}
20.10.14 19:44:36.986 abbit_mq.w0.python.logger (I) Wrapper connected
Python> 2020-10-14 19:44:36,986 [info] Replacing logger output
20.10.14 19:44:36.986 abbit_mq.w0.python.logger (D) Waiting for start
INFO:nuclio_sdk:Replacing logger output
INFO:nuclio_sdk:Init context...  0%
20.10.14 19:44:36.987 abbit_mq.w0.python.logger (I) Init context...  0%
INFO:root:Files downloaded from bucket: ['models/5eebcb318492baedbf4eaa50-net0.pth']
INFO:root:Files retrieved from cache: []
INFO:root:{'net': 'DLASeg', 'net_kwargs': {'n_classes': 5, 'use_deform_conv': False}, 'pil_transform': 'ResizePreserveAspect', 'pil_transform_kwargs': {'new_h': 512, 'new_w': 512}, 'torch_transform': 'PixelTransform', 'torch_transform_kwargs': {'mean': [0.47, 0.447, 0.408], 'std': [0.278, 0.274, 0.289], 'bgr': True}, 'class_strs': ['person', 'car', 'truck', 'bus', 'motorcycle'], 'bucket_name': 'LOCAL.cache', 'net_paths': ['models/5eebcb318492baedbf4eaa50-net0.pth']}
INFO:root:Loading from bucket LOCAL.cache
INFO:root:Loading DLASeg from /tmp/tmppygdevpm
INFO:nuclio_sdk:Init context...100%
20.10.14 19:44:40.097 abbit_mq.w0.python.logger (I) Init context...100%
20.10.14 19:44:40.097 abbit_mq.w0.python.logger (D) Started
20.10.14 19:44:40.097                 processor (D) Creating default HTTP event source {"configuration": {"class":"sync","kind":"http","maxWorkers":1,"url":":8080","workerAllocatorName":"defaultHTTPWorkerAllocator"}}
20.10.14 19:44:40.097            processor.http (D) Creating worker pool {"num": 1}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Creating listener socket {"path": "/tmp/nuclio-rpc-bu3laa3naq8000d6tgs0.sock"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Using Python wrapper script path {"path": "/opt/nuclio/_nuclio_wrapper.py"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Using Python handler {"handler": "main:handler"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Using Python executable {"path": "/usr/bin/python3"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Setting PYTHONPATH {"value": "PYTHONPATH=/opt/nuclio"}
20.10.14 19:44:40.098 sor.http.w0.python.logger (D) Running wrapper {"command": "/usr/bin/python3 -u /opt/nuclio/_nuclio_wrapper.py --handler main:handler --socket-path /tmp/nuclio-rpc-bu3laa3naq8000d6tgs0.sock --platform-kind local --namespace adam-test --worker-id 0 --trigger-name defaultHTTPWorkerAllocator"}
20.10.14 19:44:41.237 sor.http.w0.python.logger (I) Wrapper connected
20.10.14 19:44:41.237 sor.http.w0.python.logger (D) Waiting for start
Python> 2020-10-14 19:44:41,237 [info] Replacing logger output
INFO:nuclio_sdk:Replacing logger output
20.10.14 19:44:41.237 sor.http.w0.python.logger (I) Init context...  0%
INFO:nuclio_sdk:Init context...  0%
INFO:root:Files downloaded from bucket: []
INFO:root:Files retrieved from cache: ['models/5eebcb318492baedbf4eaa50-net0.pth']
INFO:root:{'net': 'DLASeg', 'net_kwargs': {'n_classes': 5, 'use_deform_conv': False}, 'pil_transform': 'ResizePreserveAspect', 'pil_transform_kwargs': {'new_h': 512, 'new_w': 512}, 'torch_transform': 'PixelTransform', 'torch_transform_kwargs': {'mean': [0.47, 0.447, 0.408], 'std': [0.278, 0.274, 0.289], 'bgr': True}, 'class_strs': ['person', 'car', 'truck', 'bus', 'motorcycle'], 'bucket_name': 'LOCAL.cache', 'net_paths': ['models/5eebcb318492baedbf4eaa50-net0.pth']}
INFO:root:Loading from bucket LOCAL.cache
INFO:root:Loading DLASeg from /tmp/tmp9oriv6um
INFO:nuclio_sdk:Init context...100%
20.10.14 19:44:41.940 sor.http.w0.python.logger (I) Init context...100%
20.10.14 19:44:41.940 sor.http.w0.python.logger (D) Started
20.10.14 19:44:41.940                 processor (I) Starting event timeout watcher {"timeout": "30s"}
20.10.14 19:44:41.940 .webadmin.server.triggers (D) Registered custom route {"pattern": "/{id}/stats", "method": "GET"}
20.10.14 19:44:41.940 processor.webadmin.server (D) Registered resource {"name": "triggers"}
20.10.14 19:44:41.940                 processor (W) No metric sinks configured, metrics will not be published
20.10.14 19:44:41.941                 processor (D) Starting
20.10.14 19:44:41.941 cessor.healthcheck.server (I) Listening {"listenAddress": ":8082"}
20.10.14 19:44:41.941 processor.rabbit_mq.train (I) Starting {"brokerUrl": "amqp://user:password@rabbitmq"}
20.10.14 19:44:41.941 processor.rabbit_mq.train (I) Creating broker resources {"brokerUrl": "amqp://user:password@rabbitmq", "exchangeName": "", "queueName": "hello", "topics": []}
20.10.14 19:44:41.947 processor.rabbit_mq.train (D) Connected to broker {"brokerUrl": "amqp://user:password@rabbitmq"}
20.10.14 19:44:41.949 processor.rabbit_mq.train (D) Created broker channel
20.10.14 19:44:41.949 processor.rabbit_mq.train (D) Starting consumption from queue {"queueName": "hello"}
20.10.14 19:44:41.949            processor.http (I) Starting {"listenAddress": ":8080", "readBufferSize": 4096}
20.10.14 19:44:41.949 processor.webadmin.server (I) Listening {"listenAddress": ":8081"}
20.10.14 19:45:11.906         processor.timeout (D) Checking trigger workers {"triggerName": 1}
20.10.14 19:45:11.906         processor.timeout (D) Checking worker {"triggerName": 1, "widx": 0}
20.10.14 19:45:11.906         processor.timeout (D) Checking trigger workers {"triggerName": 0}
20.10.14 19:45:11.906         processor.timeout (D) Checking worker {"triggerName": 0, "widx": 0}
20.10.14 19:45:41.873         processor.timeout (D) Checking trigger workers {"triggerName": 1}
20.10.14 19:45:41.873         processor.timeout (D) Checking worker {"triggerName": 1, "widx": 0}
20.10.14 19:45:41.873         processor.timeout (D) Checking trigger workers {"triggerName": 0}
20.10.14 19:45:41.873         processor.timeout (D) Checking worker {"triggerName": 0, "widx": 0}
20.10.14 19:46:10.694 abbit_mq.w0.python.logger (D) Processing event {"name": "rabbit_test", "version": 0, "eventID": "0add9039-1d1f-4f6d-8094-d71704322cd0"}
20.10.14 19:46:10.694 abbit_mq.w0.python.logger (D) Sending event to wrapper {"size": 2267637}
INFO:nuclio_sdk:Event object info: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'decode_body', 'fields', 'from_json', 'get_header', 'headers', 'id', 'method', 'path', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
20.10.14 19:46:10.707 abbit_mq.w0.python.logger (I) Event object info: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'decode_body', 'fields', 'from_json', 'get_header', 'headers', 'id', 'method', 'path', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
INFO:nuclio_sdk:Trigger info kind: rabbitMq
20.10.14 19:46:10.710 abbit_mq.w0.python.logger (I) Trigger info kind: rabbitMq
INFO:nuclio_sdk:Trigger info klass: async

In regards to the function registering, is there a way to manually register it after manually running the function? If not, is there a way to deploy the function using nuctl and either pass arguments to init_context or pass run arguments to the docker container that is created by nuctl?

bigred94 avatar Oct 15 '20 21:10 bigred94

Nuctl does the registration for you; that is preferable to doing it manually, because those scenarios are not tested and I wouldn't recommend them. Not to mention that you lose most of nuclio's features/benefits when running on a different platform (e.g., kube vs. local).

I would recommend using nuctl to:

  1. build your image (nuctl build --path <dir> --image <image-name>)
  2. deploy the function using the previously built image (nuctl deploy my-awesome-function --run-image <image-name> --file <function.yaml>)

Now, when you deploy your function, you can add env vars to your function.yaml, parse them in init_context, and use them as "run arguments".

e.g.:

import os

def handler(context, event):
    # Use the value that init_context stored on user_data
    db.connect(context.user_data.db_url)
    ...

def init_context(context):
    # Read the "run argument" from an environment variable set via function.yaml
    setattr(context.user_data, 'db_url', os.environ.get('DB_URL'))
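The matching env entry in function.yaml might look like the following (DB_URL and its value are illustrative placeholders, following the same `spec.env` format already used for PYTHONPATH above):

```yaml
spec:
  env:
    # Exposed to the processor container and readable via os.environ
    - name: DB_URL
      value: "postgres://user:password@db:5432/mydb"
```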

liranbg avatar Oct 17 '20 17:10 liranbg

@liranbg Thanks for the help. Passing environment variables and setting the context will work just fine! Any more insight on how to get multiple workers for the rabbitmq trigger?

bigred94 avatar Oct 19 '20 16:10 bigred94