ekuiper icon indicating copy to clipboard operation
ekuiper copied to clipboard

three streams join not work

Open nickscut opened this issue 4 years ago • 18 comments

select cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode as assetCode, window_end() * 1000000 as ts, ((collect(cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eValue)[-1])+(collect(cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eValue)[-1])+(collect(cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eValue)[-1])) as calValue from cz110201Double_MB1_cz110201Double_MB10016 left join cz110201Double_MB1_cz110201Double_MB10015 left join cz110201Double_MB1_cz110201Double_MB10014 on cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode = cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAssetCode and cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode = cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAssetCode where cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAttributeName = "cz110201Double_MB10016" and cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAttributeGroupName = "C" and cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAttributeCode = "cz110201Double_MB10016" and cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAttributeName = "cz110201Double_MB10015" and cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAttributeGroupName = "C" and cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAttributeCode = "cz110201Double_MB10015" and cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAttributeName = "cz110201Double_MB10014" and cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAttributeGroupName = "C" and cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAttributeCode = "cz110201Double_MB10014" group by cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode,TUMBLINGWINDOW(SS, 20);

the status of rule: { "source_cz110201Double_MB1_cz110201Double_MB10016_0_records_in_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10016_0_records_out_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10016_0_exceptions_total": 0, "source_cz110201Double_MB1_cz110201Double_MB10016_0_process_latency_us": 0, "source_cz110201Double_MB1_cz110201Double_MB10016_0_buffer_length": 0, "source_cz110201Double_MB1_cz110201Double_MB10016_0_last_invocation": "2021-12-02T05:54:22.624369", "source_cz110201Double_MB1_cz110201Double_MB10015_0_records_in_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10015_0_records_out_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10015_0_exceptions_total": 0, "source_cz110201Double_MB1_cz110201Double_MB10015_0_process_latency_us": 0, "source_cz110201Double_MB1_cz110201Double_MB10015_0_buffer_length": 0, "source_cz110201Double_MB1_cz110201Double_MB10015_0_last_invocation": "2021-12-02T05:54:22.623828", "source_cz110201Double_MB1_cz110201Double_MB10014_0_records_in_total": 280650, "source_cz110201Double_MB1_cz110201Double_MB10014_0_records_out_total": 280650, "source_cz110201Double_MB1_cz110201Double_MB10014_0_exceptions_total": 0, "source_cz110201Double_MB1_cz110201Double_MB10014_0_process_latency_us": 0, "source_cz110201Double_MB1_cz110201Double_MB10014_0_buffer_length": 0, "source_cz110201Double_MB1_cz110201Double_MB10014_0_last_invocation": "2021-12-02T05:54:22.641001", "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_records_in_total": 280649, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_records_out_total": 280649, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_exceptions_total": 0, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_process_latency_us": 1, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_buffer_length": 0, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_last_invocation": "2021-12-02T05:54:22.624374", "op_2_filter_0_records_in_total": 280649, "op_2_filter_0_records_out_total": 280649, "op_2_filter_0_exceptions_total": 0, "op_2_filter_0_process_latency_us": 9, "op_2_filter_0_buffer_length": 0, "op_2_filter_0_last_invocation": "2021-12-02T05:54:22.624378", "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_records_in_total": 280649, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_records_out_total": 280649, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_exceptions_total": 0, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_process_latency_us": 2, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_buffer_length": 0, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_last_invocation": "2021-12-02T05:54:22.623832", "op_4_filter_0_records_in_total": 280649, "op_4_filter_0_records_out_total": 280649, "op_4_filter_0_exceptions_total": 0, "op_4_filter_0_process_latency_us": 12, "op_4_filter_0_buffer_length": 0, "op_4_filter_0_last_invocation": "2021-12-02T05:54:22.623838", "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_records_in_total": 280650, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_records_out_total": 280650, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_exceptions_total": 0, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_process_latency_us": 3, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_buffer_length": 0, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_last_invocation": "2021-12-02T05:54:22.641013", "op_6_filter_0_records_in_total": 280650, "op_6_filter_0_records_out_total": 280650, "op_6_filter_0_exceptions_total": 0, "op_6_filter_0_process_latency_us": 9, "op_6_filter_0_buffer_length": 0, "op_6_filter_0_last_invocation": "2021-12-02T05:54:22.64102", "op_7_window_0_records_in_total": 841948, "op_7_window_0_records_out_total": 70, "op_7_window_0_exceptions_total": 0, "op_7_window_0_process_latency_us": 0, "op_7_window_0_buffer_length": 0, "op_7_window_0_last_invocation": "2021-12-02T05:54:22.641037", "op_8_join_0_records_in_total": 1, "op_8_join_0_records_out_total": 0, "op_8_join_0_exceptions_total": 0, "op_8_join_0_process_latency_us": 0, "op_8_join_0_buffer_length": 0, "op_8_join_0_last_invocation": "2021-12-02T05:31:19.004878", "op_9_aggregate_0_records_in_total": 0, "op_9_aggregate_0_records_out_total": 0, "op_9_aggregate_0_exceptions_total": 0, "op_9_aggregate_0_process_latency_us": 0, "op_9_aggregate_0_buffer_length": 0, "op_9_aggregate_0_last_invocation": 0, "op_10_project_0_records_in_total": 0, "op_10_project_0_records_out_total": 0, "op_10_project_0_exceptions_total": 0, "op_10_project_0_process_latency_us": 0, "op_10_project_0_buffer_length": 0, "op_10_project_0_last_invocation": 0, "sink_mqtt_0_0_records_in_total": 0, "sink_mqtt_0_0_records_out_total": 0, "sink_mqtt_0_0_exceptions_total": 0, "sink_mqtt_0_0_process_latency_us": 0, "sink_mqtt_0_0_buffer_length": 0, "sink_mqtt_0_0_last_invocation": 0, "sink_rest_0_0_records_in_total": 0, "sink_rest_0_0_records_out_total": 0, "sink_rest_0_0_exceptions_total": 0, "sink_rest_0_0_process_latency_us": 0, "sink_rest_0_0_buffer_length": 0, "sink_rest_0_0_last_invocation": 0 }

If two streams are joined, there is a result output

I don't know where is the problem?

nickscut avatar Dec 02 '21 06:12 nickscut

What version do you use? Any findings in the log?

ngjaying avatar Dec 02 '21 06:12 ngjaying

version:1.3.1

nickscut avatar Dec 02 '21 06:12 nickscut

from cmd of bin/kuiper getstatus rule cz110201Double_MB1_tuple_3_attr

Three streams of data have been increasing

nickscut avatar Dec 02 '21 06:12 nickscut

Please modify the join to have on as early as possible such as:

cz110201Double_MB1_cz110201Double_MB10016 left join cz110201Double_MB1_cz110201Double_MB10015 on cz110201Double_MB1_cz110201Double_MB10016.Data[0]->AssetCode = cz110201Double_MB1_cz110201Double_MB10015.Data[0]->AssetCode left join cz110201Double_MB1_cz110201Double_MB10014 ON cz110201Double_MB1_cz110201Double_MB10016.Data[0]->AssetCode = cz110201Double_MB1_cz110201Double_MB10014.Data[0]->AssetCode

ngjaying avatar Dec 02 '21 07:12 ngjaying

select cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode as assetCode, window_end() * 1000000 as ts, ((collect(cz110201Double_MB1_cz110201Int_MB10012.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10011.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10010.Data[0]->Value)[-1])) as calValue from cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10011.Data[0]->AssetCode left join cz110201Double_MB1_cz110201Int_MB10010 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10010.Data[0]->AssetCode group by cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode,TUMBLINGWINDOW(SS, 20);

Still no results returned

nickscut avatar Dec 02 '21 09:12 nickscut

image No exception can be seen through the log

nickscut avatar Dec 02 '21 09:12 nickscut

cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 There is a result returned cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10010 There is a result returned

but cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 left join cz110201Double_MB1_cz110201Int_MB10010

No results have been returned

nickscut avatar Dec 02 '21 09:12 nickscut

My SQL syntax is incorrect?

nickscut avatar Dec 02 '21 09:12 nickscut

The data is too much to return if your on condition does not filter much data or even no on condition, the result data will be n * n * n and will be overwhelming if n is big.

ngjaying avatar Dec 02 '21 09:12 ngjaying

select cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode as assetCode, window_end() * 1000000 as ts, ((collect(cz110201Double_MB1_cz110201Int_MB10012.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10011.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10010.Data[0]->Value)[-1])) as calValue from cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10011.Data[0]->AssetCode left join cz110201Double_MB1_cz110201Int_MB10010 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10010.Data[0]->AssetCode group by assetCode,TUMBLINGWINDOW(SS, 5);

Now The window size is reduced, and data is output. But calValue column return nothing

image

nickscut avatar Dec 03 '21 03:12 nickscut

Could you please provide some sample data of your stream? Thanks.

ngjaying avatar Dec 03 '21 06:12 ngjaying

cz110201Double_MB1_cz110201Int_MB10012: {"Data":[{"AssetId":"DK_110201_0126","AttributeId":"DK_110201_0126\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0126","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296015226739}]} {"Data":[{"AssetId":"DK_110201_0066","AttributeId":"DK_110201_0066\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0066","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0129","AttributeId":"DK_110201_0129\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0129","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0192","AttributeId":"DK_110201_0192\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0192","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0003","AttributeId":"DK_110201_0003\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0003","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0158","AttributeId":"DK_110201_0158\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0158","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296067651394}]} {"Data":[{"AssetId":"DK_110201_0032","AttributeId":"DK_110201_0032\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0032","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296067651394}]} {"Data":[{"AssetId":"DK_110201_0095","AttributeId":"DK_110201_0095\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0095","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296067651394}]} {"Data":[{"AssetId":"DK_110201_0195","AttributeId":"DK_110201_0195\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0195","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0069","AttributeId":"DK_110201_0069\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0069","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0132","AttributeId":"DK_110201_0132\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0132","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0006","AttributeId":"DK_110201_0006\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0006","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0150","AttributeId":"DK_110201_0150\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0150","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296100850890}]}

cz110201Double_MB1_cz110201Int_MB10011: {"Data":[{"AssetId":"DK_110201_0126","AttributeId":"DK_110201_0126\u0026cz110201Int_MB10011","AssetCode":"DK_110201_0126","AttributeCode":"cz110201Int_MB10011","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10011","Value":0,"Quality":0,"Timestamp":1638513354029289175}]} {"Data":[{"AssetId":"DK_110201_0066","AttributeId":"DK_110201_0066\u0026cz110201Int_MB10011","AssetCode":"DK_110201_0066","AttributeCode":"cz110201Int_MB10011","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10011","Value":1,"Quality":0,"Timestamp":1638513354053404068}]}

cz110201Double_MB1_cz110201Int_MB10010:

{"Data":[{"AssetId":"DK_110201_0126","AttributeId":"DK_110201_0126\u0026cz110201Int_MB10010","AssetCode":"DK_110201_0126","AttributeCode":"cz110201Int_MB10010","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10010","Value":0,"Quality":0,"Timestamp":1638513410043819554}]} {"Data":[{"AssetId":"DK_110201_0066","AttributeId":"DK_110201_0066\u0026cz110201Int_MB10010","AssetCode":"DK_110201_0066","AttributeCode":"cz110201Int_MB10010","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10010","Value":1,"Quality":0,"Timestamp":1638513410067138903}]}

nickscut avatar Dec 03 '21 06:12 nickscut

I am sure that the three streams have the same device data coming in in the window

nickscut avatar Dec 03 '21 06:12 nickscut

Take the latest value of the three streams in the TUMBLINGWINDOW

nickscut avatar Dec 03 '21 06:12 nickscut

Thanks, we are investigating

ngjaying avatar Dec 03 '21 09:12 ngjaying

Is there a conclusion? Is there a problem with my usage?

nickscut avatar Dec 06 '21 03:12 nickscut

@nickscut With the data you provided, I can get two calValue for assetCode 0066 and 0126. Because all 3 streams have these assetCode. For operator +, if any of the operand is nil, the result will be nil.

[map[assetCode:DK_110201_0006] map[assetCode:DK_110201_0150] map[assetCode:DK_110201_0066 calValue:%!s(float64
=3)] map[assetCode:DK_110201_0003] map[assetCode:DK_110201_0032] map[assetCode:DK_110201_0095] map[assetCode:DK_110201_0195] map[assetCode:DK_110201_0132] map[assetCode:DK_110201_01
26 calValue:%!s(float64=0)] map[assetCode:DK_110201_0129] map[assetCode:DK_110201_0192] map[assetCode:DK_110201_0158] map[assetCode:DK_110201_0069]]

ngjaying avatar Dec 06 '21 07:12 ngjaying

We may need a COALESCE function like pgsql to return the first non null value. So COALESCE(col, 0) will return 0 if column col is null.

ngjaying avatar Dec 10 '21 02:12 ngjaying

Close this and create another issue for the coalesce function #1462

ngjaying avatar Oct 27 '22 11:10 ngjaying