
Numaflow reducer doesn't output the grouped message right after the fixed time window expires

Open: borkox opened this issue 1 year ago • 15 comments

I have a Numaflow v0.10.1 pipeline with a reduce vertex built on numaflow-java 0.5.5. I am using keyed messages (keys of size one) and 2 partitions. After several messages have entered the group vertex, I expect the vertex to emit one message with the grouped data as soon as the fixed time window expires. Instead, my getOutput(...) method is called only after many minutes, or only when new messages arrive. If no new messages arrive, the data held in the vertex's memory is never output.

    - imagePullSecrets:
        - name: gitlab-registry
      name: group-urls
      partitions: 2
      scale:
        min: 1
      udf:
        container:
          args:
            - '-cp'
            - numaflow-app.jar
            - mypackage.GroupUrlReduceFactory
          command:
            - /usr/bin/java
          image: >-
            myimage
          imagePullPolicy: Always
          resources: {}
        groupBy:
          keyed: true
          storage:
            emptyDir: {}
          window:
            fixed:
              length: 300s
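The vertex above uses a fixed (tumbling) window of 300s. The following is a minimal sketch of how an event time maps to such a window. It assumes windows are aligned to the Unix epoch, and FixedWindowSketch is a hypothetical helper, not part of Numaflow or numaflow-java:

```java
import java.time.Instant;

// Hypothetical helper (not a Numaflow API): maps an event time to the
// fixed window that contains it, assuming epoch-aligned windows.
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    /** Inclusive start of the fixed window containing eventTimeMillis. */
    static long windowStart(long eventTimeMillis, long lengthMillis) {
        // floorDiv keeps the result correct for negative timestamps as well.
        return Math.floorDiv(eventTimeMillis, lengthMillis) * lengthMillis;
    }

    /** Exclusive end of the fixed window containing eventTimeMillis. */
    static long windowEnd(long eventTimeMillis, long lengthMillis) {
        return windowStart(eventTimeMillis, lengthMillis) + lengthMillis;
    }

    public static void main(String[] args) {
        long length = 300_000L; // corresponds to "length: 300s" in the vertex spec
        long event = Instant.parse("2023-11-07T11:03:17Z").toEpochMilli();
        // prints 2023-11-07T11:00:00Z .. 2023-11-07T11:05:00Z
        System.out.println(Instant.ofEpochMilli(windowStart(event, length))
                + " .. " + Instant.ofEpochMilli(windowEnd(event, length)));
    }
}
```

So every message with an event time between 11:00:00 and 11:04:59.999 for a given key should end up in the same group, and that group is expected to be emitted once the window ends at 11:05:00.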

My group handler looks like this (I deleted the imports and the actual logic to keep it anonymous):

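    package mypackage;

    // Assumed imports: numaflow-java 0.5.x reducer package and Lombok annotations.
    import io.numaproj.numaflow.reducer.Datum;
    import io.numaproj.numaflow.reducer.MessageList;
    import io.numaproj.numaflow.reducer.Metadata;
    import io.numaproj.numaflow.reducer.Reducer;
    import io.numaproj.numaflow.reducer.ReducerFactory;
    import io.numaproj.numaflow.reducer.Server;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @NoArgsConstructor
    public class GroupUrlReduceFactory extends ReducerFactory<GroupUrlReduceFactory.GroupUrlHandler> {

        // Entry point: starts the reduce server with this factory.
        public static void main(String[] args) throws Exception {
            new Server(new GroupUrlReduceFactory()).start();
        }

        // Factory method the SDK uses to obtain reducer instances.
        @Override
        public GroupUrlHandler createReducer() {
            return new GroupUrlHandler();
        }

        public static class GroupUrlHandler extends Reducer {

            // Invoked for each incoming datum of the key.
            @Override
            public void addMessage(String[] keys, Datum datum, Metadata metadata) {
                // Grouping logic removed for anonymity.
            }

            // Expected to be invoked when the window closes (this is the call that is delayed).
            @Override
            public MessageList getOutput(String[] keys, Metadata md) {
                // Real output removed for anonymity; an empty list keeps this compilable.
                return MessageList.newBuilder().build();
            }
        }
    }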
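Since the real logic was removed, here is a library-free model of the lifecycle the handler relies on, as I understand it: the factory creates a fresh reducer per key and window, addMessage is called for each datum, and getOutput is called once when that window is closed. ReduceLifecycleSketch and UrlGroup are hypothetical names, not numaflow-java types, and the explicit close(...) call stands in for whatever the platform uses to close a window:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical model of the reduce lifecycle; none of these are numaflow-java classes.
final class ReduceLifecycleSketch {

    /** Stand-in for one reducer instance, i.e. one (key, window) group. */
    static final class UrlGroup {
        private final Set<String> urls = new LinkedHashSet<>(); // dedupes, keeps arrival order

        void addMessage(String url) { urls.add(url); }

        /** Produces the single grouped output for this (key, window). */
        String getOutput() { return String.join(" ", urls); }
    }

    private final Supplier<UrlGroup> factory; // plays the role of createReducer()
    private final Map<String, UrlGroup> active = new HashMap<>();

    ReduceLifecycleSketch(Supplier<UrlGroup> factory) { this.factory = factory; }

    /** Routes a message to the instance for its (key, windowStart), creating it on first use. */
    void accept(String key, long windowStart, String url) {
        active.computeIfAbsent(key + "@" + windowStart, id -> factory.get()).addMessage(url);
    }

    /** Closing a window calls getOutput exactly once and discards the instance. */
    String close(String key, long windowStart) {
        UrlGroup group = active.remove(key + "@" + windowStart);
        return group == null ? null : group.getOutput();
    }

    int openWindows() { return active.size(); }
}
```

In this model nothing is emitted until close(...) runs, which is why the question in this issue is really about what triggers the close.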

The getOutput(...) method is not invoked by the Akka actors unless new messages arrive at the vertex. I think this is a bug.

Sometimes getOutput(...) is invoked after 20 minutes, sometimes after 2 hours, and sometimes not for a whole day (when no new messages arrive at the vertex).
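For reference, Numaflow's documentation describes reduce windows as being closed by the watermark rather than by wall-clock time. If that is the mechanism at work here, it would produce exactly this pattern. The sketch below is a deliberately simplified, hypothetical model (WatermarkCloseSketch is not a Numaflow type, and a real watermark is not simply the maximum event time seen): a window closes only once the watermark reaches its end, and the watermark moves only when a message arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model: windows close on watermark progress, and the
// watermark advances only when data arrives (here: to the max event time seen).
final class WatermarkCloseSketch {
    private final long lengthMillis;
    private final List<Long> openWindowStarts = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    WatermarkCloseSketch(long lengthMillis) { this.lengthMillis = lengthMillis; }

    /** A new message opens its window if needed and may advance the watermark. */
    List<Long> onMessage(long eventTimeMillis) {
        long start = Math.floorDiv(eventTimeMillis, lengthMillis) * lengthMillis;
        if (!openWindowStarts.contains(start)) {
            openWindowStarts.add(start);
        }
        watermark = Math.max(watermark, eventTimeMillis);
        return closeReady();
    }

    /** Wall-clock time is ignored on purpose: with no new data, the watermark is unchanged. */
    List<Long> onWallClockTick(long nowMillis) {
        return closeReady();
    }

    /** Closes (and returns the starts of) every window whose end is at or before the watermark. */
    private List<Long> closeReady() {
        List<Long> closed = new ArrayList<>();
        Iterator<Long> it = openWindowStarts.iterator();
        while (it.hasNext()) {
            long start = it.next();
            if (start + lengthMillis <= watermark) {
                closed.add(start);
                it.remove();
            }
        }
        return closed;
    }
}
```

Under this model a window that received data stays open indefinitely if nothing else arrives, and closes only when a later message pushes the watermark past its end. That would line up with getOutput(...) firing after an unpredictable delay that depends on traffic rather than on the 300s window length.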

borkox, Nov 07 '23 11:11