hazelcast-jet
NullPointerException in WriteBufferedP.init() when using ProcessorWrapper
Hi,
When using a ProcessorWrapper together with, e.g., a list sink, the following exception is thrown:
Caused by: java.lang.NullPointerException
    at com.hazelcast.jet.impl.connector.WriteBufferedP.init(WriteBufferedP.java:67)
    at com.hazelcast.jet.impl.processor.ProcessorWrapper.init(ProcessorWrapper.java:80)
    at com.hazelcast.jet.impl.processor.ProcessorWrapper.init(ProcessorWrapper.java:80)
    at com.hazelcast.jet.impl.execution.ProcessorTasklet.init(ProcessorTasklet.java:228)
    at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:289)
    ... 5 more
Reproduction code:
class ReproTest {
    static JetInstance jet = Jet.newJetInstance();
    static IMap<String, Integer> inputMap;
    static IList<Integer> outputList;

    @BeforeAll
    static void setUp() {
        inputMap = jet.getMap("testInputMap");
        outputList = jet.getList("outputList");
    }

    @Test
    void reproduction() {
        // given
        for (int i = 0; i < 10; i++) {
            inputMap.put("" + i, i);
        }
        var pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map(inputMap))
                .map(e -> e.getValue()).setLocalParallelism(4)
                .writeTo(Sinks.list(outputList));
        DAG dag = pipeline.toDag();
        // wrap the processors of every vertex in TestWrapper
        for (Vertex vertex : dag) {
            vertex.updateMetaSupplier(ps -> new WrappingProcessorMetaSupplier(ps,
                    proc -> new TestWrapper(proc)));
        }

        // when
        Job job = jet.newJob(dag);
        job.join();

        // then
        var list = List.copyOf(outputList);
        assertThat(list).hasSize(inputMap.size());
    }

    static class TestWrapper extends ProcessorWrapper {
        TestWrapper(Processor proc) {
            super(proc);
        }

        @Override
        public void process(int ordinal, Inbox inbox) {
            getWrapped().process(ordinal, inbox);
        }
    }
}
What's the Jet version?
Jet 4.0, Java 13
This seems to be related to ManagedContext, but the test didn't reproduce the error for me on Java 8.
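To make the ManagedContext suspicion concrete, here is a minimal, self-contained sketch of the general failure mode I have in mind. It is not Jet code and not a confirmed diagnosis: `Proc`, `InjectingContext`, `Sink`, and `Wrapper` are hypothetical stand-ins, and the assumption is that some context prepares only the outermost object, so a wrapped processor never gets its state set up before `init()` runs. It also does not explain why Java 8 and Java 13 would behave differently.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperInitSketch {

    /** Hypothetical stand-in for a processor; NOT the Jet Processor interface. */
    interface Proc {
        void init();
    }

    /** Hypothetical context that prepares an object before init(); NOT Hazelcast's ManagedContext. */
    static class InjectingContext {
        Object initialize(Object o) {
            // Assumption for this sketch: only the object handed in is inspected,
            // anything it wraps is not visited.
            if (o instanceof Sink) {
                ((Sink) o).buffer = new ArrayList<>();
            }
            return o;
        }
    }

    /** Hypothetical sink whose buffer must be injected before init(). */
    static class Sink implements Proc {
        List<Integer> buffer; // stays null unless the context injects it

        @Override
        public void init() {
            buffer.clear(); // throws NullPointerException if buffer was never injected
        }
    }

    /** Hypothetical wrapper that only delegates to the wrapped processor. */
    static class Wrapper implements Proc {
        final Proc wrapped;

        Wrapper(Proc wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void init() {
            wrapped.init();
        }
    }

    /** Runs the context on the outermost object, then init(); returns true if init() threw an NPE. */
    static boolean initFails(Proc outer, InjectingContext ctx) {
        ctx.initialize(outer);
        try {
            outer.init();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        InjectingContext ctx = new InjectingContext();
        System.out.println(initFails(new Sink(), ctx));              // false
        System.out.println(initFails(new Wrapper(new Sink()), ctx)); // true
    }
}
```

If something along these lines is happening, the stack trace would look like the one reported: the wrapper's `init()` delegates, and the NPE surfaces inside the wrapped processor's `init()`.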