
Network Flow Handling

Open sempervictus opened this issue 4 years ago • 13 comments

Use-cases

Network devices such as switches and firewalls export sFlow/NetFlow (or the perverse vendor variants) to provide an ongoing trace of network traffic details without having to store and capture all of the data involved. Tools like ElastiFlow utilize Logstash to ingest and enrich flow data with DNS and AS lookups, GeoIP information, etc. Vector seems like a much better platform for the ingest, enrichment, and forwarding given its memory and concurrency models, as well as its portability across systems for localized collection.

Proposal

  1. Implement ingest for the common flow formats: sFlow, NetFlow, IPFIX, and JFlow (data and counters).
  2. Implement internal formatting.
  3. Implement attribute-based lookups and enrichment.
  4. Implement output formatting; ElastiFlow already provides an ES template.
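To make step 1 concrete: the NetFlow v5 packet header is a fixed 24-byte, big-endian layout, so decoding is plain offset/width extraction. Below is a minimal Python sketch of that decode (illustrative only; a Vector source would do the equivalent in Rust, and the field names follow the v5 export format):

```python
import struct

# NetFlow v5 packet header: 24 bytes, all fields big-endian (network order).
HEADER_FMT = "!HHIIIIBBH"
HEADER_LEN = struct.calcsize(HEADER_FMT)  # 24

def parse_v5_header(packet: bytes) -> dict:
    """Decode the fixed 24-byte NetFlow v5 header from a raw UDP payload."""
    (version, count, sys_uptime, unix_secs, unix_nsecs,
     flow_sequence, engine_type, engine_id, sampling) = struct.unpack(
        HEADER_FMT, packet[:HEADER_LEN])
    return {
        "version": version,            # always 5 for NetFlow v5
        "count": count,                # number of 48-byte flow records that follow
        "sys_uptime": sys_uptime,      # ms since the exporter booted
        "unix_secs": unix_secs,
        "unix_nsecs": unix_nsecs,
        "flow_sequence": flow_sequence,
        "engine_type": engine_type,
        "engine_id": engine_id,
        "sampling_interval": sampling,
    }

# Synthetic example: a header announcing one record, 100 s after boot.
pkt = struct.pack(HEADER_FMT, 5, 1, 100_000, 1_700_000_000, 0, 42, 0, 0, 0)
hdr = parse_v5_header(pkt)
```

Templated formats (NetFlow v9/IPFIX) need stateful template tracking on top of this, which is where most of the implementation effort would go.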

sempervictus avatar May 09 '21 06:05 sempervictus

Thanks for filing @sempervictus, we'll be addressing shortly!

zsherman avatar Dec 06 '21 04:12 zsherman

@zsherman: thanks a ton, utterly buried and can't get to it myself for a while.

sempervictus avatar Dec 06 '21 13:12 sempervictus

@sempervictus @zsherman Was there any traction on this? I'm looking to use Vector to process sFlow data to replace our Filebeat collectors.

gaby avatar Apr 28 '23 00:04 gaby

@gaby: I had started collecting netflow and sflow processing libraries and stubbing out standalone code to test functionality - unfortunately i have relatively little time available for FOSS right now and had to divert effort. It seems pretty viable, but likely a week or two of work to get to production state by someone who knows their way around Rust and Vector (flow processors tend to have some churn as various vendor attributes are requested by users). @zsherman: any chance the development team might have cycles to tackle the use case? Seems i'm not the only one interested in using Vector for these data flows. Thanks

sempervictus avatar Apr 28 '23 15:04 sempervictus

We agree this would be a nice use case, but it is not currently in our planned work.

bruceg avatar Apr 28 '23 20:04 bruceg

@sempervictus What caught my attention is that Datadog already has a Netflow collector through their agent. https://docs.datadoghq.com/network_monitoring/devices/netflow/

I can send NetFlow/sFlow data to Vector through a socket sink, but there's no way to parse it. Maybe having a NetFlow/sFlow parser/decoder in the transform area would be a good start.

gaby avatar Apr 28 '23 22:04 gaby

any news on this?

MoWaheidi avatar Jun 19 '24 02:06 MoWaheidi

Have not had time to implement or do much prototyping, though at this point I'm thinking a deku slicer/extractor (or binrw), which allows framing the protocol packets for extraction, would be the way to go.

sempervictus avatar Jun 19 '24 11:06 sempervictus

@sempervictus Is that something that could be done with VRL?

TCP/UDP/socket source → parse/format sFlow/NetFlow with VRL

gaby avatar Jun 19 '24 13:06 gaby

It's a binary format, so field extractions specific to the protocols (and their versions) would be preferable, IMO.

sempervictus avatar Jun 19 '24 16:06 sempervictus
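To illustrate what version-specific field extraction looks like: each NetFlow v5 record after the header is a fixed 48-byte layout, so every field is a known offset and width. A Python sketch of one record decode (illustrative only; a Rust implementation via deku or binrw would express the same layout declaratively):

```python
import socket
import struct

# One NetFlow v5 flow record: 48 bytes, big-endian, fixed field layout.
RECORD_FMT = "!4s4s4sHHIIIIHHBBBBHHBBH"
RECORD_LEN = struct.calcsize(RECORD_FMT)  # 48

def parse_v5_record(buf: bytes, offset: int) -> dict:
    """Extract one flow record starting at `offset` in the packet body."""
    (srcaddr, dstaddr, nexthop, input_if, output_if, d_pkts, d_octets,
     first, last, srcport, dstport, _pad1, tcp_flags, prot, tos,
     src_as, dst_as, src_mask, dst_mask, _pad2) = struct.unpack_from(
        RECORD_FMT, buf, offset)
    return {
        "srcaddr": socket.inet_ntoa(srcaddr),
        "dstaddr": socket.inet_ntoa(dstaddr),
        "nexthop": socket.inet_ntoa(nexthop),
        "input": input_if, "output": output_if,
        "dPkts": d_pkts, "dOctets": d_octets,
        "first": first, "last": last,
        "srcport": srcport, "dstport": dstport,
        "tcp_flags": tcp_flags, "prot": prot, "tos": tos,
        "src_as": src_as, "dst_as": dst_as,
        "src_mask": src_mask, "dst_mask": dst_mask,
    }

# Synthetic record: 10.0.0.1:55000 -> 10.0.0.2:443, TCP (prot 6), 840 bytes.
rec_bytes = struct.pack(
    RECORD_FMT,
    socket.inet_aton("10.0.0.1"), socket.inet_aton("10.0.0.2"),
    socket.inet_aton("0.0.0.0"),
    1, 2, 10, 840, 1000, 2000, 55000, 443, 0, 0x18, 6, 0, 0, 0, 24, 24, 0)
rec = parse_v5_record(rec_bytes, 0)
```

The header's `count` field tells you how many of these records follow, so a full packet parse is `unpack_from` in a loop, stepping `offset` by 48.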

@sempervictus Haven't tested this, but I asked GPT-4o how I could parse/format NetFlow using VRL, and it came up with this for v5:

[transforms.parse_netflow]
type = "remap"
inputs = ["tcp"]
source = """
.netflow_data = {
    "version": to_int!(substring(.message, 0, 2)),
    "count": to_int!(substring(.message, 2, 4)),
    "sys_uptime": to_int!(substring(.message, 4, 8)),
    "unix_secs": to_int!(substring(.message, 8, 12)),
    "unix_nsecs": to_int!(substring(.message, 12, 16)),
    "flow_sequence": to_int!(substring(.message, 16, 20)),
    "engine_type": to_int!(substring(.message, 20, 21)),
    "engine_id": to_int!(substring(.message, 21, 22)),
    "sampling_interval": to_int!(substring(.message, 22, 24)),
    "records": []
}

.record_offset = 24
while .record_offset < byte_length(.message) {
    .record = {
        "srcaddr": ipv4_to_str!(substring(.message, .record_offset, .record_offset + 4)),
        "dstaddr": ipv4_to_str!(substring(.message, .record_offset + 4, .record_offset + 8)),
        "nexthop": ipv4_to_str!(substring(.message, .record_offset + 8, .record_offset + 12)),
        "input": to_int!(substring(.message, .record_offset + 12, .record_offset + 14)),
        "output": to_int!(substring(.message, .record_offset + 14, .record_offset + 16)),
        "dPkts": to_int!(substring(.message, .record_offset + 16, .record_offset + 20)),
        "dOctets": to_int!(substring(.message, .record_offset + 20, .record_offset + 24)),
        "first": to_int!(substring(.message, .record_offset + 24, .record_offset + 28)),
        "last": to_int!(substring(.message, .record_offset + 28, .record_offset + 32)),
        "srcport": to_int!(substring(.message, .record_offset + 32, .record_offset + 34)),
        "dstport": to_int!(substring(.message, .record_offset + 34, .record_offset + 36)),
        "pad1": to_int!(substring(.message, .record_offset + 36, .record_offset + 37)),
        "tcp_flags": to_int!(substring(.message, .record_offset + 37, .record_offset + 38)),
        "prot": to_int!(substring(.message, .record_offset + 38, .record_offset + 39)),
        "tos": to_int!(substring(.message, .record_offset + 39, .record_offset + 40)),
        "src_as": to_int!(substring(.message, .record_offset + 40, .record_offset + 42)),
        "dst_as": to_int!(substring(.message, .record_offset + 42, .record_offset + 44)),
        "src_mask": to_int!(substring(.message, .record_offset + 44, .record_offset + 45)),
        "dst_mask": to_int!(substring(.message, .record_offset + 45, .record_offset + 46)),
        "pad2": to_int!(substring(.message, .record_offset + 46, .record_offset + 48))
    }
    .netflow_data.records << .record
    .record_offset = .record_offset + 48
}
"""

If this works, it might be a path toward integrating it with VRL.

gaby avatar Jun 19 '24 16:06 gaby

If we can do dependent offsets (if field A is greater than 5 then field F will exist; otherwise field E will extend into where we expect F to fall), then that might be an approach we can take. Definitely worth investigating.

sempervictus avatar Jun 19 '24 17:06 sempervictus

This is much closer, still needs some work:

# Define the protocol map
.protocol_map = { "1": "ICMP", "6": "TCP", "17": "UDP" }

# Ensure .prot is extracted from the message correctly
.prot_hex, prot_err = slice(.message, 69, 70)

if prot_err == null {
    .prot = parse_int(.prot_hex, base: 16) ?? 0
    .prot_str = to_string(.prot)

    # Fetch the protocol name from the protocol map
    .protocol_name = get(.protocol_map, [.prot_str]) ?? "Unknown"
} else {
    .protocol_name = "EXTRACTION_ERROR"
}

# Extract the TCP flags from the NetFlow v5 message
.tcp_flags_hex, extract_err = slice(.message, 122, 124)

if extract_err == null {
    .tcp_flags = parse_int(.tcp_flags_hex, base: 16) ?? 0
    .tcp_flags_parsed = []

    if .tcp_flags >= 32 { .tcp_flags_parsed = push(.tcp_flags_parsed, "URG"); .tcp_flags = .tcp_flags - 32 }
    if .tcp_flags >= 16 { .tcp_flags_parsed = push(.tcp_flags_parsed, "ACK"); .tcp_flags = .tcp_flags - 16 }
    if .tcp_flags >= 8 { .tcp_flags_parsed = push(.tcp_flags_parsed, "PSH"); .tcp_flags = .tcp_flags - 8 }
    if .tcp_flags >= 4 { .tcp_flags_parsed = push(.tcp_flags_parsed, "RST"); .tcp_flags = .tcp_flags - 4 }
    if .tcp_flags >= 2 { .tcp_flags_parsed = push(.tcp_flags_parsed, "SYN"); .tcp_flags = .tcp_flags - 2 }
    if .tcp_flags >= 1 { .tcp_flags_parsed = push(.tcp_flags_parsed, "FIN") }

    if length(.tcp_flags_parsed) == 0 {
        .tcp_flags_parsed = "NO_FLAGS"
    } else {
        .tcp_flags_parsed = join!(.tcp_flags_parsed, ",")
    }
} else {
    .tcp_flags_parsed = "EXTRACTION_ERROR"
}

# Calculate duration in milliseconds
.first_bytes, first_err = slice(.message, 48, 52)
.last_bytes, last_err = slice(.message, 52, 56)
.first = parse_int(.first_bytes, 16) ?? 0
.last = parse_int(.last_bytes, 16) ?? 0
.duration = .last - .first

# Extract other fields from the message
.version_bytes, version_err = slice(.message, 0, 2)
.count_bytes, count_err = slice(.message, 2, 4)
.sys_uptime_bytes, sys_uptime_err = slice(.message, 4, 8)
.unix_secs_bytes, unix_secs_err = slice(.message, 8, 12)
.unix_nsecs_bytes, unix_nsecs_err = slice(.message, 12, 16)
.flow_sequence_bytes, flow_sequence_err = slice(.message, 16, 20)
.engine_type_bytes, engine_type_err = slice(.message, 20, 21)
.engine_id_bytes, engine_id_err = slice(.message, 21, 22)
.sampling_interval_bytes, sampling_interval_err = slice(.message, 22, 24)
.srcaddr_bytes, srcaddr_err = slice(.message, 24, 28)
.dstaddr_bytes, dstaddr_err = slice(.message, 28, 32)
.nexthop_bytes, nexthop_err = slice(.message, 32, 36)
.input_bytes, input_err = slice(.message, 36, 38)
.output_bytes, output_err = slice(.message, 38, 40)
.dPkts_bytes, dPkts_err = slice(.message, 40, 44)
.dOctets_bytes, dOctets_err = slice(.message, 44, 48)
.srcport_bytes, srcport_err = slice(.message, 56, 58)
.dstport_bytes, dstport_err = slice(.message, 58, 60)
.pad1_bytes, pad1_err = slice(.message, 60, 61)
.tcp_flags_bytes, tcp_flags_err = slice(.message, 61, 62)
.prot_bytes, prot_err = slice(.message, 62, 63)
.tos_bytes, tos_err = slice(.message, 63, 64)
.src_as_bytes, src_as_err = slice(.message, 64, 66)
.dst_as_bytes, dst_as_err = slice(.message, 66, 68)
.src_mask_bytes, src_mask_err = slice(.message, 68, 69)
.dst_mask_bytes, dst_mask_err = slice(.message, 69, 70)
.pad2_bytes, pad2_err = slice(.message, 70, 72)

.version = if version_err != null { -1 } else { parse_int(.version_bytes, 16) ?? -1 }
.count = if count_err != null { -1 } else { parse_int(.count_bytes, 16) ?? -1 }
.sys_uptime = if sys_uptime_err != null { -1 } else { parse_int(.sys_uptime_bytes, 16) ?? -1 }
.unix_secs = if unix_secs_err != null { -1 } else { parse_int(.unix_secs_bytes, 16) ?? -1 }
.unix_nsecs = if unix_nsecs_err != null { -1 } else { parse_int(.unix_nsecs_bytes, 16) ?? -1 }
.flow_sequence = if flow_sequence_err != null { -1 } else { parse_int(.flow_sequence_bytes, 16) ?? -1 }
.engine_type = if engine_type_err != null { -1 } else { parse_int(.engine_type_bytes, 16) ?? -1 }
.engine_id = if engine_id_err != null { -1 } else { parse_int(.engine_id_bytes, 16) ?? -1 }
.sampling_interval = if sampling_interval_err != null { -1 } else { parse_int(.sampling_interval_bytes, 16) ?? -1 }

.srcaddr = if srcaddr_err != null {
    "0.0.0.0"
} else {
    .part1_bytes, part1_err = slice(.srcaddr_bytes, 0, 1)
    .part2_bytes, part2_err = slice(.srcaddr_bytes, 1, 2)
    .part3_bytes, part3_err = slice(.srcaddr_bytes, 2, 3)
    .part4_bytes, part4_err = slice(.srcaddr_bytes, 3, 4)
    if part1_err != null || part2_err != null || part3_err != null || part4_err != null {
        "0.0.0.0"
    } else {
        .part1 = parse_int(.part1_bytes, 16) ?? 0
        .part2 = parse_int(.part2_bytes, 16) ?? 0
        .part3 = parse_int(.part3_bytes, 16) ?? 0
        .part4 = parse_int(.part4_bytes, 16) ?? 0
        to_string(.part1) + "." + to_string(.part2) + "." + to_string(.part3) + "." + to_string(.part4)
    }
}

.dstaddr = if dstaddr_err != null {
    "0.0.0.0"
} else {
    .part1_bytes, part1_err = slice(.dstaddr_bytes, 0, 1)
    .part2_bytes, part2_err = slice(.dstaddr_bytes, 1, 2)
    .part3_bytes, part3_err = slice(.dstaddr_bytes, 2, 3)
    .part4_bytes, part4_err = slice(.dstaddr_bytes, 3, 4)
    if part1_err != null || part2_err != null || part3_err != null || part4_err != null {
        "0.0.0.0"
    } else {
        .part1 = parse_int(.part1_bytes, 16) ?? 0
        .part2 = parse_int(.part2_bytes, 16) ?? 0
        .part3 = parse_int(.part3_bytes, 16) ?? 0
        .part4 = parse_int(.part4_bytes, 16) ?? 0
        to_string(.part1) + "." + to_string(.part2) + "." + to_string(.part3) + "." + to_string(.part4)
    }
}

.nexthop = if nexthop_err != null {
    "0.0.0.0"
} else {
    .part1_bytes, part1_err = slice(.nexthop_bytes, 0, 1)
    .part2_bytes, part2_err = slice(.nexthop_bytes, 1, 2)
    .part3_bytes, part3_err = slice(.nexthop_bytes, 2, 3)
    .part4_bytes, part4_err = slice(.nexthop_bytes, 3, 4)
    if part1_err != null || part2_err != null || part3_err != null || part4_err != null {
        "0.0.0.0"
    } else {
        .part1 = parse_int(.part1_bytes, 16) ?? 0
        .part2 = parse_int(.part2_bytes, 16) ?? 0
        .part3 = parse_int(.part3_bytes, 16) ?? 0
        .part4 = parse_int(.part4_bytes, 16) ?? 0
        to_string(.part1) + "." + to_string(.part2) + "." + to_string(.part3) + "." + to_string(.part4)
    }
}

.input = if input_err != null { -1 } else { parse_int(.input_bytes, 16) ?? -1 }
.output = if output_err != null { -1 } else { parse_int(.output_bytes, 16) ?? -1 }
.dPkts = if dPkts_err != null { -1 } else { parse_int(.dPkts_bytes, 16) ?? -1 }
.dOctets = if dOctets_err != null { -1 } else { parse_int(.dOctets_bytes, 16) ?? -1 }
.srcport = if srcport_err != null { -1 } else { parse_int(.srcport_bytes, 16) ?? -1 }
.dstport = if dstport_err != null { -1 } else { parse_int(.dstport_bytes, 16) ?? -1 }
.pad1 = if pad1_err != null { -1 } else { parse_int(.pad1_bytes, 16) ?? -1 }
.tcp_flags = if tcp_flags_err != null { -1 } else { parse_int(.tcp_flags_bytes, 16) ?? -1 }
.prot = if prot_err != null { -1 } else { parse_int(.prot_bytes, 16) ?? -1 }
.tos = if tos_err != null { -1 } else { parse_int(.tos_bytes, 16) ?? -1 }
.src_as = if src_as_err != null { -1 } else { parse_int(.src_as_bytes, 16) ?? -1 }
.dst_as = if dst_as_err != null { -1 } else { parse_int(.dst_as_bytes, 16) ?? -1 }
.src_mask = if src_mask_err != null { -1 } else { parse_int(.src_mask_bytes, 16) ?? -1 }
.dst_mask = if dst_mask_err != null { -1 } else { parse_int(.dst_mask_bytes, 16) ?? -1 }
.pad2 = if pad2_err != null { -1 } else { parse_int(.pad2_bytes, 16) ?? -1 }

.netflow_data = {
    "version": .version,
    "count": .count,
    "sys_uptime": .sys_uptime,
    "unix_secs": .unix_secs,
    "unix_nsecs": .unix_nsecs,
    "flow_sequence": .flow_sequence,
    "engine_type": .engine_type,
    "engine_id": .engine_id,
    "sampling_interval": .sampling_interval,
    "srcaddr": .srcaddr,
    "dstaddr": .dstaddr,
    "nexthop": .nexthop,
    "input": .input,
    "output": .output,
    "dPkts": .dPkts,
    "dOctets": .dOctets,
    "first": .first,
    "last": .last,
    "duration": .duration,
    "srcport": .srcport,
    "dstport": .dstport,
    "pad1": .pad1,
    "tcp_flags": .tcp_flags,
    "prot": .prot,
    "tos": .tos,
    "src_as": .src_as,
    "dst_as": .dst_as,
    "src_mask": .src_mask,
    "dst_mask": .dst_mask,
    "pad2": .pad2,
}

. = {
    "message": .message,
    "prot": .prot,
    "prot_hex": .prot_hex,
    "prot_str": .prot_str,
    "protocol_map": .protocol_map,
    "protocol_name": .protocol_name,
    "tcp_flags": .tcp_flags,
    "tcp_flags_hex": .tcp_flags_hex,
    "tcp_flags_parsed": .tcp_flags_parsed,
    "duration": .duration,
    "netflow_data": .netflow_data
}
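The flag cascade above subtracts bit values in descending order; the same decode is more commonly done with bitmasks. A Python sketch of the equivalent logic (flag names and bit positions per the TCP header; the `tcp_flags` byte in a v5 record is the cumulative OR of flags seen on the flow):

```python
# TCP flag bits in the low 6 bits of the tcp_flags byte.
TCP_FLAG_BITS = [
    (0x20, "URG"), (0x10, "ACK"), (0x08, "PSH"),
    (0x04, "RST"), (0x02, "SYN"), (0x01, "FIN"),
]

def decode_tcp_flags(value: int) -> str:
    """Render the tcp_flags byte as 'URG,ACK,...' or 'NO_FLAGS'."""
    names = [name for bit, name in TCP_FLAG_BITS if value & bit]
    return ",".join(names) if names else "NO_FLAGS"
```

Masking instead of subtraction also avoids destroying the original `tcp_flags` value while decoding it.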

MoWaheidi avatar Jun 25 '24 19:06 MoWaheidi