esper icon indicating copy to clipboard operation
esper copied to clipboard

In data flow threaded execution, variables passed to a UDF in an event filter evaluate to null when the Filter-Optimizable setting is on

Open icholy opened this issue 4 years ago • 4 comments

Variables passed to UDFs in event filters evaluate to null if the event originates from a dataflow operator.

// Event type with no properties. It is the type the data flow emits and the filter below selects.
create schema Empty ();

// Data flow "InputOutput": BeaconSource produces Empty events on the stream "events",
// and EventBusSink hands those events to the runtime so EPL statements can receive them.
create dataflow InputOutput
  BeaconSource -> events<Empty> {}
  EventBusSink(events) {};

// Variable X is initialised to "1" via String.valueOf(1), so it is non-null when declared.
create variable String X = String.valueOf(1);

// Filter on Empty events that passes X to the AssertNotNull plug-in function.
// Per the issue, X arrives as null here for events that originate from the data flow.
select * from Empty(AssertNotNull(X));
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstance;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPRuntimeProvider;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

public class Main {

    /**
     * Plug-in single-row function used in the EPL filter {@code Empty(AssertNotNull(X))}.
     *
     * <p>The method name intentionally does not follow Java naming conventions: it is bound
     * reflectively by the literal method name "AssertNotNull" when the function is registered
     * in {@link #main(String[])}.
     *
     * @param v the value passed in from EPL
     * @return always {@code true} when {@code v} is non-null
     * @throws IllegalStateException if {@code v} is {@code null}
     */
    public static boolean AssertNotNull(Object v) {
        if (v == null) {
            throw new IllegalStateException("assertion failure: got null value");
        }
        return true;
    }

    /**
     * Reads {@code query.epl}, compiles and deploys it, then starts every deployed data flow
     * and waits for all of them to complete. The runtime is destroyed before returning.
     *
     * @param args ignored
     * @throws Exception if reading, compiling, deploying or running the data flows fails
     */
    public static void main(String[] args) throws Exception {

        // Read the EPL module. UTF-8 is a superset of US-ASCII, so ASCII files decode the same,
        // while non-ASCII characters (e.g. in comments or string literals) no longer fail.
        var epl = Files.readString(Path.of("query.epl"), StandardCharsets.UTF_8);

        // Register the UDF using the 3-argument overload, which keeps the default
        // filter-optimizable setting. That is the setting under which the issue observes null;
        // the issue discussion suggests FilterOptimizable.DISABLED as a workaround.
        var configuration = new Configuration();
        configuration.getCompiler().addPlugInSingleRowFunction(
                "AssertNotNull", Main.class.getCanonicalName(), "AssertNotNull");

        // Compile the module with the configuration that knows about the UDF.
        var compiler = EPCompilerProvider.getCompiler();
        var arguments = new CompilerArguments(configuration);
        var module = compiler.parseModule(epl);
        var compiled = compiler.compile(module, arguments);

        var epRuntime = EPRuntimeProvider.getRuntime("", configuration);
        try {
            // Deploy the compiled module.
            epRuntime.getDeploymentService().deploy(compiled);

            // Instantiate and start every data flow that the deployment declared.
            var epDataFlow = epRuntime.getDataFlowService();
            var instances = new ArrayList<EPDataFlowInstance>();
            for (var dataflow : epDataFlow.getDataFlows()) {
                var instance = epDataFlow.instantiate(dataflow.getDeploymentId(), dataflow.getName());
                instances.add(instance);
                instance.start();
            }

            // Block until every started data flow has completed.
            for (var instance : instances) {
                instance.join();
            }
        } finally {
            // Release the runtime even when deployment or a data flow fails.
            epRuntime.destroy();
        }
    }
}

icholy avatar Dec 03 '20 17:12 icholy

As a workaround, register the function with ConfigurationCompilerPlugInSingleRowFunction.FilterOptimizable.DISABLED.

For reference:

            
String epl = "@name('schema') @public @buseventtype create schema Empty ();\n" +
                    "create dataflow InputOutput\n" +
                    // The beacon must declare its output type so it emits Empty events
                    // that the Empty(...) filter below can match.
                    "  BeaconSource -> events<Empty> {}\n" +
                    "  EventBusSink(events) {};\n" +
                    "create variable String X = String.valueOf(1);\n" +
                    "@name('s0') select * from Empty(localAssertNotNull(X));\n";
            env.compileDeploy(epl).addListener("s0");

            EPDataFlowInstance instance = env.runtime().getDataFlowService().instantiate(env.deploymentId("schema"), "InputOutput");
            instance.start();

            try {
                // Give the started data flow a short time to emit at least one event.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
                env.assertListenerInvoked("s0");
            } finally {
                // Stop the data flow and clean up even if the assertion above fails.
                instance.cancel();
                env.undeployAll();
            }

bernhardttom avatar Jan 22 '21 17:01 bernhardttom

The compiler could, however, be improved to detect the variable or to raise an exception.

bernhardttom avatar Jan 22 '21 17:01 bernhardttom

If a constant is used instead of a variable, null is not passed.

icholy avatar Jan 25 '21 16:01 icholy

@bernhardttom btw, I already have ConfigurationCompilerPlugInSingleRowFunction.ValueCache.DISABLED set.

icholy avatar Jan 25 '21 17:01 icholy