Join two nodes when one has no data

Open ASKozienko opened this issue 6 years ago • 8 comments

For example, I have two metrics (temp1, temp2), but only one has incoming data. Maybe the temp2 sensor is turned off; it does not matter. For my condition "t1.value" > 0 OR "t2.value" > 0, temp2 is not relevant. But I don't see any point with the prefix P1 in the log after the join, so the join does not happen. So the question is: how do I make a full outer join in my case?

PS: if temp2 has incoming data, it works fine.

var temp1 = batch
    |query('SELECT mean(value) AS value FROM "my_db"."autogen".temp1')
        .period(10s)
        .every(5s)
        .groupBy(time(2s), *)
        .fill(0)
    |log()
        .prefix('P0-1')
        .level('DEBUG')

var temp2 = batch
    |query('SELECT mean(value) AS value FROM "my_db"."autogen".temp2')
        .period(10s)
        .every(5s)
        .groupBy(time(2s), *)
        .fill(0)
    |log()
        .prefix('P0-2')
        .level('DEBUG')

temp1
    |join(temp2)
        .tolerance(5s)
        .fill('null')
        .as('t1', 't2')
    |default()
        .field('t1.value', -1)
        .field('t2.value', -1)
    |log()
        .prefix('P1')
        .level('DEBUG')
    |where(lambda: "t1.value" > 0 OR "t2.value" > 0)
    |alert()

ASKozienko avatar Mar 21 '18 08:03 ASKozienko

I'm seeing the same behavior that fill('null') or fill(value) does not work on the JoinNode. I'm using kapacitor 1.4.1.

conet avatar May 26 '18 03:05 conet

Looking at the join code I have a hunch that I'm hitting this condition.

For example, say there are measurements A and B, where A has series A,x; A,y; and A,z, and B has series B,x and B,y. The outer join will only join A,B,x and A,B,y because of that condition. If B did have a series B,z with only some points missing, then the outer join would work as defined.

Could this be fixed or at least the documentation be updated to avoid confusion?

conet avatar May 26 '18 04:05 conet

Hey guys, it's not clear whether this issue has been sorted. I have a scenario where a batch query returns no data. How can I manage this? Neither join nor union works.

sdicataldo avatar May 08 '19 12:05 sdicataldo

Kapacitor: 1.5.3 InfluxDB: 1.7.8

Hi guys, I have a TICKscript like this:

var req_all = stream
    |from()
        .measurement('request_log')
    |window()
        .period(1m)
        .every(1m)
    |count('step')
        .as('cnt')

var req_err = stream
    |from()
        .measurement('request_log')
        .where(lambda: "status" =~ /^5\d\d$/)
    |window()
        .period(1m)
        .every(1m)
    |count('step')
        .as('cnt')

req_err
    |join(req_all)
        .as('err', 'all')
        .fill(0)
    |eval(lambda: "all.cnt", lambda: (float("all.cnt") - float("err.cnt")) / float("all.cnt") * 100.0)
        .as('qpm', 'succ_rate')
    |influxDBOut()
        .database('data')
        .measurement('request_stats')

It doesn't work properly. The symptom is the same as the one described above: I cannot see the result data in InfluxDB while the task is running. However, the result data is written to InfluxDB when I disable the task.

from4 [avg_exec_time_ns="18.189µs" errors="0" working_cardinality="0" ]; from4 -> window5 [processed="0"];

window5 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; window5 -> count6 [processed="0"];

count6 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; count6 -> join8 [processed="0"];

from1 [avg_exec_time_ns="22.796µs" errors="0" working_cardinality="0" ]; from1 -> window2 [processed="713133"];

window2 [avg_exec_time_ns="28.023µs" errors="0" working_cardinality="1" ]; window2 -> count3 [processed="2825"];

count3 [avg_exec_time_ns="6.653µs" errors="0" working_cardinality="1" ]; count3 -> join8 [processed="2825"];

join8 [avg_exec_time_ns="140.31µs" errors="0" working_cardinality="1" ]; join8 -> eval9 [processed="0"];

eval9 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; eval9 -> influxdb_out10 [processed="0"];

influxdb_out10 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];

oraclecaicai avatar Sep 16 '19 03:09 oraclecaicai

The workaround for me was to union the two streams first and then join the complete stream with the unioned stream. Pretty ugly, but it does the job if you have one full and one partial stream and want to outer join them.
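
One possible reading of this workaround, as an untested sketch. The comment gives no script, so the measurement names (borrowed from the first post), the stream sources, the 'any' alias, and the tolerance value are all assumptions. A later reply in this thread reports that a batch variant of this shape still behaved like an inner join, so treat it as a starting point only.

```
// Assumption: temp1 always reports, temp2 may be silent.
var temp1 = stream
    |from()
        .measurement('temp1')

var temp2 = stream
    |from()
        .measurement('temp2')

// The union carries the points of both streams, so it keeps
// receiving data for as long as temp1 is reporting.
var combined = temp1
    |union(temp2)

// Join the complete stream with the unioned stream, so that neither
// side of the join is starved of points.
temp1
    |join(combined)
        .as('t1', 'any')
        .tolerance(5s)
    |log()
        .prefix('P1')
        .level('DEBUG')
```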

dima100 avatar Sep 19 '19 20:09 dima100

> The workaround for me was to union the two streams first and then join the complete stream with the unioned stream. Pretty ugly, but it does the job if you have one full and one partial stream and want to outer join them.

Thank you, man. I'm now using a method similar to yours. It seems that each stream in a join must have data coming in for every window, or else the join will not output any data.

In my scenario, I use the tail plugin of Fluent Bit to fetch data from our application logs. To make sure at least one error entry occurs in every window, I use another plugin named exec to echo a dummy entry every minute, and I add a type field to distinguish the dummy entries from the genuine ones.

Then in Kapacitor, I use WhereNode to split the original stream into three: all entries including the dummy ones (for count only), error entries including the dummy ones (for count only), and all entries excluding the dummy ones (for count and other aggregations). Each stream has at least one point in every minute. After some other nodes such as GroupByNode, the multiple streams generated from these three are joined.

Then I use an EvalNode, and in one of its lambda expressions I subtract the ERROR count from the ALL count. Because both counts include the dummy entries, the subtraction cancels them out, and the result is then divided by the count of the third stream. You may have guessed that I want to work out the success rate.

PS: I don't use the where property method of the FromNode, because it comes before the WindowNode. There seems to be a deviation in the window range when streams that each have their own WindowNode are joined, even if the parameters of their every and period methods are exactly the same and the align methods are also used.

oraclecaicai avatar Oct 04 '19 10:10 oraclecaicai

> The workaround for me was to union the two streams first and then join the complete stream with the unioned stream. Pretty ugly, but it does the job if you have one full and one partial stream and want to outer join them.

Could you share a sample on how you got this working?

I tried this approach, but not seeing the results expected for outer join -- it's still giving the results of an inner join.

var s1 = batch | query(..).period(1h).every(10s).groupBy('f1', 'f2')
var s2 = batch | query(..).period(1h).every(10s).groupBy('f1', 'f2')

var u1 = s1 | union(s2)

s1 | join(u1) | log()

Another problem I observed was with the results of the union itself. Sometimes they get split across different runs of the batch:

15:10:12 -> s1r1, s1r2
15:10:22 -> s2r1, s1r1, s1r2, s2r1

where s1r1 & s1r2 are results of s1 and s2r1 is that of s2

prmkbr avatar May 14 '20 16:05 prmkbr

This issue has been open for quite some time and many people (including me) ran into this problem. I think at least the documentation for JoinNode should include a big warning box that the join will not happen when one of the streams has no data points incoming.

rluetzner avatar Oct 22 '21 06:10 rluetzner