PF_RING icon indicating copy to clipboard operation
PF_RING copied to clipboard

[FT] explicitly expiring flows based on custom app logic and custom flow state

Open tadams42 opened this issue 2 years ago • 0 comments

Hi!

I would like to slice some (but not all) flows passing through my app. To do this, I would like to propose adding:

pfring_ft_expire_flow(pfring_ft_table *ft, pfring_ft_flow *flow);
  • new function would've been called from "packet arrived callback" that was set up on instance of FT by calling pfring_ft_set_flow_packet_callback
  • app would implement necessary logic to decide if and when to call pfring_ft_expire_flow based on data in pfring_ft_flow * and data in pfring_ft_flow_value.user
  • new function would cause FT to expire flow, triggering callback that was prepared via pfring_ft_set_flow_export_callback
  • further packets arriving into this (now expired) flow, would instead cause FT to start a new one
  • "global" FT parameters like flow_life_timeout and flow_idle_timeuout would still work as usual and be completely under control of FT instance

In my data pipeline, I'm now handling large flows by either filtering them out or slicing them after they were captured - in separate application. Slicing them directly on source of capture would be prefered.

As an example scenario, let's assume I'd like slicing of YouTube flows on every 1000 packets (just for illustration purposes, since scenarios I require are more complex than this and don't necessarily rely on L7 data). Implementation of such hypothetical scenario would look like like this:

  1. create FT, attach custom struct to each flow and setup callbacks:

    typedef struct {
        int my_slice_count;
        // ...
    } UserData;
    
    // ...
    pfring_ft_table *ft = pfring_ft_create_table(ft_flags, 0, 0, 0, sizeof(UserData));
    pfring_ft_set_flow_packet_callback(ft, packet_added_cb, ft);
    // ...
    
  2. in packet_added_cb, check nDPI for YouTube and count packets in UserData.my_slice_count

    void packet_added_cb(
        const uint8_t *data, pfring_ft_packet_metadata *metadata, pfring_ft_flow *flow, void *user
    ) {
        pfring_ft_table *ft = (pfring_ft_table *)(user);
        pfring_ft_flow_value *v = pfring_ft_flow_get_value(flow);
        UserData *user_data = (UserData *)(v->user);
    
        struct ndpi_detection_module_struct *ndpi = pfring_ft_get_ndpi_handle(ft);
        if (is_flow_detected_as_youtube(ndpi, flow)) {
            user_data->my_slice_count += 1;
        }
    }
    
  3. Later, also in packet_added_cb, app checks custom state (counter) and notifies FT to expire given flow:

    void packet_added_cb(
        const uint8_t *data, pfring_ft_packet_metadata *metadata, pfring_ft_flow *flow, void *user
    ) {
        // ...       
        if (is_flow_detected_as_youtube(ndpi)) {
            user_data->my_slice_count += 1;
    
            if (user_data->my_slice_count >= 1000) {
                pfring_ft_expire_flow(ft, flow);
            }
        }
    }
    

Thanks in advance if you consider implementing this!

tadams42 avatar Apr 12 '23 11:04 tadams42