pktvisor-reader should process multiple pcap files and output multiple metric buckets
Current pktvisor-reader works by taking a single input file and producing a single output bucket.
There is a use case for processing multiple pcap files at once, for example a sequence of smaller pcap files captured back to back. In this case the user would like to produce JSON summaries as if the data had been captured live.
To enable this use case, pktvisor-reader needs:
- The ability to have multiple files specified on the command line OR the ability to ingest a raw series of bytes via file descriptor
- The ability to produce output metrics (JSON format) per policy per bucket as the pcap processing progresses. This requires a strategy for saving multiple buckets to a single output file, or multiple output files to a directory.
Here follows a refinement of the second point above, including details on hooking into the metric buckets per period and allowing for saving to an sqlite database instead of JSON text files, which are hard to process.
The proper place to hook into the buckets during processing is during the AbstractMetricsManager::on_period_shift() event. This is called for every time window shift, immediately after the latest live bucket shifts to the first non-live bucket.
Since every handler in every policy (because it extends StreamMetricsHandler) will have separate on_period_shift events, we need a way to hook into all of these at the Policy level.
Therefore, we should be able to configure a sink for all buckets from all handlers in a Policy via the policy configuration. For example:
```yaml
visor:
  taps:
    anycast:
      input_type: mock
      config:
        iface: eth0
  policies:
    # policy name and description
    default_view:
      kind: collection
      input:
        tap: anycast
        input_type: mock
      handlers:
        modules:
          default_net:
            type: net
          default_dns:
            type: dns
      sink:
        type: sqlite
        config:
          file: metric_output.db
```
I propose we add the high level concept of a BucketSink to represent logic that can receive an arbitrary AbstractMetricsBucket object per time period and write it out. It should define an abstract interface such as sink_bucket(AbstractMetricsBucket *bucket), and a new concrete class such as SqliteBucketSink, deriving from BucketSink, should implement the save logic.
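A minimal sketch of what this interface could look like. The AbstractMetricsBucket stand-in, the buckets_seen counter, and the constructor signature are assumptions for illustration; the real bucket class lives in the visor core library.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for pktvisor's AbstractMetricsBucket; the real class
// exposes to_prometheus(), to_json(), period_length(), start_tstamp(), end_tstamp().
struct AbstractMetricsBucket {
    virtual ~AbstractMetricsBucket() = default;
};

// Proposed abstract interface: anything that can receive a finished
// bucket per time period and persist it.
class BucketSink {
public:
    virtual ~BucketSink() = default;
    virtual void sink_bucket(AbstractMetricsBucket *bucket) = 0;
};

// Concrete sink targeting sqlite. Persistence is elided in this sketch;
// a real implementation would open the configured .db file and INSERT rows.
class SqliteBucketSink : public BucketSink {
public:
    explicit SqliteBucketSink(std::string db_file) : _db_file(std::move(db_file)) {}
    void sink_bucket(AbstractMetricsBucket *bucket) override {
        // here: call bucket->to_prometheus(...), parse each line,
        // and INSERT into the metrics table
        ++_buckets_seen;
    }
    int buckets_seen() const { return _buckets_seen; }

private:
    std::string _db_file;
    int _buckets_seen = 0;
};
```

Keeping the interface to a single sink_bucket method means other back ends (for example a JSON-lines file sink) can be added later without touching the handlers.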
When a bucket sink is set for a Policy, the policy will instantiate the appropriate BucketSink (in this case SqliteBucketSink) and call a new method StreamMetricsHandler::set_bucket_sink(sink) on every handler in the policy.
The BucketSink class will then receive an AbstractMetricsBucket* per time period, so it has access to these methods:
- to_prometheus
- to_json
- period_length
- start_tstamp
- end_tstamp
The implementation of sink_bucket can therefore use these methods to extract the bucket information and write it out to the data store.
For the implementation of SqliteBucketSink I recommend we create an sqlite database with the following columns:
timestamp, metric, labels, value
The sink will call to_prometheus and split each metric line into 3 parts. For example:

```
packets_top_ipv4{ipv4="142.251.128.78",module="default-net",policy="default"} 157
```

splits into:

1. `packets_top_ipv4`
2. `{ipv4="142.251.128.78",module="default-net",policy="default"}`
3. `157`
Then:
- timestamp will be filled with bucket->end_tstamp()
- metric will be part 1 above
- labels will be part 2 above (converted to JSON format, adding quotes on label names, changing = to :)
- value will be part 3 above
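The splitting and label conversion above could be sketched as follows. The function name split_prom_line is an assumption, and the parser handles only the simple single-line exposition format shown above (no escaped quotes or floating point values):

```cpp
#include <string>
#include <tuple>

// Hypothetical helper: split one Prometheus exposition line such as
//   packets_top_ipv4{ipv4="142.251.128.78",module="default-net",policy="default"} 157
// into (metric, labels-as-JSON, value). Assumes integer values and no
// escaping inside label values.
std::tuple<std::string, std::string, long> split_prom_line(const std::string &line) {
    auto lbrace = line.find('{');
    auto rbrace = line.rfind('}');
    std::string metric = line.substr(0, lbrace);
    std::string labels = line.substr(lbrace + 1, rbrace - lbrace - 1);
    long value = std::stol(line.substr(rbrace + 1));

    // convert key="v" pairs to "key":"v" so sqlite's json functions work
    std::string json = "{";
    std::string::size_type pos = 0;
    while (pos < labels.size()) {
        auto eq = labels.find('=', pos);
        auto endq = labels.find('"', labels.find('"', eq) + 1); // closing quote
        json += "\"" + labels.substr(pos, eq - pos) + "\":" + labels.substr(eq + 1, endq - eq);
        pos = endq + 1;
        if (pos < labels.size() && labels[pos] == ',') {
            json += ",";
            ++pos;
        }
    }
    json += "}";
    return {metric, json, value};
}
```

The returned triple maps directly onto the metric, labels, and value columns, with timestamp coming from the bucket itself.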
The final result will be an sqlite DB that contains Prometheus-like format data and can be used for immediate analysis with sqlite functionality, including matching labels with sqlite's JSON functions.
Example sqlite session:
```
sqlite> .schema metrics
CREATE TABLE metrics (tstamp integer,metric text,labels text,value integer);
sqlite> select * from metrics;
1649785581|packets_top_ipv4|{"ipv4":"142.251.128.78","module":"default-net","policy":"default"}|157
1649785681|packets_top_ipv4|{"ipv4":"142.251.128.78","module":"default-net","policy":"default"}|200
1649785981|packets_top_ipv4|{"ipv4":"142.251.128.78","module":"default-net","policy":"default"}|300
sqlite> select json_extract(metrics.labels, '$.policy') from metrics where json_extract(metrics.labels, '$.ipv4') == "142.251.128.78";
default
default
default
sqlite> select avg(value) from metrics where json_extract(metrics.labels, '$.policy') == 'default';
219.0
```