flink-siddhi
flink-siddhi copied to clipboard
支持动态更新CEP规则
请问flink-siddhi支持动态更新CEP规则吗?一般的套路是什么?有没有示例?谢谢!
// Control stream that registers two execution plans, enables both, and removes
// the first one again after a ten-second pause.
DataStream<ControlEvent> controlStream = env.addSource(new SourceFunction<ControlEvent>() {

    /**
     * Emits the fixed sequence of control events and then returns.
     *
     * @param ctx context used to emit each control event
     * @throws Exception if emitting fails or the sleep is interrupted
     */
    @Override
    public void run(SourceContext<ControlEvent> ctx) throws Exception {
        // Plan 1: forwards everything from "input" into "out".
        final String firstPlanId = MetadataControlEvent.Builder.nextExecutionPlanId();
        ctx.collect(
                MetadataControlEvent.builder()
                        .addExecutionPlan(firstPlanId, "from input insert into out; ")
                        .build());

        // Plan 2: emits a constant id for every event from "input" into "output".
        final String secondPlanId = MetadataControlEvent.Builder.nextExecutionPlanId();
        final ControlEvent addSecondPlan = MetadataControlEvent.builder()
                .addExecutionPlan(secondPlanId, "from input select '10006' as id insert into output; ")
                .build();
        ctx.collect(addSecondPlan);

        // Both plans are enabled only after both have been registered.
        ctx.collect(OperationControlEvent.enableQuery(firstPlanId));
        ctx.collect(OperationControlEvent.enableQuery(secondPlanId));

        Thread.sleep(10_000);
        System.out.println("删除第一个sql");

        // Only the plan removal is sent; no disableQuery(...) is issued for it first.
        final ControlEvent removeFirstPlan = MetadataControlEvent.builder()
                .removeExecutionPlan(firstPlanId)
                .build();
        ctx.collect(removeFirstPlan);
    }

    /** Only logs a marker line; run() has no loop that would need to be stopped. */
    @Override
    public void cancel() {
        System.out.println("cancel ------------------------------------------------------------------");
    }
});
有比较完整的示例吗?谢谢!
/**
 * Minimal flink-siddhi example that changes CEP rules while the job is running.
 *
 * <p>An integer source feeds the Siddhi stream {@code flow_http_all} (single field
 * {@code count}). A second source emits control events that add and enable two
 * execution plans, then disable and remove plan "2" after ten seconds. Rows written
 * to the Siddhi stream {@code output} are printed.
 */
public class Test1 {

    /**
     * Builds and runs the job.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Data stream: emits 0, 1, 2, ... with a five-second pause after each element.
        DataStream<Integer> stream = env.addSource(new SourceFunction<Integer>() {
            // Cleared by cancel(); volatile because cancel() may be called from a
            // different thread than the one executing run().
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                for (int i = 0; running && i < 11111111; i++) {
                    sourceContext.collect(i);
                    Thread.sleep(5000);
                }
            }

            @Override
            public void cancel() {
                // The SourceFunction contract requires cancel() to make run() leave its loop.
                running = false;
            }
        });

        // Control stream: a fixed sequence of rule changes, after which the source finishes.
        DataStream<ControlEvent> controlStream = env.addSource(new SourceFunction<ControlEvent>() {
            @Override
            public void run(SourceContext<ControlEvent> sourceContext) throws InterruptedException {
                // Rule 1: tag every event with id 1.
                sourceContext.collect(MetadataControlEvent.builder()
                        .addExecutionPlan("1", "from flow_http_all select 1 as id,count insert into output;")
                        .build());
                sourceContext.collect(OperationControlEvent.enableQuery("1"));

                // Rule 2: tag events whose count is below 2000 with id 2.
                sourceContext.collect(MetadataControlEvent.builder()
                        .addExecutionPlan("2", "from flow_http_all[count<2000] select 2 as id,count insert into output;")
                        .build());
                sourceContext.collect(OperationControlEvent.enableQuery("2"));

                Thread.sleep(10000);
                System.out.println("移走规则2");

                // Disable rule 2 first, then drop its execution plan.
                sourceContext.collect(OperationControlEvent.disableQuery("2"));
                sourceContext.collect(MetadataControlEvent.builder().removeExecutionPlan("2").build());
            }

            @Override
            public void cancel() {
                // Nothing to stop: run() has no loop and returns on its own.
            }
        });

        // Event stream processing: bind the data stream to Siddhi and drive the queries
        // from the control stream.
        DataStream<Row> result = SiddhiCEP.define("flow_http_all", stream, "count")
                .cql(controlStream).returnAsRow("output");

        // Print every row emitted to "output".
        result.print();
        env.execute();
    }
}
结果: