
pktvisor-reader should process multiple pcap files and output multiple metric buckets

Open weyrick opened this issue 3 years ago • 1 comment

Current pktvisor-reader works by taking a single input file and producing a single output bucket.

There is a use case to process multiple pcap files at once, for example, a series of smaller pcap files captured in series. In this case the user would like to produce JSON summaries as if the data had been live.

To enable this use case, pktvisor-reader needs:

  1. The ability to have multiple files specified on the command line OR the ability to ingest a raw series of bytes via a file descriptor
  2. The ability to produce output metrics (JSON format) per policy per bucket as the pcap processing progresses. This requires a strategy for either saving multiple buckets to a single output file or writing multiple output files to a directory.

weyrick avatar Apr 07 '22 19:04 weyrick

Here follows a refinement of step 2, including details on hooking into the metric buckets per period and allowing for saving to an sqlite database instead of JSON text files, which are hard to process.

The proper place to hook into the buckets during processing is the AbstractMetricsManager::on_period_shift() event. It is called on every time-window shift, immediately after the latest live bucket becomes the first non-live bucket.
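For orientation, here is a minimal sketch of that hook point. The signature shown is an assumption for illustration; the actual pktvisor declaration may differ:

    #include <ctime> // timespec

    class AbstractMetricsBucket; // provided by pktvisor core

    class AbstractMetricsManager
    {
    protected:
        // Invoked on every time-window shift, immediately after the latest live
        // bucket becomes the first non-live bucket; the completed bucket is the
        // natural thing to hand to a sink here.
        virtual void on_period_shift(timespec stamp, const AbstractMetricsBucket *maybe_expiring_bucket)
        {
            (void)stamp;
            (void)maybe_expiring_bucket; // default no-op; subclasses override
        }

    public:
        virtual ~AbstractMetricsManager() = default;
    };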

Since every handler in every policy has its own on_period_shift events (because each extends StreamMetricsHandler), we need a way to hook into all of them at the Policy level.

Therefore, we should be able to configure a sink for all buckets of all handlers in a Policy via the policy configuration. For example:

visor:
  taps:
    anycast:
      input_type: mock
      config:
        iface: eth0
  policies:
    # policy name and description
    default_view:
      kind: collection
      input:
        tap: anycast
        input_type: mock
      handlers:
        modules:
          default_net:
            type: net
          default_dns:
            type: dns
      sink:
        type: sqlite
        config:
          file: metric_output.db

I propose we add the high-level concept of a BucketSink to represent logic that can receive an arbitrary AbstractMetricsBucket object per time period and write it out. It should define an abstract interface such as sink_bucket(AbstractMetricsBucket *bucket), and a new concrete class such as SqliteBucketSink should derive from BucketSink and implement the save logic.
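A minimal C++ sketch of that shape (class and method names follow the proposal; the headers, members, and constructor are illustrative assumptions):

    #include <sqlite3.h>
    #include <stdexcept>
    #include <string>

    class AbstractMetricsBucket; // provided by pktvisor core

    // Abstract sink: receives one completed bucket per time period.
    class BucketSink
    {
    public:
        virtual ~BucketSink() = default;
        virtual void sink_bucket(AbstractMetricsBucket *bucket) = 0;
    };

    // Concrete sink that persists buckets to an sqlite database.
    class SqliteBucketSink : public BucketSink
    {
        sqlite3 *_db{nullptr};

    public:
        explicit SqliteBucketSink(const std::string &db_file)
        {
            if (sqlite3_open(db_file.c_str(), &_db) != SQLITE_OK) {
                throw std::runtime_error("unable to open " + db_file);
            }
        }
        ~SqliteBucketSink() override
        {
            sqlite3_close(_db);
        }
        void sink_bucket(AbstractMetricsBucket *bucket) override; // save logic, sketched below
    };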

When a bucket sink is set for a Policy, the policy will instantiate the appropriate BucketSink (in this case SqliteBucketSink) and call a new method StreamMetricsHandler::set_bucket_sink(sink) for every handler in the policy.
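Illustrative wiring only, assuming the classes above and a StreamMetricsHandler exposing the proposed setter; the helper and its parameters are hypothetical:

    #include <memory>
    #include <string>
    #include <vector>

    // Hypothetical wiring helper: hand one shared sink to every handler in a
    // policy, as the policy would do when its "sink" config section is present.
    void attach_sink_to_handlers(const std::string &db_file,
        std::vector<StreamMetricsHandler *> &handlers)
    {
        auto sink = std::make_shared<SqliteBucketSink>(db_file);
        for (auto *handler : handlers) {
            handler->set_bucket_sink(sink); // proposed new method; handlers forward buckets on period shift
        }
    }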

The BucketSink class will then receive an AbstractMetricsBucket* per time period, so it has access to these methods:

  • to_prometheus
  • to_json
  • period_length
  • start_tstamp
  • end_tstamp

The implementation of sink_bucket can therefore use these methods to extract the bucket information and write it out to the data store.
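Under those assumptions, sink_bucket might look like the following. Note that to_prometheus is assumed here to render one sample per line into an ostream, and end_tstamp to return a timespec; adjust to the real signatures:

    #include <sstream>
    #include <string>

    void SqliteBucketSink::sink_bucket(AbstractMetricsBucket *bucket)
    {
        std::stringstream out;
        bucket->to_prometheus(out); // assumed to write one sample per line

        // One row per Prometheus sample, all stamped with the bucket's end time.
        auto tstamp = bucket->end_tstamp().tv_sec;

        std::string line;
        while (std::getline(out, line)) {
            if (line.empty() || line.front() == '#') {
                continue; // skip HELP/TYPE comment lines
            }
            insert_row(tstamp, line); // hypothetical helper: split + INSERT, sketched below
        }
    }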

For the implementation of SqliteBucketSink I recommend we create an sqlite database with the following columns:

tstamp, metric, labels, value

The sink will call to_prometheus and split each metric line into 3 parts. For example, packets_top_ipv4{ipv4="142.251.128.78",module="default-net",policy="default"} 157 splits into:

  1. packets_top_ipv4
  2. {ipv4="142.251.128.78",module="default-net",policy="default"}
  3. 157

Then:

  1. tstamp will be filled with bucket->end_tstamp()
  2. metric will be part 1 above
  3. labels will be part 2 above (converted to JSON format by adding quotes on label names and changing = to :)
  4. value will be part 3 above (a sketch of this split and conversion follows below)
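A sketch of the split and label-to-JSON conversion, using naive regex parsing that assumes well-formed single-line samples with a label set (the struct and function names are hypothetical):

    #include <optional>
    #include <regex>
    #include <string>

    struct ParsedSample {
        std::string metric; // part 1: metric name
        std::string labels; // part 2: label set, rewritten as JSON
        std::string value;  // part 3: sample value
    };

    std::optional<ParsedSample> parse_prom_line(const std::string &line)
    {
        // e.g. packets_top_ipv4{ipv4="142.251.128.78",...} 157
        static const std::regex sample_re{R"(^(\w+)\{(.*)\}\s+(\S+)\s*$)"};
        std::smatch m;
        if (!std::regex_match(line, m, sample_re)) {
            return std::nullopt; // not a labeled sample
        }
        // key="val" -> "key":"val" : quote the label names and turn = into :
        // (naive: assumes label values never contain a bare `name=` sequence)
        std::string json = "{" + std::regex_replace(m[2].str(), std::regex{R"((\w+)=)"}, R"("$1":)") + "}";
        return ParsedSample{m[1].str(), json, m[3].str()};
    }

The hypothetical insert_row helper from the earlier sketch would then bind these three parts plus the timestamp to a statement like INSERT INTO metrics (tstamp, metric, labels, value) VALUES (?,?,?,?).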

The final result will be an sqlite DB containing Prometheus-like format data that can be used for immediate analysis with sqlite functionality, including matching labels with the sqlite JSON functions.

Example sqlite session:

sqlite> .schema metrics
CREATE TABLE metrics (tstamp integer,metric text,labels text,value integer);
sqlite> select * from metrics;
1649785581|packets_top_ipv4|{"ipv4":"142.251.128.78","module":"default-net","policy":"default"}|157
1649785681|packets_top_ipv4|{"ipv4":"142.251.128.78","module":"default-net","policy":"default"}|200
1649785981|packets_top_ipv4|{"ipv4":"142.251.128.78","module":"default-net","policy":"default"}|300
sqlite> select json_extract(metrics.labels, '$.policy') from metrics where json_extract(metrics.labels, '$.ipv4') == "142.251.128.78";
default
default
default
sqlite> select avg(value) from metrics where json_extract(metrics.labels, '$.policy') == 'default';
219.0

weyrick avatar Apr 12 '22 17:04 weyrick