
Create a Data Pipeline controller

Open · frikky opened this issue 1 year ago · 11 comments

There are quite a few tools Shuffle can't natively connect to in order to receive and control information. These are things like:

  • RabbitMQ
  • Syslog
  • Kafka
  • Random monitors

The thought is to use something like Tenzir, deployed in a container on the backend host(s) and controlled from Shuffle, in order to start workflows from many more sources than currently exist.

This would make it so message queues, logs, or any other data can be sent into Shuffle for further API automation.
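As a minimal sketch of the idea, following Tenzir's load/read/to pipeline pattern (the Shuffle webhook URL and ID here are hypothetical placeholders):

load tcp://0.0.0.0:514
| read syslog
| to https://shuffler.io/api/v1/hooks/webhook_<id>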

frikky avatar Jan 16 '24 11:01 frikky

Happy to support this effort!

mavam avatar Jan 16 '24 11:01 mavam

Basics of what would be needed on the Shuffle side (prototype):

  • [x] A Docker container whose behavior we can control via command-line inputs
  • [x] Make it forward and start workflows in Shuffle (somehow)
  • [x] A way to shut this container down/spin new ones up easily

For production:

  • [x] Make it possible to control the commands it runs from the UI itself (and add standards for e.g. syslog). Probably a new trigger in the Shuffle UI
  • [x] (Maybe) run it from Orborus, and not from the backend directly
  • [ ] Run it as a sample SIEM with rulesets from Shuffle's side

frikky avatar Feb 07 '24 16:02 frikky

First use cases work! How it works:

  1. New Shuffle API at POST /api/v1/triggers/pipeline
  2. This deploys to an on-prem Orborus (Shuffle's queue handler), which starts the container
  3. The node container itself runs a single pipeline and is started from CMD inputs (see the sketch below)
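Assuming the stock tenzir/tenzir Docker image, which takes a pipeline as its command argument, starting such a single-pipeline container could look roughly like this (the webhook URL is a placeholder):

docker run -p 514:514 tenzir/tenzir 'load tcp://0.0.0.0:514 | read syslog | to https://shuffler.io/api/v1/hooks/webhook_<id>'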

Initial goals:

  • [x] Get Shuffle cache data -> forward to Shuffle webhook (HTTP only)
  • [ ] Syslog -> Shuffle workflow
  • [ ] Kafka -> Shuffle workflow
  • [ ] Mount a file locally to handle storage between runs/instances

@mavam may need your help on how to actually write these pipelines :)

Quick frontend for the new trigger (only visible to support users): [screenshot]

frikky avatar Feb 25 '24 03:02 frikky

When you say → Shuffle Workflow, what are the requirements? Is this an HTTP POST request? Our http connector makes it possible to stuff events into the request body, if that's what you are looking for.

The basic pattern looks like this:

load tcp://0.0.0.0:514
| read syslog
| to http://api.com X-Token:Secret

With Kafka:

from kafka://1.2.3.4 --topic foo
| to http://api.com X-Token:Secret

(The default format for kafka is JSON, which is why you don't have to specify it.)

Also, for RabbitMQ, check out the amqp connector.
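For example, a minimal sketch (assuming the amqp connector accepts a broker URL the same way the kafka one does; the address is a placeholder):

from amqp://1.2.3.4
| to http://api.com X-Token:Secret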

mavam avatar Feb 25 '24 08:02 mavam


Cheers, I'm starting to get the format! I'll make this work

frikky avatar Feb 25 '24 14:02 frikky

@mavam I drew up some basic architecture for it, and will need some help understanding a couple of the storage and pipeline management basics. Also changed a few things in the GUI to make it "By Tenzir" for when we can actually release this 👍

Questions:

  • In the image: should ONE pipeline do the storage, while another checks Sigma rule(s)? The Kafka one seems easy enough.
  • https://discord.com/channels/1072605116978442320/1072605117817290784/1211803473859842119

Left side: Shuffle-controlled. Right side: Tenzir-pipeline-relevant things. [diagram]

frikky avatar Feb 27 '24 00:02 frikky

Looks great!

A few points about the Tenzir boxes:

  • Sigma: The sigma operator watches a directory for new rules. When you add or remove a rule, it updates its internal list of rules. If you run it in a container, you could mount your rules at a specific location.
  • Kafka/MQTT: The kafka connector is our native implementation (using librdkafka). MQTT support is currently only available via fluent-bit (input plugin); see the sketch after this list.
  • Syslog Listener: If you want to accept data via Syslog, you would combine a loader with the syslog parser. For example, load tcp://0.0.0.0:514 | read syslog for a two-step, or alternatively in one operator: from tcp://0.0.0.0:514 read syslog. (NB: @jachris, mind chiming in on how this looks with TQL v2.0?)
  • Storage: this is something you mount in. The state directory keeps all the data. See also the example configuration.
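For MQTT, a minimal sketch (assuming the fluent-bit source operator takes the plugin name, with any plugin options passed as key=value pairs; untested):

fluent-bit mqtt
| to http://api.com X-Token:Secret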

mavam avatar Feb 27 '24 08:02 mavam

Now regarding your use case of sending Sigma rule matches as POST requests. At a high-level, you'd want to deploy something like this:

from <somewhere>
| sigma /path/to/rules
| to https://webhook.example.com Token:Secret

The Syslog import would be:

from tcp://0.0.0.0:514
| import

mavam avatar Feb 27 '24 08:02 mavam

I'm excited to see where this is going! :rocket:

Regarding @mavam's question above on how the syslog listener example looks with TQL v2.0:

Sure. To add some context: We are working on a major overhaul of our language, which will significantly improve its expressiveness and also increase the syntactic cohesion between operators. However, for the given example, not much changes, except that from tcp://0.0.0.0:514 read syslog is split into separate operators: from tcp://0.0.0.0:514 | read syslog. (This assumes that we add URL literals; otherwise, you will have to write from "tcp://0.0.0.0:514".)

Note: While the general syntax and semantics are already pretty well defined, this is still a bit under discussion. The above reflects the current draft, but we are also considering requiring load if you want to manually specify a format (which should not be too common).
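Spelled out as a full pipeline, the current draft v2 form (without URL literals) would be:

from "tcp://0.0.0.0:514"
| read syslog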

jachris avatar Feb 27 '24 11:02 jachris


In the "from |", it could also use "import |" I would assume, as to make check from storage? Is it also possible to use Sigma rules in realtime, and have multiple "outputs", AKA check Sigma rules in realtime AND export in the same one? If so, here is an updated diagram. As for Kafka/Message queues, these are just pure forwarders for simplicity right now.

I also saw that you had the "every X |" operator on the way to handle scheduling. This would help a lot here, due to how the Sigma rule checker should run. Any time estimate? :)

[updated diagram]

frikky avatar Feb 27 '24 11:02 frikky

In the "from |", it could also use "import |" I would assume, as to make check from storage?

The import operator is a sink (can only occur at the end of the pipeline). If you would like to reference the stored data with another pipeline, use the export source (can only occur at the beginning of a pipeline). This diagram illustrates how import and export work:

[diagram of import and export]
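In pipeline form, pairing the two (combining examples already shown in this thread; <sink> is a placeholder): one pipeline persists the data,

from tcp://0.0.0.0:514
| import

and a second one reads it back for matching:

export
| sigma /path/to/rules
| to <sink>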

Is it also possible to use Sigma rules in realtime, and have multiple "outputs", AKA check Sigma rules in realtime AND export in the same one?

You're asking hard (but good!) questions 😉. For standard IoCs/observables, there's the lookup operator. This is a combined live and retro check that leverages the contextualization framework. There is no "Sigma context" yet, but several users have already requested this.

As a workaround today, you would simply use the sigma operator twice:

  1. Live stream: export --live | sigma /path/to/rules | to <sink>
  2. Retro match: export | sigma /path/to/rules | to <sink>

The lookup operator automates these two steps, triggering a new instance of (2) when a context update occurs.

I also saw that you had the "every X |" operator on the way to handle scheduling. This would help a lot here, due to how the Sigma rule checker should run. Any time estimate? :)

I have to defer to @dominiklohmann to give you a rough estimate.

mavam avatar Feb 27 '24 12:02 mavam