briskstream
Question about spout and keyby distributions
Hi Tony,
Is it possible in BriskStream to distribute tuples from the source to the next operator on a key-by basis (fieldsgrouping)? To be clearer, suppose an application starting with:
Spout->Operator
where each tuple from the source is delivered to the right replica of the operator based on a key attribute. As far as I understand, all the applications in your repository start with a Spout followed by a parser operator, where the connection between the Spout and the Parser is always shuffledgrouping, while the one between the Parser and the next operator can be either shuffledgrouping or fieldsgrouping. So I am wondering if it is possible to bypass the Parser. Unfortunately, the superclass AbstractSpout does not seem to have a getDefaultFields method, so maybe this feature is not currently provided.
Thanks!
Hi mencagli,
- fieldsgrouping is supported; you can refer to WordCount.java for how to declare it (see also the sketch after this list). It is almost the same as in Storm, but do take note of the slight difference in how the grouping is passed:
, new FieldsGrouping(Component.SPLITTER, new Fields(Field.WORD))
- To put it briefly: it is by design that the Spout is always followed by a Parser.
I use the Parser operator to take over the workload of the Spout, so that the Spout has nothing to do but keep emitting the next tuple to keep the system busy. This ensures that different applications have the same maximum input rate (if the subsequent pipeline can handle it). Logically, the Parser is therefore the usual ``Spout" of an application, and the Spout of BriskStream is nothing but a data generator running at maximum speed.
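For reference, a minimal sketch of how such a declaration might look in a WordCount-style topology is below. The builder calls, bolt classes and thread counts are hypothetical placeholders (check WordCount.java for the real ones); only the FieldsGrouping(...) argument is taken verbatim from the snippet above.

```java
// Hypothetical sketch -- only the FieldsGrouping(...) argument mirrors the real code.
TopologyBuilder builder = new TopologyBuilder();        // placeholder builder

builder.setSpout(Component.SPOUT, new MySpout(), 1);    // data generator run at max speed

// The Parser/Splitter is the logical "Spout": it receives raw tuples (shuffle).
builder.setBolt(Component.SPLITTER, new SplitterBolt(), 2,
        new ShuffleGrouping(Component.SPOUT));

// The Counter is keyed on WORD, so all tuples carrying the same word reach the
// same replica -- this is the FieldsGrouping declaration discussed above.
builder.setBolt(Component.COUNTER, new CounterBolt(), 2,
        new FieldsGrouping(Component.SPLITTER, new Fields(Field.WORD)));
```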
Hope it clarifies.
Thanks! So, can you confirm that it is possible to connect the Spout directly to the next operator using fieldsgrouping? I know that fieldsgrouping is supported in BriskStream. My precise question (if it was not clear) is whether the Spout can emit directly with fieldsgrouping to the next operator, without having a Parser in the middle (suppose I am doing this in FraudDetection, where I want to connect the Spout to the Predictor directly, without the Parser and using fieldsgrouping to respect the application semantics).
If that is possible, thanks, and thanks also for the explanation about the presence of a Parser. It is reasonable.
Gabriele
Technically yes, but I didn't write the corresponding code to allow the Spout to connect to the next operator with FieldsGrouping (shuffle grouping is always assumed).
As I just mentioned, I always assume the Spout is connected directly to the Parser, which acts as the logical Spout of an application.
If you seriously need the Spout to connect directly to other operators with FieldsGrouping, I can update BriskStream to support it; it probably requires just a few lines of code.
Dear Tony,
if you can do that with little effort, it would be very much appreciated on my side. I need to remove the Parser from my BriskStream benchmark, and keeping it is a waste of threads in my case. If you can modify the source code and tell me which parts need to be updated, that would be perfect!
Many many thanks for your prompt reply and great help!
Gabriele
Dear Gabriele,
I just checked the code: it is already supported. For illustration, I modified FraudDetection.java so that the Spout is connected to the Parser using FieldsGrouping. Of course, you can then bypass the Parser and connect other operators similarly.
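Roughly, the connection Gabriele described would then look like the sketch below; the component, class and field names here are placeholders for the FraudDetection case, not the actual contents of FraudDetection.java.

```java
// Hypothetical sketch: connect the Predictor directly to the Spout with a
// key-based grouping, bypassing the Parser. All names are placeholders.
builder.setBolt(Component.PREDICTOR, new PredictorBolt(), 2,
        new FieldsGrouping(Component.SPOUT, new Fields(Field.TRANSACTION_ID)));
```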
Tony
Wonderful, thanks! I will try it immediately.
Dear Tony,
everything seems to work, thanks! I am wondering whether the spouts have a close/cancel method that is called before termination, as in Flink and Storm. This would be very helpful for collecting statistics. Thanks again, Gabriele
Unfortunately, there is currently no such feature built into BriskStream. You may want to implement it in the application itself, say, by calling System.exit() when the Spout receives a special tuple.
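A rough sketch of that workaround is below; the operator class, marker value and statistics helper are hypothetical, and only the System.exit() call on a special tuple reflects the suggestion above.

```java
// Sketch of the suggested shutdown-on-sentinel workaround (names are placeholders).
public void execute(Tuple tuple) {
    // The Spout emits a dedicated end-of-stream marker as its last tuple.
    if (END_OF_STREAM.equals(tuple.getValue(0))) {
        dumpStatistics();  // flush whatever counters/latencies were collected
        System.exit(0);    // terminate the whole process, as suggested above
    }
    // ... normal tuple processing ...
}
```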
Ok, thanks for the suggestion Tony.
Hi Tony,
sorry for bothering you. I have a simple question that I hope has a quick answer. Is it possible in BriskStream to set the size of the message buffers that ExecutorNodes use to exchange tuples with each other? I imagine the API does not allow this, but maybe you can give me some hints about where I can modify this parameter in the source code.
Many thanks for your help,
Gabriele
Hi Gabriele,
Currently, BriskStream relies on ``-bt" to tune the number of tuples being transmitted at once between executors.
That is to say, the size of the message buffer is (size of tuple) x (batch), where (batch) is tunable.
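For a purely illustrative example: with 64-byte tuples and -bt set to 100, one transfer would move about 64 x 100 = 6,400 bytes (these numbers are made up for illustration, not BriskStream defaults).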
More fine-grained control of the message buffer size is unfortunately not possible unless you statically define the size of each tuple. This is because TStream uses object-reference queues, i.e., it does not care about the actual physical size of a tuple.
To modify this behaviour, you could try to use P1C1OffHeapQueue (located in package brisk.queue.impl), which forces the system to pass a statically defined object (say, an integer). Then you can control the size of the message buffer in a fine-grained way (e.g., define the queue's input as one integer for a 32-bit message buffer, two integers for 64 bits, and so on).
Tony
Thanks. I am wondering if there is a possibility to choose the size of the object-reference queue (TStream?). For example, 100 would mean that it can hold 100 objects independently of their type (each entry is just a reference). My goal is actually to set the maximum number of references that can be written to that queue. Maybe my question is clearer now; sorry for the confusion.
I see, so you are talking about the size of the communication queue.
Yes, you can change it. It is located in executorThread.java; currently, it is hard-coded as 100000. Look for the following code:
private void allocate_OutputQueue() {
    // if (enable_latency_measurement) {
    //     executor.allocate_OutputQueue(conf.getBoolean("linked", false), 2); // no queueing delay.
    // } else {
    executor.allocate_OutputQueue(conf.getBoolean("linked", false), 100000);
    // }
}
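If you do not want to recompile for every value, one small, untested tweak, assuming the conf object exposes a getInt(key, default) accessor analogous to the getBoolean call above, is to read the bound from the configuration:

```java
// Untested sketch: read the queue bound from the configuration instead of
// hard-coding 100000. The "output_queue_size" key and the getInt accessor are
// assumptions, mirroring the getBoolean call already used in this method.
private void allocate_OutputQueue() {
    int queueSize = conf.getInt("output_queue_size", 100000);
    executor.allocate_OutputQueue(conf.getBoolean("linked", false), queueSize);
}
```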