GrokkingStreamingSystems
Question re: exercise 2 in Chapter 4
Hi! 👋 Thanks for putting this awesome book together. I'm getting a lot out of it, and I enjoy your approach to communicating this material. Heckuva job!
I've got a question about a Ch. 4 exercise (print book p. 108), specifically exercise 2.
It reads as follows:
- Currently, each evaluator takes a transaction event from the transaction source component and creates a score. Now two evaluators have the same type of calculation at beginning of their evaluation. Could you change the job for this case? The result will look like the graph below:
![]()
What's unclear is the second sentence:
Now two evaluators have the same type of calculation at beginning of their evaluation.
Looking at the three evaluators, the `apply` methods in `AvgTicketAnalyzer`, `WindowedProximityAnalyzer`, and `WindowedTransactionCountAnalyzer` all look pretty similar:
@Override
public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
}

@Override
public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
}

@Override
public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
}
And their instantiations in `FraudDetectionJob` look pretty similar, too:
// Create a stream from a source.
Stream transactionOut = job.addSource(new TransactionSource("transaction source", 1, 9990));
// One stream can have multiple channels. Different operator can be hooked up
// to different channels to receive different events. When no channel is selected,
// the default channel will be used.
Stream evalResults1 = transactionOut.applyOperator(new AvgTicketAnalyzer("avg ticket analyzer",
    2, new UserAccountFieldsGrouping()));
Stream evalResults2 = transactionOut.applyOperator(new WindowedProximityAnalyzer("windowed proximity analyzer",
    2, new UserAccountFieldsGrouping()));
Stream evalResults3 = transactionOut.applyOperator(new WindowedTransactionCountAnalyzer("windowed transaction count analyzer",
    2, new UserAccountFieldsGrouping()));
They all have the same parallelism and the same event grouping.
My question is this: How do "two evaluators have the same type of calculation at beginning of their evaluation"? What am I missing?
Thanks again!
For the sake of wiring up a solution, I just created a dummy evaluator called `WindowedPreAnalyzer` whose `apply` method just passes along the event to `WindowedProximityAnalyzer` and `WindowedTransactionCountAnalyzer`.
All changes are in this PR, but for convenience here are the relevant snippets:
src/main/java/com/streamwork/ch04/job/WindowedPreAnalyzer.java
package com.streamwork.ch04.job;

import com.streamwork.ch04.api.Event;
import com.streamwork.ch04.api.EventCollector;
import com.streamwork.ch04.api.GroupingStrategy;
import com.streamwork.ch04.api.Operator;

class WindowedPreAnalyzer extends Operator {
    private static final long serialVersionUID = 302238920663638527L;
    private int instance;

    public WindowedPreAnalyzer(String name, int parallelism, GroupingStrategy grouping) {
        super(name, parallelism, grouping);
    }

    @Override
    public void setupInstance(int instance) {
        this.instance = instance;
    }

    @Override
    public void apply(Event transaction, EventCollector eventCollector) {
        // The cast only checks that the incoming event is a TransactionEvent.
        TransactionEvent e = ((TransactionEvent)transaction);
        // Dummy pre-analyzer. Pass the transaction downstream unchanged.
        eventCollector.add(transaction);
    }
}
It's instantiated in `FraudDetectionJob` like this:
src/main/java/com/streamwork/ch04/job/FraudDetectionJob.java
package com.streamwork.ch04.job;

import com.streamwork.ch04.api.Job;
import com.streamwork.ch04.api.Stream;
import com.streamwork.ch04.api.Streams;
import com.streamwork.ch04.engine.JobStarter;

public class FraudDetectionJob {
    public static void main(String[] args) {
        Job job = new Job("fraud_detection_job");

        // Create a stream from a source.
        Stream transactionOut = job.addSource(new TransactionSource("transaction source", 2, 9990));

        // One stream can have multiple channels. Different operator can be hooked up
        // to different channels to receive different events. When no channel is selected,
        // the default channel will be used.
        Stream evalResults1 = transactionOut
            .applyOperator(
                new AvgTicketAnalyzer(
                    "avg ticket analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );
        Stream evalResults2 = transactionOut
            .applyOperator(
                new WindowedPreAnalyzer(
                    "windowed pre analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );
        Stream evalResults3a = evalResults2
            .applyOperator(
                new WindowedProximityAnalyzer(
                    "windowed proximity analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );
        Stream evalResults3b = evalResults2
            .applyOperator(
                new WindowedTransactionCountAnalyzer(
                    "windowed transaction count analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );

        ScoreStorage store = new ScoreStorage();
        Streams.of(evalResults1, evalResults3a, evalResults3b)
            .applyOperator(
                new ScoreAggregator(
                    "score aggregator",
                    2,
                    new GroupByTransactionId(),
                    store
                )
            );

        Logger.log("This is a streaming job that detect suspicious transactions. " +
            "Input needs to be in this format: {amount},{merchandiseId}. For example: 42.00,3. " +
            "Merchandises N and N + 1 are 1 seconds walking distance away from each other.");

        JobStarter starter = new JobStarter(job);
        starter.start();
    }
}
Hi @ldnicolasmay, thank you for the question and proposed solution! I'll make sure to take some time and review it this weekend.
+1 for not only reporting an issue but also proposing a solution. 💯
In my opinion, the whole flow is like this:
graph TD
    A(Source)--TransactionEvent-->B1(PreWindowOperator);
    A--TransactionEvent-->B2(AvgTicketAnalyzer);
    B1--PreWindowEvent-->C1(WindowedProximityAnalyzer);
    B1--PreWindowEvent-->C2(WindowedTransactionCountAnalyzer);
    B2--TransactionScoreEvent-->D(ScoreAggregator);
    C1--TransactionScoreEvent-->D;
    C2--TransactionScoreEvent-->D;
> Now two evaluators have the same type of calculation at beginning of their evaluation.

I think it means that the `WindowedProximityAnalyzer` and `WindowedTransactionCountAnalyzer` operators both accept the result produced by the previous operator, a `PreWindowEvent` (each queue gets the same event).
Pseudocode:
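// PreWindowAnalyzer.java
// Imports assumed to match the ch04 API used by the other operators in this thread.
import com.streamwork.ch04.api.Event;
import com.streamwork.ch04.api.EventCollector;
import com.streamwork.ch04.api.GroupingStrategy;
import com.streamwork.ch04.api.Operator;

public class PreWindowAnalyzer extends Operator {
    private int instance;

    public PreWindowAnalyzer(String name, int parallelism, GroupingStrategy grouping) {
        super(name, parallelism, grouping);
    }

    @Override
    public void setupInstance(int instance) {
        this.instance = instance;
    }

    @Override
    public void apply(Event event, EventCollector eventCollector) {
        TransactionEvent e = ((TransactionEvent) event);
        // Wrap the shared intermediate result so both windowed analyzers receive it.
        eventCollector.add(new PreWindowEvent(new TransactionScoreEvent(e, 1.0f)));
    }
}

// PreWindowEvent.java
public class PreWindowEvent implements Event {
    public TransactionScoreEvent event;

    public PreWindowEvent(TransactionScoreEvent event) {
        this.event = event;
    }
}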
Stream transactionOut = job.addSource(new TransactionSource("transaction source", 1, 9990));
Stream preOut = transactionOut.applyOperator(new PreWindowAnalyzer("pre window analyzer", 2,
    new UserAccountFieldsGrouping()));
Stream avgOut = transactionOut.applyOperator(new AvgTicketAnalyzer("avg ticket analyzer",
    2, new UserAccountFieldsGrouping()));
Stream out1 = preOut.applyOperator(new WindowedProximityAnalyzer("windowed proximity analyzer",
    2, new UserAccountFieldsGrouping()));
Stream out2 = preOut.applyOperator(new WindowedTransactionCountAnalyzer("windowed transaction count analyzer",
    2, new UserAccountFieldsGrouping()));

ScoreStorage store = new ScoreStorage();
Streams.of(out1, out2, avgOut)
    .applyOperator(new ScoreAggregator("score aggregator", 2, new GroupByTransactionId(), store));
Hey @bxb100 and @ldnicolasmay, I want to apologize for my delay. I'll have time to look at this today.
@nwangtw, if you have some time, could you review as well?
When I said "I'll have time to look at this today," I meant I'd forget to check in and follow up the next week. 😬
In reference to this statement:
My question is this: How do "two evaluators have the same type of calculation at beginning of their evaluation"? What am I missing?
I think we used a not-so-great choice of words. What we meant was more like "two evaluators have copies of the same event to evaluate."
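To make the "copies of the same event" reading concrete, here is a minimal stand-alone sketch in plain Java. It does not use the book's engine: `FanOutSketch`, `Txn`, `Score`, and `fanOut` are hypothetical names made up for illustration, and the two evaluators are dummies that always score 0.0, like the ones quoted earlier in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-alone illustration; none of these types come from the book's engine.
class FanOutSketch {
    /** Hypothetical stand-in for a transaction event. */
    static final class Txn {
        final String id;
        Txn(String id) { this.id = id; }
    }

    /** Hypothetical stand-in for a scored transaction. */
    static final class Score {
        final String txnId;
        final String evaluator;
        final double value;
        Score(String txnId, String evaluator, double value) {
            this.txnId = txnId;
            this.evaluator = evaluator;
            this.value = value;
        }
        @Override
        public String toString() {
            return evaluator + " scored " + txnId + " as " + value;
        }
    }

    /** Hands every upstream event to each evaluator, so all evaluators see the same events. */
    static List<Score> fanOut(List<Txn> upstream, List<Function<Txn, Score>> evaluators) {
        List<Score> scores = new ArrayList<>();
        for (Txn txn : upstream) {
            for (Function<Txn, Score> evaluator : evaluators) {
                scores.add(evaluator.apply(txn));
            }
        }
        return scores;
    }

    /** Two transactions fanned out to two dummy evaluators. */
    static List<Score> demo() {
        List<Txn> upstream = new ArrayList<>();
        upstream.add(new Txn("t1"));
        upstream.add(new Txn("t2"));
        List<Function<Txn, Score>> evaluators = new ArrayList<>();
        evaluators.add(t -> new Score(t.id, "proximity", 0.0));
        evaluators.add(t -> new Score(t.id, "count", 0.0));
        return fanOut(upstream, evaluators);
    }

    public static void main(String[] args) {
        for (Score s : demo()) {
            System.out.println(s);
        }
    }
}
```

Each transaction yields one score per evaluator, which is the shape the score aggregator in the job would then have to combine. A real engine would deliver the copies through separate queues rather than a nested loop; the loop only shows the data flow.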
This is interesting. I don't remember if we included any solutions for the other exercises, but I don't see why not. I like it. @nwangtw, any chance you have some time to review this code?
Thanks for this and the diagram, @bxb100 💯. @nwangtw, could you take a look at this code, please? What do you think?
Yeah. @bxb100 is correct.
@ldnicolasmay got the DAG correct, so technically it is also a correct solution for the problem itself. :) On the other hand, the power of having the extra layer is that the `PreWindowOperator` class can do some processing and prepare events in a more efficient structure for `WindowedProximityAnalyzer` and `WindowedTransactionCountAnalyzer`, for example by adding window-related data to the `PreWindowEvent`. I guess it could be called `WindowedEvent` in this example.
Thanks all for the interesting discussion. :)
Like @joshfischer1108 said, the wording is confusing. Maybe another way to explain the problem is to replace the "now" with "assuming". Then, because the two components share the calculation at the beginning of their evaluation, it is reasonable to refactor them and move the shared logic into a new component in front of them; a new event structure is then likely to emerge.