hazelcast-jet icon indicating copy to clipboard operation
hazelcast-jet copied to clipboard

NullPointerException in WriteBufferedP.init() when using ProcessorWrapper

Open TomaszGaweda opened this issue 4 years ago • 3 comments

Hi,

when using ProcessorWrapper and eg. list sink, Following exception is thrown:

Caused by: java.lang.NullPointerException
	at com.hazelcast.jet.impl.connector.WriteBufferedP.init(WriteBufferedP.java:67)
	at com.hazelcast.jet.impl.processor.ProcessorWrapper.init(ProcessorWrapper.java:80)
	at com.hazelcast.jet.impl.processor.ProcessorWrapper.init(ProcessorWrapper.java:80)
	at com.hazelcast.jet.impl.execution.ProcessorTasklet.init(ProcessorTasklet.java:228)
	at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:289)
	... 5 more

Reproduction code:


class ReproTest {

    static JetInstance jet = Jet.newJetInstance();
    static IMap<String, Integer> inputMap;
    static IList<Integer> outputList;

    @BeforeAll
    static void setUp() {
        inputMap = jet.getMap("testInputMap");
        outputList = jet.getList("outputList");
    }
    
    @Test
    void reproduction() {
        // given
        for (int i = 0; i < 10; i++) {
            inputMap.put("" + i, i);
        }
        var pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map(inputMap))
                .map(e -> e.getValue()).setLocalParallelism(4)
                .writeTo(Sinks.list(outputList));
        DAG dag = pipeline.toDag();
        for (Vertex vertex : dag) {
            vertex.updateMetaSupplier(ps -> new WrappingProcessorMetaSupplier(ps,
                    proc -> new TestWrapper(proc)));
        }

        // when
        Job job = jet.newJob(dag);
        job.join();

        // then
        var list = List.copyOf(outputList);
        assertThat(list).hasSize(inputMap.size());
    }

    static class TestWrapper extends ProcessorWrapper {
        TestWrapper(Processor proc) {
            super(proc);
        }
        @Override
        public void process(int ordinal, Inbox inbox) {
                    getWrapped().process(ordinal, inbox);
        }
    }

}

TomaszGaweda avatar Apr 09 '20 10:04 TomaszGaweda

What's the Jet version?

viliam-durina avatar Apr 09 '20 12:04 viliam-durina

Jet 4.0, Java 13

TomaszGaweda avatar Apr 09 '20 12:04 TomaszGaweda

This seems something related to ManagedContext, but the test didn't reproduce the error for me on Java 8.

cangencer avatar Apr 14 '20 13:04 cangencer