
Handle CB events for event duplication scenarios

mfelsche opened this issue 4 years ago • 1 comment

Describe the problem you are trying to solve

We have multiple cases where events are duplicated:

  • one operator/port connected via a select statement to two different operators/scripts/ports; the events get copied when they leave the originating operator/port.
  • a group by each(...) clause, which duplicates the given event based on an array of group values.
  • an operator/script emits more than one event for an incoming event (e.g. when we have a send primitive, see #531).

In all of these cases we are going to receive multiple CB events for a single event id. These events might contradict each other: one might be an ack, another a fail. It is currently unclear what to do in this situation.

Describe the solution you'd like

We need to aggregate the CB events for a duplicated event at the point where it got duplicated. Locations are:

  • within nodes, when they emit > 1 event for a single input event
  • at the boundary between nodes, when an upstream node is connected to > 1 downstream nodes
  • connection from a pipeline to multiple offramps/pipelines

In those cases we need to make sure that multiple CB events result in exactly one CB event that flows further upstream. The logic is: if any of the CB events was a fail, the upstream CB event will be a fail; otherwise it is an ack. (It might be worth making this logic configurable, maybe not.)
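The "any fail wins" rule can be expressed as a simple fold over the collected CB events. A minimal sketch, assuming a hypothetical `CbAction` type (the name is illustrative, not tremor's actual API):

```rust
// Hypothetical aggregation rule for CB events of a duplicated event:
// if any downstream CB event was a fail, the aggregate is a fail,
// otherwise it is an ack.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CbAction {
    Ack,
    Fail,
}

fn aggregate(events: &[CbAction]) -> CbAction {
    if events.iter().any(|e| *e == CbAction::Fail) {
        CbAction::Fail
    } else {
        CbAction::Ack
    }
}
```

In practice the events arrive one at a time via contraflow, so the implementation would fold incrementally rather than over a slice, but the decision logic is the same.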

To account for this, we register every event id that has more than one downstream event together with the count of expected CB events. When an ack for this event id comes in via on_contraflow, we check the registry and decrease the counter. When the counter reaches 0, we issue an ack upstream (we could also forward the last ack that brought the counter to 0). When a fail arrives, we remove the event id from the registry and forward the fail immediately. We might want to add a tombstone entry to mark that any further CB events for this id should be ignored.
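The registry described above could look roughly like the following sketch. All names (`Cb`, `Entry`, `CbRegistry`) are assumptions for illustration, not tremor's actual types, and event ids are simplified to plain `u64`:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Cb {
    Ack,
    Fail,
}

enum Entry {
    /// Number of acks still expected for this event id.
    Pending(u64),
    /// A fail was already forwarded upstream; ignore further CB events.
    Tombstone,
}

#[derive(Default)]
struct CbRegistry {
    entries: HashMap<u64, Entry>, // event id -> aggregation state
}

impl CbRegistry {
    /// Register an event id that was duplicated into `count` downstream events.
    fn register(&mut self, event_id: u64, count: u64) {
        self.entries.insert(event_id, Entry::Pending(count));
    }

    /// Handle an incoming CB event. Returns `Some(cb)` when exactly one
    /// aggregated CB event should be forwarded upstream, `None` otherwise.
    fn on_contraflow(&mut self, event_id: u64, cb: Cb) -> Option<Cb> {
        match self.entries.remove(&event_id) {
            Some(Entry::Pending(remaining)) => match cb {
                Cb::Ack if remaining > 1 => {
                    // still waiting for more acks: put the decremented count back
                    self.entries.insert(event_id, Entry::Pending(remaining - 1));
                    None
                }
                // last expected ack: entry stays removed, forward one ack
                Cb::Ack => Some(Cb::Ack),
                Cb::Fail => {
                    // first fail wins: tombstone so later CB events are ignored
                    self.entries.insert(event_id, Entry::Tombstone);
                    Some(Cb::Fail)
                }
            },
            Some(Entry::Tombstone) => {
                // a fail was already forwarded; swallow this CB event. When to
                // evict tombstones (count- or time-based) is an open question.
                self.entries.insert(event_id, Entry::Tombstone);
                None
            }
            None => None,
        }
    }
}
```

Note that this sketch never evicts tombstones; as the next paragraph points out, entries must be removed quickly, so a real implementation would also need to track how many CB events a tombstone still has to absorb (or expire it after a timeout).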

We need to take great care to remove entries from this registry as quickly as possible, so we neither bloat memory nor slow down general processing.

We should also measure the performance impact of this change, and we might need to revisit the problem and look for another solution.

mfelsche avatar Jan 13 '21 17:01 mfelsche

When we have Troy, we can control this via the connect statement:

connect "connector/my_tcp/instance" to "/pipeline/foo/instance", "/pipeline/foo/instance" with gd=at_least_one;

where possible values for gd are always, require_all and just_one (maybe others make sense here too). Those names need to be discussed and improved.
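One way to think about these modes is as a predicate over how many branches acked. A hypothetical sketch (mode names mirror the ones floated above and are not final; the semantics of `always` were left open, so it is omitted here):

```rust
// Hypothetical delivery-guarantee modes for a connection with multiple
// downstream branches; names and semantics are up for discussion.
#[derive(Debug, Clone, Copy)]
enum GdMode {
    RequireAll, // every branch must ack
    AtLeastOne, // a single ack suffices
    JustOne,    // exactly one branch must ack
    None,       // branch does not participate in guaranteed delivery
}

/// Decide whether the event counts as delivered, given how many of the
/// `branches` downstream branches acked it.
fn delivered(mode: GdMode, acks: usize, branches: usize) -> bool {
    match mode {
        GdMode::RequireAll => acks == branches,
        GdMode::AtLeastOne => acks >= 1,
        GdMode::JustOne => acks == 1,
        GdMode::None => true,
    }
}
```

Framing the modes as predicates also makes it easy to see which ones can be decided early (a single fail settles require_all immediately) and which must wait for all branches (just_one cannot be decided before every branch has reported).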

The same could be applied to in-pipeline select statements:

create stream snot;
create stream badger;

# with this config the event will be acked upstream if the snot branch succeeds,
# independent of the badger branch
select event from in into snot with gd=require_all;
select event from in into badger with gd=none;

# this would mean that either snot or badger needs to succeed for the event to succeed
select event from in into snot with gd=just_one;
select event from in into badger with gd=just_one;

We might need a timeout, especially when all branches are configured to succeed before a gd contraflow event is sent. Otherwise we might wait forever if an event is swallowed.
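The timeout could be handled by storing a deadline next to each pending entry and periodically failing everything past it. A minimal sketch, with all names assumed for illustration:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Sketch of timeout handling for pending CB aggregation entries: entries
// still waiting for branch acks past their deadline are evicted and should
// be failed upstream, so a swallowed event cannot block contraflow forever.
struct Pending {
    #[allow(dead_code)]
    remaining: u64, // acks still expected (unused in this sketch)
    deadline: Instant,
}

struct TimeoutRegistry {
    pending: HashMap<u64, Pending>, // event id -> pending state
    timeout: Duration,
}

impl TimeoutRegistry {
    fn new(timeout: Duration) -> Self {
        Self { pending: HashMap::new(), timeout }
    }

    fn register(&mut self, event_id: u64, remaining: u64) {
        let deadline = Instant::now() + self.timeout;
        self.pending.insert(event_id, Pending { remaining, deadline });
    }

    /// Called periodically (e.g. from a tick); returns the ids of timed-out
    /// events, which should then be failed upstream as if a fail CB event
    /// had arrived for each of them.
    fn expire(&mut self, now: Instant) -> Vec<u64> {
        let expired: Vec<u64> = self
            .pending
            .iter()
            .filter(|(_, p)| p.deadline <= now)
            .map(|(id, _)| *id)
            .collect();
        for id in &expired {
            self.pending.remove(id);
        }
        expired
    }
}
```

Driving `expire` from an existing periodic signal rather than a per-event timer keeps the bookkeeping cheap, which matters given the memory and throughput concerns raised above.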

This requires #1162, so that events are not swallowed when they reach a dead end via drop or an unconnected port.

mfelsche avatar Sep 30 '21 09:09 mfelsche