
Multiprocessing per queue is not supported

Open adulau opened this issue 9 years ago • 27 comments

Multiprocessing per queue is not supported. If you have to process a huge dataset and only a single process handles a queue, you are time-bound by DNS resolution or the like. It would be nice to ensure that multiple processes can handle a single queue.

adulau avatar May 28 '15 13:05 adulau

FYI, it is supported on AIL by this module: https://github.com/CIRCL/AIL-framework/blob/master/bin/Helper.py

I will see if it is possible to implement it.

Rafiot avatar May 28 '15 13:05 Rafiot

On May 28, 2015, at 3:52 PM, Alexandre Dulaunoy [email protected] wrote:

Multiprocessing per queue is not supported. If you have to process a huge dataset and only a single process is handling a queue, you are time bound with DNS resolution or alike. It would be nice to ensure that you can handle multiple processes for handling a single queue.

— Reply to this email directly or view it on GitHub.

So concerning DNS in particular, we had a discussion on this already. Tomas tried to implement threading here, but it turned out to be a bit awkward and ugly (a.k.a. complex).

So, we decided against this and reverted to processes and/or asynchronous DNS. In fact, I'd like to see something like adns-tools for DNS lookups.

However, right now (at least for us) speed was not the main concern.

Any experience with adns in Python?

aaronkaplan avatar May 28 '15 14:05 aaronkaplan
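For illustration only, a minimal sketch of asynchronous DNS lookups using Python's standard asyncio rather than adns (the hostnames are made up); the point is that many resolutions can be in flight without threading the bot itself:

```python
import asyncio

async def resolve(loop, host):
    # getaddrinfo is run off the event loop, so many lookups can be in flight at once
    try:
        info = await loop.getaddrinfo(host, None)
        return host, info[0][4][0]
    except OSError:
        return host, None

async def resolve_all(hosts):
    loop = asyncio.get_running_loop()
    return await asyncio.gather(*(resolve(loop, h) for h in hosts))

if __name__ == '__main__':
    # purely illustrative hostnames
    print(asyncio.run(resolve_all(['example.com', 'example.org'])))
```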

Pro tip: never use threading in Python. Been there, done that, bad idea.

My approach in AIL is to run processes that pop values from a set and push the output onto the next one.

I will dig into the code of intelmq to see if I can use the same library as in AIL.

Rafiot avatar May 28 '15 14:05 Rafiot

In the new version that is under testing (https://github.com/certtools/intelmq-beta) it is possible to enable 'load_balance', which is in reality a splitter.

https://github.com/certtools/intelmq-beta/blob/master/intelmq/lib/pipeline.py#L82

What do you think about the idea? It is not multi-threaded, it is multi-process.

SYNchroACK avatar May 28 '15 14:05 SYNchroACK
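A minimal sketch of what such a splitter amounts to; the queue names and the Redis layout below are assumptions for illustration, not the intelmq-beta code:

```python
import itertools
import json

import redis

r = redis.Redis()

# hypothetical destination queues for two parallel cymru expert instances
destinations = itertools.cycle(['cymru-expert-1-queue', 'cymru-expert-2-queue'])

def send_load_balanced(event):
    # round-robin: each event goes to exactly one of the destination queues
    r.lpush(next(destinations), json.dumps(event))
```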

On May 28, 2015, at 4:35 PM, Raphaël Vinot [email protected] wrote:

Protip: never use threading in python. Been here, done that, bad idea.

ACK :)

My approach on AIL is to run processes poping values from a set and pushing the output on the next one.

I will dig into the code of intelmq to see if I can use the same library as in AIL.

okay. Cool! Thx a lot.

— Reply to this email directly or view it on GitHub.

aaronkaplan avatar May 28 '15 14:05 aaronkaplan

Parsers will have the load_balance option set to True, so they will send one event to cymru-1, the next one to cymru-2, and so on...

PS: the manager graph will need a hierarchical structure to be more understandable.

Visualization of the idea:

[attached image: pipeline diagram]

SYNchroACK avatar May 28 '15 14:05 SYNchroACK

I strongly dislike that we create loops in the graph, even if they are only loops in the underlying undirected graph (in your example there are no cycles in the DAG, but the underlying undirected graph has cycles).

aaronkaplan avatar May 28 '15 15:05 aaronkaplan

The best is to simplify such an approach. In AIL, for example, multiple processes can read from the same queue or publisher. That's it. So depending on the resources of the machine, you can start one or more processes per queue/publisher.

adulau avatar May 28 '15 15:05 adulau

@aaronkaplan check the code and propose. It's a little bit hard to get a simple solution like the current one.

SYNchroACK avatar May 28 '15 15:05 SYNchroACK

On May 28, 2015, at 5:17 PM, Alexandre Dulaunoy [email protected] wrote:

The best is to simplify such approach. In AIL for example, multiple processes can read from the same queue or publisher. That's it. So depending of the resource of the machine you can start one or more processes per queue/publisher.

Agreed. That makes sense.

— Reply to this email directly or view it on GitHub.

aaronkaplan avatar May 28 '15 15:05 aaronkaplan

Here is my model for one queue in an AIL module.

You can have multiple Queues subscribing to the same publisher.

Rafiot avatar May 28 '15 15:05 Rafiot

+1, pub/sub seems like a good way. Are pushes and pops atomic in Redis?

aaronkaplan avatar May 28 '15 15:05 aaronkaplan

Yes.

adulau avatar May 28 '15 15:05 adulau
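That atomicity is exactly what makes competing consumers safe: several processes can block on the same list and each element is handed to exactly one of them. A sketch (queue name and handler are hypothetical):

```python
import json

import redis

def consume(queue='parser-output-queue'):
    r = redis.Redis()
    while True:
        # BRPOP is atomic: with many consumers on the same list,
        # every pushed element is delivered to exactly one of them
        _, raw = r.brpop(queue)
        handle(json.loads(raw))

def handle(event):
    pass  # placeholder for the actual expert logic
```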

Let me maybe elaborate why the solution by @Rafiot seems better to me: because you can't make loops. Loops can bite us a lot if we are not careful to mark all the data which has already been processed. What I do not yet understand in @Rafiot's picture: if the data goes into the Set, is the pop atomic as well, even for a whole message/event? (It seems like yes, because a list is a certain kind of set — note: I am not a Redis guru.) Secondly, have we thought about using more of the pub/sub model of Redis for pushing and popping? Is that what's actually implied in the picture? Third, how would you re-arrange the base bot class so that it follows this principle? What features does an abstract bot class need?

--> maybe some of these questions are answered by looking at the AIL source :)

aaronkaplan avatar May 28 '15 16:05 aaronkaplan

With my solution, you can make loops, which is a feature and not a bug, because I fetch onions and process them again :) Obviously, it only works if you use Redis and not ZMQ, because you can't have multiple publishers on one context.

In AIL, the 'message' is the path to the paste on disk. In intelmq, it should be the JSON representation of one event (serialized).

tl;dr: QueueIn and QueueOut will serialize and deserialize an event properly, so any event can be processed independently by the multiple Module scripts. This part is hidden, and nobody creating a module should have to deal with that complexity (that's already the case in AIL).

Rafiot avatar May 28 '15 20:05 Rafiot
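Roughly, the hidden part could look like this (a sketch only, reusing the QueueIn/QueueOut names from the picture; this is not the AIL implementation):

```python
import json

import redis

class QueueOut:
    """Serialize an event and push it for the next stage (illustrative only)."""
    def __init__(self, name, conn=None):
        self.name = name
        self.r = conn or redis.Redis()

    def send(self, event):
        self.r.lpush(self.name, json.dumps(event))

class QueueIn:
    """Pop a serialized event and hand back a plain dict (illustrative only)."""
    def __init__(self, name, conn=None):
        self.name = name
        self.r = conn or redis.Redis()

    def receive(self):
        _, raw = self.r.brpop(self.name)
        return json.loads(raw)
```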

@aaronkaplan I think we are missing something in the picture. Look carefully at the arrows and you will see: collector -> parser -> experts -> output

To be clear, a new image: [attached image]

SYNchroACK avatar May 29 '15 10:05 SYNchroACK

Yes, I saw that. :) The DAG (https://en.wikipedia.org/wiki/Directed_acyclic_graph) is a DAG :) Well, acyclic. But the underlying undirected graph has cycles. It hurts my eyes. But I know right now it does not harm the processing :)

But yes, I agree your approach would work as well of course.

To be maybe clearer: I prefer one cymru bot which can do things in parallel (internally! for example with asynchronous DNS) and show up as one single box in the display. Or you hide the internal multiple parallel processes in the GUI.

aaronkaplan avatar May 29 '15 12:05 aaronkaplan

The schema I did yesterday is for one queue.

In my terminology, Alienvault-collector is one queue, alienvault-parser is one queue, cymru-to-asn is one queue.

It doesn't make any sense to have multiple AlienVault parsers, so we would not have multiple processes there, but the parser could be started multiple times on multiple files, and obviously cymru-to-asn will be started multiple times.

Rafiot avatar May 29 '15 14:05 Rafiot
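A rough sketch of "started multiple times": spawn the same worker several times and let all instances block on one shared queue (the queue name and the lookup body are hypothetical):

```python
import json
from multiprocessing import Process

import redis

def cymru_worker(queue='cymru-to-asn-queue'):
    r = redis.Redis()
    while True:
        _, raw = r.brpop(queue)   # atomic pop, safe with many workers
        event = json.loads(raw)
        # ... perform the (slow) ASN/DNS lookup here ...

if __name__ == '__main__':
    # start, e.g., four identical workers on the same queue
    workers = [Process(target=cymru_worker) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```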

Here is the very initial version of the queuing code I'm porting to a standalone module from AIL: https://github.com/Rafiot/MultiProcQueue

It is quite far from being ready, but most of the logic is in there.

Rafiot avatar Jun 01 '15 23:06 Rafiot

The code is almost ready; I use the same concepts as IntelMQ to make it an (easy enough) drop-in replacement. I will create a branch in IntelMQ for testing ASAP.

In theory the following should be enough:

  • Use the Pipeline class from MultiProcQueue in Bot instead of the IntelMQ one
  • Launch launch_queues.py and launch_modules.py

We will see in practice... :)

Rafiot avatar Jun 09 '15 16:06 Rafiot
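To make the drop-in idea concrete, the shared surface would be something like the following; the method names are assumptions about the common interface, not taken verbatim from either code base:

```python
class Pipeline:
    """Hypothetical sketch of the interface a drop-in replacement has to offer."""

    def connect(self):
        """Open connections to the source and destination queues."""

    def send(self, message):
        """Serialize and push a message to the destination queue(s)."""

    def receive(self):
        """Block until a message is available on the source queue, then return it."""

    def acknowledge(self):
        """Mark the last received message as processed."""

# A bot would only be pointed at the MultiProcQueue Pipeline instead of the
# IntelMQ one; the rest of the bot code would stay unchanged.
```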

Short update: I'm going to update my code to use disque [1] instead of my custom queuing system: it seems much more reliable, is properly supported, and will make it much easier to use multiprocessing.

[1] https://github.com/antirez/disque

Rafiot avatar Jul 31 '15 10:07 Rafiot

From the disque repo:

WARNING: This is alpha code NOT suitable for production. The implementation and API will likely
change in significant ways during the next months. The code and algorithms are not tested enough. A
lot more work is needed.

What do you think? Which one is more reliable?

adulau avatar Jul 31 '15 13:07 adulau

Mine is, I think, more reliable for now. But it is not used in production either, so I cannot really tell. I really want to give disque a try, because it would take care of the whole subscriber-to-set and set-to-publisher code, which is something I would rather enjoy.

Rafiot avatar Jul 31 '15 14:07 Rafiot

Here is the testing code I wrote: https://github.com/Rafiot/MultiProcQueue/tree/master/example_disque

+:

  • very, very short
  • all the logic goes to Disque
  • drastically reduces the code to maintain

-:

  • not possible to choose the type of output queue (it would add a lot of complexity in the code)

Rafiot avatar Aug 02 '15 10:08 Rafiot

Hi,

because we need to process thousands of messages and use remote calls in experts, multiprocessing is necessary. But I think multiprocessing through load_balance is broken by design, because you must define a new bot, copy the configuration in the runtime config file, and define new pipelines. And if you find out that, for example, 2 processes are not enough, you must change the whole configuration again.

My proposal is really simple: use only one source and one destination queue, but let every process use its own internal queue. This is possible because the Redis command BRPOPLPUSH used in the receive method is atomic, so there is no problem with concurrency.

With a separate internal queue per process, it is possible to run a bot multiple times or to use Python threading.

What do you think about this solution?

ondj avatar May 10 '16 11:05 ondj
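A minimal sketch of that proposal, assuming Redis lists and made-up queue names; BRPOPLPUSH atomically moves one element from the shared source queue into the process's own internal queue, so no two processes can grab the same message:

```python
import json
import os

import redis

r = redis.Redis()

SOURCE = 'expert-input-queue'                         # shared by all processes
INTERNAL = 'expert-internal-queue-%d' % os.getpid()   # one per process

def run():
    while True:
        # atomic move: the message lands in exactly one internal queue
        raw = r.brpoplpush(SOURCE, INTERNAL)
        handle(json.loads(raw))          # hypothetical expert logic / remote call
        r.lrem(INTERNAL, 1, raw)         # acknowledge: remove it once processed

def handle(event):
    pass  # placeholder
```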

@wagner-certat is this the case by now? Does each bot have an internal queue?

aaronkaplan avatar Sep 08 '20 17:09 aaronkaplan

Internal queues are only necessary for the Redis MQ; we do not use or need them for the AMQP MQ. This is also the reason why multithreading is only implemented for AMQP and not for Redis: implementing it for Redis would mean a lot of effort, as we would need to implement more MQ functionality ourselves.

I'd be fine if - in case we implement multiprocessing - the feature is not available for all message queues (as with multithreading).

ghost avatar Sep 09 '20 20:09 ghost