esper
esper copied to clipboard
In DataFlow threaded execution, variables passed to a UDF in an event filter evaluate to null when the Filter-Optimizable setting is on
Variables passed to UDFs in event filters evaluate to null if the event originates from a dataflow operator.
// Event type declared with an empty property list.
create schema Empty ();
// Dataflow: BeaconSource emits a stream named "events" typed as Empty,
// which EventBusSink consumes.
create dataflow InputOutput
BeaconSource -> events<Empty> {}
EventBusSink(events) {};
// String variable X, initialised from String.valueOf(1).
create variable String X = String.valueOf(1);
// The filter on Empty calls the AssertNotNull function with variable X as its argument.
select * from Empty(AssertNotNull(X));
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstance;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
/**
 * Stand-alone reproduction driver: reads {@code query.epl}, compiles and deploys it with
 * {@link #AssertNotNull(Object)} registered as a plug-in single-row function, then starts
 * every deployed dataflow and waits for all of them to finish.
 */
public class Main {

    /**
     * Plug-in single-row function used inside the EPL filter expression.
     *
     * @param v the value handed over by the filter expression
     * @return {@code true} whenever {@code v} is non-null
     * @throws Exception if {@code v} is {@code null}
     */
    public static boolean AssertNotNull(Object v) throws Exception {
        if (v != null) {
            return true;
        }
        throw new Exception("assertion failure: got null value");
    }

    /**
     * Entry point; see the class description for the sequence of steps.
     *
     * @param args unused
     * @throws Exception on any read, compile, deploy or dataflow failure
     */
    public static void main(String[] args) throws Exception {
        // Load the EPL module text from the working directory.
        var eplText = Files.readString(Path.of("query.epl"), StandardCharsets.US_ASCII);

        // One configuration object is shared by the compiler and the runtime.
        var config = createConfiguration();

        // Parse and compile the module.
        var eplCompiler = EPCompilerProvider.getCompiler();
        var compilerArgs = new CompilerArguments(config);
        var parsedModule = eplCompiler.parseModule(eplText);
        var compiledModule = eplCompiler.compile(parsedModule, compilerArgs);

        // Deploy into the runtime obtained for the empty-string URI.
        var runtime = EPRuntimeProvider.getRuntime("", config);
        runtime.getDeploymentService().deploy(compiledModule);

        // Instantiate and start every dataflow; all are started before any is joined.
        var dataFlowService = runtime.getDataFlowService();
        var started = new ArrayList<EPDataFlowInstance>();
        for (var descriptor : dataFlowService.getDataFlows()) {
            var running = dataFlowService.instantiate(descriptor.getDeploymentId(), descriptor.getName());
            started.add(running);
            running.start();
        }

        joinAll(started);
    }

    /** Builds the configuration with {@code AssertNotNull} registered as a single-row function. */
    private static Configuration createConfiguration() {
        var config = new Configuration();
        config.getCompiler().addPlugInSingleRowFunction("AssertNotNull", Main.class.getCanonicalName(), "AssertNotNull");
        return config;
    }

    /** Joins each started dataflow instance in turn, in the order they were started. */
    private static void joinAll(Iterable<EPDataFlowInstance> started) throws Exception {
        var pending = started.iterator();
        while (pending.hasNext()) {
            pending.next().join();
        }
    }
}
Use ConfigurationCompilerPlugInSingleRowFunction.FilterOptimizable.DISABLED
For reference:
// Module text: schema, dataflow, variable and the filtered select named 's0'.
String epl =
    "@name('schema') @public @buseventtype create schema Empty ();\n"
        + "create dataflow InputOutput\n"
        + " BeaconSource -> events{}\n"
        + " EventBusSink(events) {};\n"
        + "create variable String X = String.valueOf(1);\n"
        + "@name('s0') select * from Empty(localAssertNotNull(X));\n";
env.compileDeploy(epl).addListener("s0");

// Instantiate the dataflow from the deployment that holds the 'schema' statement and run it.
EPDataFlowInstance dataFlow =
    env.runtime().getDataFlowService().instantiate(env.deploymentId("schema"), "InputOutput");
dataFlow.start();

// Pause briefly before checking the listener.
try {
    Thread.sleep(100);
} catch (InterruptedException interrupted) {
    interrupted.printStackTrace();
}

env.assertListenerInvoked("s0");
dataFlow.cancel();
env.undeployAll();
However, Esper could be improved to detect the variable or to throw an exception.
If a constant is used instead of a variable, null is not passed.
@bernhardttom btw, I already have ConfigurationCompilerPlugInSingleRowFunction.ValueCache.DISABLED set.