numaflow-java
Numaflow reducer doesn't output grouped message right after fixed time window
I have a Numaflow v0.10.1 pipeline with a reduce vertex built on numaflow-java 0.5.5. I am using keyed messages with size one and 2 partitions. After several messages enter the group vertex, I expect the vertex to output one message with the grouped items when the fixed time window expires. Instead, my method is called many minutes later, or only when new messages arrive. If no messages arrive, whatever the vertex holds in memory is never output.
- imagePullSecrets:
    - name: gitlab-registry
  name: group-urls
  partitions: 2
  scale:
    min: 1
  udf:
    container:
      args:
        - '-cp'
        - numaflow-app.jar
        - mypackage.GroupUrlReduceFactory
      command:
        - /usr/bin/java
      image: >-
        myimage
      imagePullPolicy: Always
      resources: {}
    groupBy:
      keyed: true
      storage:
        emptyDir: {}
      window:
        fixed:
          length: 300s
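To make the expectation concrete, here is a minimal sketch of which 300s window a given event time would fall into. It assumes fixed windows are aligned to the Unix epoch, which I have not verified for this setup. The class `FixedWindowSketch` and its helpers are hypothetical and are not part of numaflow-java.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowSketch {

    // Hypothetical helper: start of the fixed window containing eventTime,
    // assuming windows are aligned to the Unix epoch.
    static Instant windowStart(Instant eventTime, Duration length) {
        long lengthMs = length.toMillis();
        long t = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, lengthMs));
    }

    // Hypothetical helper: exclusive end of that same window.
    static Instant windowEnd(Instant eventTime, Duration length) {
        return windowStart(eventTime, length).plus(length);
    }

    public static void main(String[] args) {
        Duration length = Duration.ofSeconds(300); // matches "length: 300s" in the spec
        Instant event = Instant.parse("2023-11-01T10:07:30Z");
        System.out.println(windowStart(event, length) + " .. " + windowEnd(event, length));
    }
}
```

Under that assumption, a message with event time 10:07:30 belongs to the window from 10:05:00 to 10:10:00, so I would expect output for it shortly after 10:10:00.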
My group handler looks like this (I deleted the imports and the actual code to keep it anonymous):
package mypackage;

@Slf4j
@NoArgsConstructor
public class GroupUrlReduceFactory extends ReducerFactory<GroupUrlReduceFactory.GroupUrlHandler> {

    public static void main(String[] args) throws Exception {
        new Server(new GroupUrlReduceFactory()).start();
    }

    @Override
    public GroupUrlHandler createReducer() {
        return new GroupUrlHandler();
    }

    public static class GroupUrlHandler extends Reducer {

        @Override
        public void addMessage(String[] strings, Datum datum, Metadata metadata) {
        }

        @Override
        public MessageList getOutput(String[] strings, Metadata md) {
        }
    }
}
The method getOutput(...) is not invoked from the Akka actors if no new messages arrive at the vertex. I think this is a bug.
Sometimes getOutput(...) is invoked after 20 minutes, sometimes after 2 hours. Sometimes it is not invoked for a whole day (when no new messages arrive at the vertex).
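The symptom is what one would see if window closing were driven only by the event time of arriving messages and not by wall-clock time. The toy model below illustrates that hypothesis. It is not Numaflow's implementation and does not confirm the cause. The class `EventDrivenWindowSketch` and all of its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class EventDrivenWindowSketch {
    static final long WINDOW_MS = 300_000L; // 300s, as in the vertex spec

    private final List<String> buffered = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();
    private long windowEnd;

    // A message arrives; in this toy model its event time is the only thing
    // that can advance progress and close the currently open window.
    void onMessage(String payload, long eventTimeMs) {
        if (!buffered.isEmpty() && eventTimeMs >= windowEnd) {
            flush();
        }
        if (buffered.isEmpty()) {
            // Open a new epoch-aligned window for this message.
            windowEnd = eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS) + WINDOW_MS;
        }
        buffered.add(payload);
    }

    // Wall-clock time passing is deliberately ignored in this model.
    void onWallClock(long nowMs) {
    }

    private void flush() {
        emitted.add(String.join(",", buffered));
        buffered.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EventDrivenWindowSketch vertex = new EventDrivenWindowSketch();
        vertex.onMessage("a", 0L);
        vertex.onMessage("b", 1_000L);
        vertex.onWallClock(7_200_000L);           // two hours pass, no new data
        System.out.println(vertex.emitted());     // still empty
        vertex.onMessage("c", 7_200_000L);        // a new message finally arrives
        System.out.println(vertex.emitted());     // the first window is emitted only now
    }
}
```

In this model the grouped output for "a" and "b" appears only when "c" arrives, which matches what I observe: without new input, nothing triggers getOutput(...).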