
Question about spout and keyby distributions

Open mencagli opened this issue 5 years ago • 13 comments

Hi Tony,

Is it possible in BriskStream to distribute tuples from the source to the next operator on a key-by basis (fields grouping)? To be clearer, suppose an application starting with:

Spout->Operator

where each tuple from the source is delivered to the right replica of the operator based on a key attribute. As far as I understand, all the applications in your repository start with a Spout followed by a Parser operator, where the connection between the Spout and the Parser is always shuffle grouping, while the one between the Parser and the next operator can be shuffle grouping or fields grouping. So, I am wondering whether it is possible to bypass the Parser. Unfortunately, the superclass AbstractSpout does not seem to have a getDefaultFields method, so maybe this feature is not currently provided.

Thanks!

mencagli, Feb 10 '20 18:02

Hi mencagli,

  1. Fields grouping is supported; see WordCount.java for how to declare it. It is almost the same as in Storm, but do note the slight difference:

```java
, new FieldsGrouping(Component.SPLITTER, new Fields(Field.WORD))
```
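As a rough illustration of what fields grouping does (not how BriskStream implements FieldsGrouping internally), the key fields of each tuple can be hashed and the hash reduced modulo the number of downstream replicas, so that equal keys always reach the same replica. The class and method names below are hypothetical.

```java
import java.util.List;

// Hypothetical illustration of fields (key-by) grouping; not BriskStream's code.
final class KeyByRouter {
    private KeyByRouter() {}

    /** Picks the downstream replica for a tuple based only on its key fields. */
    static int replicaFor(List<?> keyFields, int numReplicas) {
        if (numReplicas <= 0) {
            throw new IllegalArgumentException("numReplicas must be positive");
        }
        // Equal key lists have equal hash codes, so they always land on the same replica.
        int hash = keyFields.hashCode();
        // floorMod keeps the index in [0, numReplicas) even for negative hashes.
        return Math.floorMod(hash, numReplicas);
    }

    public static void main(String[] args) {
        int first = replicaFor(List.of("hello"), 4);
        int second = replicaFor(List.of("hello"), 4);
        System.out.println("\"hello\" -> replica " + first + " (again: " + second + ")");
    }
}
```

Math.floorMod is used instead of % so that negative hash codes still map to a valid replica index.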

  2. In short, it is by design that the Spout is always followed by a Parser.

I use the Parser operator to offload work from the Spout, so that the Spout does nothing but keep emitting the next tuple to keep the system busy. This ensures that different applications have the same maximum input rate (provided the downstream pipeline can handle it). Logically, the Parser is therefore the usual "Spout" of an application, and BriskStream's Spout is nothing but a data generator running at maximum speed.
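The division of labour described above can be sketched with two plain Java threads: a "Spout" that only cycles through pre-loaded raw records, and a "Parser" that does the per-tuple work. This is a hypothetical illustration built on java.util.concurrent, not BriskStream's executor code; the class name and the "key,value" record format are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the Spout/Parser split; not BriskStream code.
final class SpoutParserDemo {
    private SpoutParserDemo() {}

    /** Runs a generator ("Spout") and a parser thread; returns the sum of parsed values. */
    static long run(String[] rawData, int tuplesToEmit) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(1024);
        final long[] sum = {0};

        // The "Spout" does no work beyond cycling through pre-loaded raw records.
        Thread spout = new Thread(() -> {
            try {
                for (int i = 0; i < tuplesToEmit; i++) {
                    channel.put(rawData[i % rawData.length]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // The "Parser" is where the per-tuple work (here, string parsing) happens.
        Thread parser = new Thread(() -> {
            try {
                for (int i = 0; i < tuplesToEmit; i++) {
                    String line = channel.take();
                    sum[0] += Long.parseLong(line.split(",")[1].trim());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        spout.start();
        parser.start();
        try {
            spout.join();
            parser.join(); // join() makes the parser's writes to sum[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        String[] raw = {"a,1", "b,2", "c,3"};
        System.out.println(run(raw, 6)); // prints 12 (each record parsed twice)
    }
}
```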

Hope it clarifies.

ShuhaoZhangTony, Feb 10 '20 18:02

Thanks! So, can you confirm that it is possible to connect the spout directly to the next operator using fields grouping? I know that fields grouping is supported in BriskStream. My precise question (in case it was not clear) is whether the spout can emit directly, with fields grouping, to the next operator without a parser in the middle (suppose I am doing this in FraudDetection, where I want to connect the Spout directly to the Predictor, skipping the parser, and use fields grouping to respect the application semantics).

If it is possible, thanks; and thanks also for explaining why the parser is there. It makes sense.

Gabriele

mencagli, Feb 10 '20 19:02

Technically yes, but I did not write the code that lets a Spout connect to the next operator with FieldsGrouping (I always assumed shuffle grouping).

As I just mentioned, I always assume the Spout is connected directly to the Parser, which acts as the logical Spout of an application.

If you really need the Spout to connect directly to other operators with FieldsGrouping, I can update BriskStream to support it; it probably requires just a few lines of code.

ShuhaoZhangTony, Feb 10 '20 20:02

Dear Tony,

if you can do that with little effort, it would be very much appreciated. I need to remove the parser from my BriskStream benchmark, since in my case it wastes threads. If you could modify the source code and tell me which parts of it need to be updated, that would be perfect!

Many many thanks for your prompt reply and great help!

Gabriele

mencagli, Feb 10 '20 20:02

Dear Gabriele,

I just checked the code, and it is already supported. For illustration, I modified FraudDetection.java so that the Spout is connected to the Parser using FieldsGrouping. Of course, you can bypass the Parser and connect the Spout to other operators in the same way.
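Once the Spout connects with fields grouping, the Parser's key extraction has to happen in the Spout itself (or in the first real operator). The sketch below shows a generator that extracts the key inline and routes each record straight to per-replica queues, for example toward Predictor replicas in FraudDetection. The class, the queues, and the "key,value" record format are hypothetical stand-ins for BriskStream's executors and data, not its actual Spout API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a generator that routes by key straight to downstream replicas,
// doing the Parser's key extraction itself. Not BriskStream's actual Spout API.
final class DirectKeyedSpout {
    private final List<BlockingQueue<String>> replicas = new ArrayList<>();

    DirectKeyedSpout(int numReplicas, int queueCapacity) {
        if (numReplicas <= 0) {
            throw new IllegalArgumentException("numReplicas must be positive");
        }
        for (int i = 0; i < numReplicas; i++) {
            replicas.add(new ArrayBlockingQueue<>(queueCapacity));
        }
    }

    /** Uses the first CSV field as the key and enqueues the record on that key's replica. */
    int emit(String rawRecord) {
        int comma = rawRecord.indexOf(',');
        String key = comma < 0 ? rawRecord : rawRecord.substring(0, comma);
        int target = Math.floorMod(key.hashCode(), replicas.size());
        // offer() keeps this demo non-blocking; a real spout would block or apply backpressure.
        if (!replicas.get(target).offer(rawRecord)) {
            throw new IllegalStateException("queue of replica " + target + " is full");
        }
        return target;
    }

    BlockingQueue<String> queueOf(int replica) {
        return replicas.get(replica);
    }

    public static void main(String[] args) {
        DirectKeyedSpout spout = new DirectKeyedSpout(4, 16);
        int first = spout.emit("card-42,19.99");
        int second = spout.emit("card-42,250.00");
        System.out.println("same replica: " + (first == second)); // prints "same replica: true"
    }
}
```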

Tony

ShuhaoZhangTony, Feb 11 '20 08:02

Wonderful, thanks! I will try it immediately.

mencagli, Feb 11 '20 09:02

Dear Tony,

everything seems to work, thanks! I am wondering whether spouts have a close/cancel method that is called before termination, as in Flink and Storm. This would be very helpful for collecting statistics. Thanks again, Gabriele

mencagli, Feb 11 '20 15:02

Unfortunately, BriskStream currently has no such built-in feature. You may want to implement it in the application itself, e.g., by calling `System.exit()` when the Spout encounters a special tuple.
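One common way to do this in application code is the "poison pill" pattern: the data generator emits a special end-of-stream marker, and an operator that sees it reports its statistics before stopping. The sketch below uses a plain BlockingQueue; the marker object and class name are hypothetical, and it returns the count instead of calling System.exit() so that it can be tested.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical poison-pill shutdown; BriskStream has no built-in close/cancel hook.
final class PoisonPillDemo {
    /** Unique marker instance, compared by identity so it cannot clash with real "<EOS>" data. */
    static final String END_OF_STREAM = new String("<EOS>");

    private PoisonPillDemo() {}

    /** Consumes tuples until the marker arrives, then returns how many real tuples it saw. */
    static long consumeUntilPill(BlockingQueue<String> input) {
        long processed = 0;
        try {
            while (true) {
                String tuple = input.take();
                if (tuple == END_OF_STREAM) { // identity check on purpose
                    // This is where a close()-style hook would report statistics;
                    // a benchmark could call System.exit(0) afterwards.
                    System.out.println("processed " + processed + " tuples");
                    return processed;
                }
                processed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return processed;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        Thread spout = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put("tuple-" + i);
                }
                queue.put(END_OF_STREAM); // signal the end of the input
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        spout.start();
        consumeUntilPill(queue); // prints "processed 5 tuples"
        spout.join();
    }
}
```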

ShuhaoZhangTony, Feb 11 '20 15:02

OK, thanks for the suggestion, Tony.

mencagli, Feb 11 '20 17:02

Hi Tony,

sorry to bother you. I have a simple question that I hope has a quick answer. Is it possible in BriskStream to set the size of the message buffers that the ExecutorNode instances use to exchange tuples with each other? I imagine the API does not allow this, but maybe you can give me some hints about where to modify this parameter in the source code.

Many thanks for your help,

Gabriele

mencagli, Feb 13 '20 16:02

Hi Gabriele,

Currently, BriskStream relies on `-bt` to tune the number of tuples transmitted at once between executors. That is, the size of the message buffer is (size of tuple) × (batch), where batch is tunable.

Finer-grained control over the message buffer size is unfortunately not possible unless you statically define the size of each tuple. This is because TStream uses object-reference queues, which do not care about the actual physical size of a tuple.

To change this behaviour, you could try P1C1OffHeapQueue (in package brisk.queue.impl), which forces the system to pass a statically defined object (say, an integer). You can then control the message buffer size at a fine grain (e.g., defining the queue input as one integer gives a 32-bit message buffer, two integers give 64 bits, and so on).
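To illustrate the (size of tuple) × (batch) relationship, here is a hypothetical batching emitter in which one queue message carries up to batchSize tuple references. The batchSize parameter only plays the role of `-bt`; the class, the channel, and the byte estimate are assumptions, not BriskStream's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical batching emitter: one queue message carries up to batchSize tuples.
final class BatchingEmitter<T> {
    private final int batchSize;            // plays the role of the -bt setting
    private final Queue<List<T>> channel;   // stands in for the inter-executor queue
    private List<T> pending;

    BatchingEmitter(int batchSize, Queue<List<T>> channel) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.channel = channel;
        this.pending = new ArrayList<>(batchSize);
    }

    /** Buffers a tuple and sends a message once batchSize tuples have accumulated. */
    void emit(T tuple) {
        pending.add(tuple);
        if (pending.size() == batchSize) {
            flush();
        }
    }

    /** Sends any buffered tuples as one message, even if the batch is not full. */
    void flush() {
        if (!pending.isEmpty()) {
            channel.add(pending);
            pending = new ArrayList<>(batchSize);
        }
    }

    /** Rough message size: (size of tuple) x (batch). */
    static long messageBytes(long tupleBytes, int batchSize) {
        return tupleBytes * batchSize;
    }

    public static void main(String[] args) {
        Queue<List<Integer>> channel = new ConcurrentLinkedQueue<>();
        BatchingEmitter<Integer> emitter = new BatchingEmitter<>(4, channel);
        for (int i = 0; i < 10; i++) {
            emitter.emit(i);
        }
        emitter.flush();
        System.out.println(channel); // prints [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    }
}
```

Larger batches amortize the per-message queue overhead, at the cost of each tuple waiting longer before it is sent.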
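To make the fixed-size idea concrete, here is a minimal single-producer/single-consumer ring buffer whose slots are 32-bit ints stored in a direct (off-heap) ByteBuffer, so its size can be stated exactly in bytes. This is an illustrative stand-in written for this thread, not the actual P1C1OffHeapQueue; it assumes exactly one producer thread and one consumer thread.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical SPSC ring buffer of ints in off-heap memory; not BriskStream's P1C1OffHeapQueue.
final class OffHeapIntRing {
    private final ByteBuffer slots;
    private final int capacity;                        // number of int slots
    private final AtomicLong head = new AtomicLong();  // next slot to read (consumer-owned)
    private final AtomicLong tail = new AtomicLong();  // next slot to write (producer-owned)

    OffHeapIntRing(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.slots = ByteBuffer.allocateDirect(capacity * Integer.BYTES);
    }

    /** Total buffer size in bytes: each slot is exactly one 32-bit int. */
    int bufferBytes() {
        return slots.capacity();
    }

    /** Producer side: returns false instead of blocking when the ring is full. */
    boolean offer(int value) {
        long t = tail.get();
        if (t - head.get() == capacity) {
            return false;
        }
        slots.putInt((int) (t % capacity) * Integer.BYTES, value);
        tail.set(t + 1); // volatile write publishes the slot to the consumer
        return true;
    }

    /** Consumer side: returns null when the ring is empty. */
    Integer poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int value = slots.getInt((int) (h % capacity) * Integer.BYTES);
        head.set(h + 1); // volatile write frees the slot for the producer
        return value;
    }
}
```

Storing two ints per slot would give 64-bit messages in the same way, matching the example above.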

Tony

ShuhaoZhangTony, Feb 13 '20 16:02

Thanks. I am wondering whether it is possible to choose the size of the object-reference queue (in TStream?). For example, a size of 100 would mean it can hold 100 objects regardless of their type (each entry is just a reference). My goal is to set the maximum number of references that can be written into that queue. I hope my question is clearer now; sorry for the confusion.

mencagli, Feb 13 '20 16:02

I see, so you are talking about the size of the communication queue.

Yes, you can change it. It is set in executorThread.java, where it is currently hard-coded as 100000. Look for the following code:

```java
private void allocate_OutputQueue() {
//    if (enable_latency_measurement) {
//        executor.allocate_OutputQueue(conf.getBoolean("linked", false), 2);//no queueing delay.
//    } else {
        executor.allocate_OutputQueue(conf.getBoolean("linked", false), 100000);
//    }
}
```
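If you want to try different sizes without editing the constant each time, one option is to read the capacity from configuration, with 100000 as the default. The sketch below uses java.util.Properties and a bounded ArrayBlockingQueue to show that the capacity counts references, whatever the tuple type. The property key `queue.capacity` and the class are hypothetical; in BriskStream you would presumably pass the value as the second argument of executor.allocate_OutputQueue(...) instead.

```java
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical: configurable bounded queue capacity instead of a hard-coded 100000.
final class QueueCapacityConfig {
    static final int DEFAULT_CAPACITY = 100000; // current hard-coded value in executorThread.java

    private QueueCapacityConfig() {}

    /** Reads "queue.capacity" (a made-up key), falling back to the default. */
    static int capacityFrom(Properties conf) {
        int capacity = Integer.parseInt(
                conf.getProperty("queue.capacity", String.valueOf(DEFAULT_CAPACITY)));
        if (capacity <= 0) {
            throw new IllegalArgumentException("queue.capacity must be positive");
        }
        return capacity;
    }

    /** A bounded queue of object references: each slot holds one reference of any type. */
    static BlockingQueue<Object> newOutputQueue(Properties conf) {
        return new ArrayBlockingQueue<>(capacityFrom(conf));
    }
}
```

With a capacity of 100, the 101st insertion blocks (put) or fails (offer) until a consumer removes an element, which is how the bound throttles the producer.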

ShuhaoZhangTony, Feb 13 '20 17:02