
Allow Esper EPL pattern operators inside dataflows

Open juanboubeta opened this issue 5 years ago • 2 comments

Dear all,

For the last 10 years, I have been working with the Esper CEP engine. In recent versions of the engine, I find the dataflow support very powerful. However, according to the Esper documentation (http://esper.espertech.com/release-8.3.0/reference-esper/html/dataflow.html):

Only filter-based streams are allowed in the from clause and patterns or named windows are not supported. Also not allowed are the insert into clause, the irstream keyword and subselects.

I need to use pattern operators such as every, every-distinct, repeat, until, followed-by, etc. to define these types of conditions inside a dataflow. Could you please recommend how I could use these operators with dataflows?

As an example, I'd like to include the following pattern in the dataflow:

@Name('NO2_Avg')
@Priority(3)
insert into NO2_Avg
select a1.stationId.toString() || current_timestamp().toString() as id,
  current_timestamp() as timestamp, a1.stationId as stationId,
  avg(a1.no2) as value
from pattern [every (a1 = AirMeasurement -> a2 = AirMeasurement(a2.no2 > a1.no2))].win:time(3600 milliseconds)
group by a1.stationId

String epl = "create dataflow PollutantDataFlow\n" +
"create schema AirQualityType(timestamp long, stationId integer, no2 double),\n" +
"FileSource -> airqualitystream<AirQualityType>{\n" +
" file: 'sensor_events.csv', \n" +
" propertyNames: ['timestamp','stationId','no2'] \n" +
"}\n" +
"Select(airqualitystream) -> no2_avg{\n" +
" select: (select a1.stationId.toString() || current_timestamp().toString() as id,\r\n" +
"   current_timestamp() as timestamp, a1.stationId as stationId, \r\n" +
"   avg(a1.no2) as value \r\n" +
" from pattern [every (a1 = AirMeasurement -> a2 = AirMeasurement(a2.no2 > a1.no2))].win:time(3600000 milliseconds) group by a1.stationId) }\r\n" +
" \r\n" +
" LogSink(no2_avg) {\r\n" +
" layout : '%t [%e]',\r\n" +
"    log : false,\r\n" +
"    linefeed : true,\r\n" +
"    title : 'Input:'}\r\n";

Thank you very much for your time and help.

Best regards,

Juan

juanboubeta avatar Feb 21 '20 23:02 juanboubeta

Match-recognize would seem to solve this as well

bernhardttom avatar Feb 24 '20 14:02 bernhardttom

I would like to suggest this temporary workaround:

According to the Esper documentation for EventBusSink:

The EventBusSink operator send events received from a data flow into the event bus.

So, would it be possible to create a dataflow that collects events using any kind of input operator (AMQP, File, etc.), publishes them to the event bus, and then deploy the required patterns against this new input stream?

Instead of using AirMeasurement in the from pattern clause, we would use, for example, AirMeasurementStream.
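
A minimal sketch of what I mean, under some assumptions: the schema is declared outside the dataflow so that a separate statement can see it, the dataflow only reads the file and forwards events with `EventBusSink`, and the pattern lives in an ordinary EPL statement. I reuse the `AirQualityType` schema from the original post as the event type in the pattern (any other name, such as AirMeasurementStream, would work the same way). The class and method names are hypothetical, and the code only builds the EPL texts. Compiling and deploying them with the Esper compiler and runtime is left out, and in Esper 8 the schema may also need visibility annotations, which I have not checked.

```java
// Sketch only: builds the EPL texts for the EventBusSink workaround.
// Class and method names are hypothetical; deployment is not shown.
class EventBusWorkaround {

    // Schema declared at module level (assumption) so that both the
    // dataflow and the pattern statement can refer to the same type.
    static String schemaEpl() {
        return "create schema AirQualityType(timestamp long, stationId integer, no2 double)";
    }

    // The dataflow reads the CSV file and forwards every event to the event bus.
    static String dataflowEpl() {
        return "create dataflow PollutantDataFlow\n" +
               "FileSource -> airqualitystream<AirQualityType> {\n" +
               "  file: 'sensor_events.csv',\n" +
               "  propertyNames: ['timestamp','stationId','no2']\n" +
               "}\n" +
               "EventBusSink(airqualitystream) {}\n";
    }

    // Ordinary EPL statement outside the dataflow, where patterns are allowed.
    static String patternEpl() {
        return "@Name('NO2_Avg')\n" +
               "select a1.stationId as stationId, avg(a1.no2) as value\n" +
               "from pattern [every (a1 = AirQualityType -> " +
               "a2 = AirQualityType(a2.no2 > a1.no2))]" +
               ".win:time(3600000 milliseconds)\n" +
               "group by a1.stationId";
    }

    public static void main(String[] args) {
        System.out.println(schemaEpl());
        System.out.println(dataflowEpl());
        System.out.println(patternEpl());
    }
}
```

The point of the split is that the dataflow no longer needs a Select operator at all, so the restriction on the from clause quoted above does not apply to the pattern statement.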

I think that this solution results in duplicated information, right? We would be inserting the same event twice: first when it is collected by the dataflow input operator, and then when it is published to the event bus by the dataflow sink operator.

What are your thoughts about this @bernhardttom?

DavidCorral94 avatar Mar 02 '20 09:03 DavidCorral94