numaflow-python
feat: Added servicer for MapStreamBatchFn
Adds a Python servicer to address https://github.com/numaproj/numaflow/issues/1688.

The main 'issue' with this implementation is that, since there are now two functions the servicer needs to be aware of, the `__invoke*` function can't rely on `__call__` alone to proxy, so it does NOT support the pure function-based callback at this time. I'm not sure how the core team would like to address that.

I will add an example for this, but my current setup was a bit unusual and didn't immediately build with the same structure, so I'll update that shortly. Below is the code and pipeline used to test, in coordination with updates in https://github.com/numaproj/numaflow-go/pull/129 and https://github.com/numaproj/numaflow/pull/1707.
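To illustrate the dispatch problem described above, here is a hypothetical sketch (the names `BaseMapStreamer` and `pick_invoke` are illustrative, not the actual pynumaflow internals): with a single entry point a bare async function can be proxied through `__call__`, but with two entry points the servicer has to select a bound method by name, which only a class-based handler can provide.

```python
class BaseMapStreamer:
    """Stand-in for the pynumaflow MapStreamer base class (illustrative only)."""

    async def handler(self, keys, datum):
        raise NotImplementedError

    async def handler_stream(self, datums):
        raise NotImplementedError


def pick_invoke(handler_obj, batch_mode: bool):
    # With one entry point, a plain async function could be proxied via
    # __call__. With two entry points the servicer must pick a bound method
    # by name -- something a bare function-based callback cannot offer.
    if batch_mode:
        return handler_obj.handler_stream
    return handler_obj.handler
```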
```python
import os
import time
import json
from collections.abc import AsyncIterable

from pynumaflow.mapstreamer import Message, Datum, MapStreamAsyncServer, MapStreamer

SLEEP_TIME = int(os.environ.get("SLEEP_TIME_SEC", "1"))


class MapperStreamer(MapStreamer):
    async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
        pass

    async def handler_stream(self, datum: AsyncIterable[Datum]) -> AsyncIterable[Message]:
        """
        A handler that iterates over each item in the stream and outputs a message
        for each item: for example, it indicates even, odd, or DROP if 0.

        It sleeps for a short time to simulate longer processing, so that we can see
        messages actually backing up and getting fetched and processed in batches.
        """
        all_grouped = []
        # Treat each message individually, because we can
        print(f"Simulate doing work for {SLEEP_TIME} sec")
        time.sleep(SLEEP_TIME)
        async for msg in datum:
            parsed = json.loads(msg.value.decode())
            val = hash(parsed["Data"]["padding"])
            as_str = str(val)
            all_grouped.append(val)
            print(f"Computed message value = {as_str}")
            last_int = int(as_str[-1])
            if last_int == 0:
                print(f"Drop {as_str}")
                yield Message.to_drop()
                continue
            if last_int % 2 == 0:
                output_keys = ["even"]
                output_tags = ["even-tag"]
            else:
                output_keys = ["odd"]
                output_tags = ["odd-tag"]
            yield Message(value=as_str.encode("utf-8"), keys=output_keys, tags=output_tags)

        # Show that we can emit a message separate from each individual one.
        # This demonstrates 'grouping' messages into fewer, but larger, messages.
        grouped_val = json.dumps(all_grouped).encode("utf-8")
        yield Message(value=grouped_val, tags=["grouped"])


if __name__ == "__main__":
    # NOTE: the stream handler does NOT currently support a function-only handler
    handler = MapperStreamer()
    grpc_server = MapStreamAsyncServer(handler)
    grpc_server.start()
```
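The even/odd/drop routing in `handler_stream` can be exercised outside the pipeline by feeding an async generator of stand-in datums. This is a self-contained sketch (the `FakeDatum`, `classify`, and `drive` names are mine, and `FakeDatum` only mimics the single `.value` attribute the handler reads), not a pynumaflow test harness:

```python
import asyncio
import json


class FakeDatum:
    """Minimal stand-in for pynumaflow's Datum; the handler only reads .value."""

    def __init__(self, payload: dict):
        self.value = json.dumps(payload).encode()


def classify(val: int) -> str:
    """Mirror the handler's routing rule: DROP on a trailing 0, else even/odd."""
    last_int = int(str(val)[-1])
    if last_int == 0:
        return "drop"
    return "even" if last_int % 2 == 0 else "odd"


async def drive(payloads: list[dict]) -> list[str]:
    """Feed stand-in datums through an async generator, as the servicer would."""

    async def stream():
        for p in payloads:
            yield FakeDatum(p)

    routes = []
    async for msg in stream():
        parsed = json.loads(msg.value.decode())
        routes.append(classify(hash(parsed["Data"]["padding"])))
    return routes
```

Note that `hash()` of a string varies per interpreter run (`PYTHONHASHSEED`), so the even/odd split is only stable within a single process.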
Pipeline.yaml
```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: in
      scale:
        disabled: true
      source:
        generator:
          # How many messages to generate in the duration.
          rpu: 10
          duration: 10s
          # Optional, size of each generated message, defaults to 10.
          msgSize: 10
    - name: even-or-odd
      metadata:
        annotations:
          numaflow.numaproj.io/map-stream: "true"
      limits:
        readBatchSize: 20
      scale:
        disabled: true
      # limits:
      #   readBatchSize: 1
      udf:
        container:
          imagePullPolicy: Never
          image: docker.io/warner/demo-app:latest
          command: ["python"]
          args:
            - -m
            - demo_pipeline.stream_wait_udf
          env:
            - name: MAX_THREADS
              value: "1"
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: SLEEP_TIME_SEC
              value: "10"
    - name: even-sink
      scale:
        min: 1
      sink:
        # A simple log printing sink
        log: {}
    - name: odd-sink
      scale:
        min: 1
      sink:
        log: {}
    - name: number-sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: even-or-odd
    - from: even-or-odd
      to: even-sink
      conditions:
        tags:
          values:
            - even-tag
    - from: even-or-odd
      to: odd-sink
      conditions:
        tags:
          operator: or
          values:
            - odd-tag
    - from: even-or-odd
      to: number-sink
      conditions:
        tags:
          values:
            - grouped
```
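My reading of the edge conditions above: a message travels an edge when any of its tags appears in that edge's `values` list (any-match semantics, as `operator: or` makes explicit on the odd-sink edge). A small sketch of that matching rule, to show how the handler's tags fan messages out to the three sinks (this is an illustration of the config's semantics, not Numaflow's actual router):

```python
# Edge conditions from the pipeline above, reduced to tag sets.
EDGES = [
    {"to": "even-sink", "values": {"even-tag"}},
    {"to": "odd-sink", "values": {"odd-tag"}},
    {"to": "number-sink", "values": {"grouped"}},
]


def route(tags: list[str]) -> list[str]:
    # An edge is taken when any message tag intersects the edge's values
    # (or-semantics). A message with no matching tag goes nowhere.
    return [e["to"] for e in EDGES if set(tags) & e["values"]]
```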
The resulting pipeline looks like this as it runs: