three streams join not work
select cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode as assetCode, window_end() * 1000000 as ts, ((collect(cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eValue)[-1])+(collect(cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eValue)[-1])+(collect(cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eValue)[-1])) as calValue from cz110201Double_MB1_cz110201Double_MB10016 left join cz110201Double_MB1_cz110201Double_MB10015 left join cz110201Double_MB1_cz110201Double_MB10014 on cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode = cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAssetCode and cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode = cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAssetCode where cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAttributeName = "cz110201Double_MB10016" and cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAttributeGroupName = "C" and cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAttributeCode = "cz110201Double_MB10016" and cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAttributeName = "cz110201Double_MB10015" and cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAttributeGroupName = "C" and cz110201Double_MB1_cz110201Double_MB10015.Data[0]-\u003eAttributeCode = "cz110201Double_MB10015" and cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAttributeName = "cz110201Double_MB10014" and cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAttributeGroupName = "C" and cz110201Double_MB1_cz110201Double_MB10014.Data[0]-\u003eAttributeCode = "cz110201Double_MB10014" group by cz110201Double_MB1_cz110201Double_MB10016.Data[0]-\u003eAssetCode,TUMBLINGWINDOW(SS, 20);
the status of rule: { "source_cz110201Double_MB1_cz110201Double_MB10016_0_records_in_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10016_0_records_out_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10016_0_exceptions_total": 0, "source_cz110201Double_MB1_cz110201Double_MB10016_0_process_latency_us": 0, "source_cz110201Double_MB1_cz110201Double_MB10016_0_buffer_length": 0, "source_cz110201Double_MB1_cz110201Double_MB10016_0_last_invocation": "2021-12-02T05:54:22.624369", "source_cz110201Double_MB1_cz110201Double_MB10015_0_records_in_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10015_0_records_out_total": 280649, "source_cz110201Double_MB1_cz110201Double_MB10015_0_exceptions_total": 0, "source_cz110201Double_MB1_cz110201Double_MB10015_0_process_latency_us": 0, "source_cz110201Double_MB1_cz110201Double_MB10015_0_buffer_length": 0, "source_cz110201Double_MB1_cz110201Double_MB10015_0_last_invocation": "2021-12-02T05:54:22.623828", "source_cz110201Double_MB1_cz110201Double_MB10014_0_records_in_total": 280650, "source_cz110201Double_MB1_cz110201Double_MB10014_0_records_out_total": 280650, "source_cz110201Double_MB1_cz110201Double_MB10014_0_exceptions_total": 0, "source_cz110201Double_MB1_cz110201Double_MB10014_0_process_latency_us": 0, "source_cz110201Double_MB1_cz110201Double_MB10014_0_buffer_length": 0, "source_cz110201Double_MB1_cz110201Double_MB10014_0_last_invocation": "2021-12-02T05:54:22.641001", "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_records_in_total": 280649, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_records_out_total": 280649, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_exceptions_total": 0, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_process_latency_us": 1, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_buffer_length": 0, "op_1_preprocessor_cz110201Double_MB1_cz110201Double_MB10016_0_last_invocation": "2021-12-02T05:54:22.624374", "op_2_filter_0_records_in_total": 280649, "op_2_filter_0_records_out_total": 280649, "op_2_filter_0_exceptions_total": 0, "op_2_filter_0_process_latency_us": 9, "op_2_filter_0_buffer_length": 0, "op_2_filter_0_last_invocation": "2021-12-02T05:54:22.624378", "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_records_in_total": 280649, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_records_out_total": 280649, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_exceptions_total": 0, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_process_latency_us": 2, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_buffer_length": 0, "op_3_preprocessor_cz110201Double_MB1_cz110201Double_MB10015_0_last_invocation": "2021-12-02T05:54:22.623832", "op_4_filter_0_records_in_total": 280649, "op_4_filter_0_records_out_total": 280649, "op_4_filter_0_exceptions_total": 0, "op_4_filter_0_process_latency_us": 12, "op_4_filter_0_buffer_length": 0, "op_4_filter_0_last_invocation": "2021-12-02T05:54:22.623838", "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_records_in_total": 280650, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_records_out_total": 280650, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_exceptions_total": 0, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_process_latency_us": 3, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_buffer_length": 0, "op_5_preprocessor_cz110201Double_MB1_cz110201Double_MB10014_0_last_invocation": "2021-12-02T05:54:22.641013", "op_6_filter_0_records_in_total": 280650, "op_6_filter_0_records_out_total": 280650, "op_6_filter_0_exceptions_total": 0, "op_6_filter_0_process_latency_us": 9, "op_6_filter_0_buffer_length": 0, "op_6_filter_0_last_invocation": "2021-12-02T05:54:22.64102", "op_7_window_0_records_in_total": 841948, "op_7_window_0_records_out_total": 70, "op_7_window_0_exceptions_total": 0, "op_7_window_0_process_latency_us": 0, "op_7_window_0_buffer_length": 0, "op_7_window_0_last_invocation": "2021-12-02T05:54:22.641037", "op_8_join_0_records_in_total": 1, "op_8_join_0_records_out_total": 0, "op_8_join_0_exceptions_total": 0, "op_8_join_0_process_latency_us": 0, "op_8_join_0_buffer_length": 0, "op_8_join_0_last_invocation": "2021-12-02T05:31:19.004878", "op_9_aggregate_0_records_in_total": 0, "op_9_aggregate_0_records_out_total": 0, "op_9_aggregate_0_exceptions_total": 0, "op_9_aggregate_0_process_latency_us": 0, "op_9_aggregate_0_buffer_length": 0, "op_9_aggregate_0_last_invocation": 0, "op_10_project_0_records_in_total": 0, "op_10_project_0_records_out_total": 0, "op_10_project_0_exceptions_total": 0, "op_10_project_0_process_latency_us": 0, "op_10_project_0_buffer_length": 0, "op_10_project_0_last_invocation": 0, "sink_mqtt_0_0_records_in_total": 0, "sink_mqtt_0_0_records_out_total": 0, "sink_mqtt_0_0_exceptions_total": 0, "sink_mqtt_0_0_process_latency_us": 0, "sink_mqtt_0_0_buffer_length": 0, "sink_mqtt_0_0_last_invocation": 0, "sink_rest_0_0_records_in_total": 0, "sink_rest_0_0_records_out_total": 0, "sink_rest_0_0_exceptions_total": 0, "sink_rest_0_0_process_latency_us": 0, "sink_rest_0_0_buffer_length": 0, "sink_rest_0_0_last_invocation": 0 }
If two streams are joined, there is a result output
I don't know where is the problem?
What version do you use? Any findings in the log?
version:1.3.1
from cmd of bin/kuiper getstatus rule cz110201Double_MB1_tuple_3_attr
Three streams of data have been increasing
Please modify the join to have on as early as possible such as:
cz110201Double_MB1_cz110201Double_MB10016 left join cz110201Double_MB1_cz110201Double_MB10015 on cz110201Double_MB1_cz110201Double_MB10016.Data[0]->AssetCode = cz110201Double_MB1_cz110201Double_MB10015.Data[0]->AssetCode left join cz110201Double_MB1_cz110201Double_MB10014 ON cz110201Double_MB1_cz110201Double_MB10016.Data[0]->AssetCode = cz110201Double_MB1_cz110201Double_MB10014.Data[0]->AssetCode
select cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode as assetCode, window_end() * 1000000 as ts, ((collect(cz110201Double_MB1_cz110201Int_MB10012.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10011.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10010.Data[0]->Value)[-1])) as calValue from cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10011.Data[0]->AssetCode left join cz110201Double_MB1_cz110201Int_MB10010 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10010.Data[0]->AssetCode group by cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode,TUMBLINGWINDOW(SS, 20);
Still no results returned
No exception can be seen through the log
cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 There is a result returned cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10010 There is a result returned
but cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 left join cz110201Double_MB1_cz110201Int_MB10010
No results have been returned
My SQL syntax is incorrect?
The data is too much to return if your on condition does not filter much data or even no on condition, the result data will be n * n * n and will be overwhelming if n is big.
select cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode as assetCode, window_end() * 1000000 as ts, ((collect(cz110201Double_MB1_cz110201Int_MB10012.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10011.Data[0]->Value)[-1])+(collect(cz110201Double_MB1_cz110201Int_MB10010.Data[0]->Value)[-1])) as calValue from cz110201Double_MB1_cz110201Int_MB10012 left join cz110201Double_MB1_cz110201Int_MB10011 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10011.Data[0]->AssetCode left join cz110201Double_MB1_cz110201Int_MB10010 on cz110201Double_MB1_cz110201Int_MB10012.Data[0]->AssetCode = cz110201Double_MB1_cz110201Int_MB10010.Data[0]->AssetCode group by assetCode,TUMBLINGWINDOW(SS, 5);
Now The window size is reduced, and data is output. But calValue column return nothing

Could you please provide some sample data of your stream? Thanks.
cz110201Double_MB1_cz110201Int_MB10012: {"Data":[{"AssetId":"DK_110201_0126","AttributeId":"DK_110201_0126\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0126","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296015226739}]} {"Data":[{"AssetId":"DK_110201_0066","AttributeId":"DK_110201_0066\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0066","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0129","AttributeId":"DK_110201_0129\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0129","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0192","AttributeId":"DK_110201_0192\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0192","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0003","AttributeId":"DK_110201_0003\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0003","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296038547873}]} {"Data":[{"AssetId":"DK_110201_0158","AttributeId":"DK_110201_0158\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0158","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296067651394}]} {"Data":[{"AssetId":"DK_110201_0032","AttributeId":"DK_110201_0032\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0032","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296067651394}]} {"Data":[{"AssetId":"DK_110201_0095","AttributeId":"DK_110201_0095\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0095","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":0,"Quality":0,"Timestamp":1638513296067651394}]} {"Data":[{"AssetId":"DK_110201_0195","AttributeId":"DK_110201_0195\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0195","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0069","AttributeId":"DK_110201_0069\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0069","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0132","AttributeId":"DK_110201_0132\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0132","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0006","AttributeId":"DK_110201_0006\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0006","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296093107464}]} {"Data":[{"AssetId":"DK_110201_0150","AttributeId":"DK_110201_0150\u0026cz110201Int_MB10012","AssetCode":"DK_110201_0150","AttributeCode":"cz110201Int_MB10012","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10012","Value":1,"Quality":0,"Timestamp":1638513296100850890}]}
cz110201Double_MB1_cz110201Int_MB10011: {"Data":[{"AssetId":"DK_110201_0126","AttributeId":"DK_110201_0126\u0026cz110201Int_MB10011","AssetCode":"DK_110201_0126","AttributeCode":"cz110201Int_MB10011","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10011","Value":0,"Quality":0,"Timestamp":1638513354029289175}]} {"Data":[{"AssetId":"DK_110201_0066","AttributeId":"DK_110201_0066\u0026cz110201Int_MB10011","AssetCode":"DK_110201_0066","AttributeCode":"cz110201Int_MB10011","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10011","Value":1,"Quality":0,"Timestamp":1638513354053404068}]}
cz110201Double_MB1_cz110201Int_MB10010:
{"Data":[{"AssetId":"DK_110201_0126","AttributeId":"DK_110201_0126\u0026cz110201Int_MB10010","AssetCode":"DK_110201_0126","AttributeCode":"cz110201Int_MB10010","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10010","Value":0,"Quality":0,"Timestamp":1638513410043819554}]} {"Data":[{"AssetId":"DK_110201_0066","AttributeId":"DK_110201_0066\u0026cz110201Int_MB10010","AssetCode":"DK_110201_0066","AttributeCode":"cz110201Int_MB10010","AttributeGroupName":"B","AttributeName":"cz110201Int_MB10010","Value":1,"Quality":0,"Timestamp":1638513410067138903}]}
I am sure that the three streams have the same device data coming in in the window
Take the latest value of the three streams in the TUMBLINGWINDOW
Thanks, we are investigating
Is there a conclusion? Is there a problem with my usage?
@nickscut With the data you provided, I can get two calValue for assetCode 0066 and 0126. Because all 3 streams have these assetCode. For operator +, if any of the operand is nil, the result will be nil.
[map[assetCode:DK_110201_0006] map[assetCode:DK_110201_0150] map[assetCode:DK_110201_0066 calValue:%!s(float64
=3)] map[assetCode:DK_110201_0003] map[assetCode:DK_110201_0032] map[assetCode:DK_110201_0095] map[assetCode:DK_110201_0195] map[assetCode:DK_110201_0132] map[assetCode:DK_110201_01
26 calValue:%!s(float64=0)] map[assetCode:DK_110201_0129] map[assetCode:DK_110201_0192] map[assetCode:DK_110201_0158] map[assetCode:DK_110201_0069]]
We may need a COALESCE function like pgsql to return the first non null value. So COALESCE(col, 0) will return 0 if column col is null.
Close this and create another issue for the coalesce function #1462