GrokkingStreamingSystems
Question re: exercise 2 in Chapter 4
Hi! 👋 Thanks for putting this awesome book together. I'm getting a lot out of it, and I enjoy your approach to communicating this material. Heckuva job!
I've got a question about a Ch. 4 exercise (print book p. 108), specifically exercise 2.
It reads as follows:
- Currently, each evaluator takes a transaction event from the transaction source component and creates a score. Now two evaluators have the same type of calculation at beginning of their evaluation. Could you change the job for this case? The result will look like the graph below:
What's unclear is the second sentence:
Now two evaluators have the same type of calculation at beginning of their evaluation.
Looking at the three evaluators, the apply methods in AvgTicketAnalyzer, WindowedProximityAnalyzer, and WindowedTransactionCountAnalyzer all look pretty similar:
@Override
public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
}

@Override
public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
}

@Override
public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
}
And their instantiations in FraudDetectionJob look pretty similar, too:
// Create a stream from a source.
Stream transactionOut = job.addSource(new TransactionSource("transaction source", 1, 9990));
// One stream can have multiple channels. Different operator can be hooked up
// to different channels to receive different events. When no channel is selected,
// the default channel will be used.
Stream evalResults1 = transactionOut.applyOperator(new AvgTicketAnalyzer("avg ticket analyzer",
    2, new UserAccountFieldsGrouping()));
Stream evalResults2 = transactionOut.applyOperator(new WindowedProximityAnalyzer("windowed proximity analyzer",
    2, new UserAccountFieldsGrouping()));
Stream evalResults3 = transactionOut.applyOperator(new WindowedTransactionCountAnalyzer("windowed transaction count analyzer",
    2, new UserAccountFieldsGrouping()));
They all have the same parallelism and the same event grouping.
My question is this: How do "two evaluators have the same type of calculation at beginning of their evaluation"? What am I missing?
Thanks again!
For the sake of wiring up a solution, I just created a dummy evaluator called WindowedPreAnalyzer whose apply method just passes along the event to WindowedProximityAnalyzer and WindowedTransactionCountAnalyzer.
All changes are in this PR, but for convenience here are the relevant snippets:
src/main/java/com/streamwork/ch04/job/WindowedPreAnalyzer.java
package com.streamwork.ch04.job;

import com.streamwork.ch04.api.Event;
import com.streamwork.ch04.api.EventCollector;
import com.streamwork.ch04.api.GroupingStrategy;
import com.streamwork.ch04.api.Operator;

class WindowedPreAnalyzer extends Operator {
    private static final long serialVersionUID = 302238920663638527L;
    private int instance;

    public WindowedPreAnalyzer(String name, int parallelism, GroupingStrategy grouping) {
        super(name, parallelism, grouping);
    }

    @Override
    public void setupInstance(int instance) {
        this.instance = instance;
    }

    @Override
    public void apply(Event transaction, EventCollector eventCollector) {
        // Dummy pre-analyzer. Pass every transaction through unchanged
        // to the two downstream windowed analyzers.
        eventCollector.add(transaction);
    }
}
It's instantiated in FraudDetectionJob like this:
src/main/java/com/streamwork/ch04/job/FraudDetectionJob.java
package com.streamwork.ch04.job;

import com.streamwork.ch04.api.Job;
import com.streamwork.ch04.api.Stream;
import com.streamwork.ch04.api.Streams;
import com.streamwork.ch04.engine.JobStarter;

public class FraudDetectionJob {
    public static void main(String[] args) {
        Job job = new Job("fraud_detection_job");

        // Create a stream from a source.
        Stream transactionOut = job.addSource(new TransactionSource("transaction source", 2, 9990));

        // One stream can have multiple channels. Different operator can be hooked up
        // to different channels to receive different events. When no channel is selected,
        // the default channel will be used.
        Stream evalResults1 = transactionOut
            .applyOperator(
                new AvgTicketAnalyzer(
                    "avg ticket analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );
        Stream evalResults2 = transactionOut
            .applyOperator(
                new WindowedPreAnalyzer(
                    "windowed pre analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );
        Stream evalResults3a = evalResults2
            .applyOperator(
                new WindowedProximityAnalyzer(
                    "windowed proximity analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );
        Stream evalResults3b = evalResults2
            .applyOperator(
                new WindowedTransactionCountAnalyzer(
                    "windowed transaction count analyzer",
                    2,
                    new UserAccountFieldsGrouping()
                )
            );

        ScoreStorage store = new ScoreStorage();
        Streams.of(evalResults1, evalResults3a, evalResults3b)
            .applyOperator(
                new ScoreAggregator(
                    "score aggregator",
                    2,
                    new GroupByTransactionId(),
                    store
                )
            );

        Logger.log("This is a streaming job that detect suspicious transactions. " +
            "Input needs to be in this format: {amount},{merchandiseId}. For example: 42.00,3. " +
            "Merchandises N and N + 1 are 1 seconds walking distance away from each other.");

        JobStarter starter = new JobStarter(job);
        starter.start();
    }
}
Hi @ldnicolasmay, thank you for the question and proposed solution! I'll make sure to take some time and review it this weekend.
+1 for not only reporting an issue but also proposing a solution. 💯
In my opinion, the whole flow is like this:
graph TD
    A(Source)--TransactionEvent-->B1(PreWindowOperator);
    A--TransactionEvent-->B2(AvgTicketAnalyzer);
    B1--PreWindowEvent-->C1(WindowedProximityAnalyzer);
    B1--PreWindowEvent-->C2(WindowedTransactionCountAnalyzer);
    B2--TransactionScoreEvent-->D(ScoreAggregator);
    C1--TransactionScoreEvent-->D;
    C2--TransactionScoreEvent-->D;
Now two evaluators have the same type of calculation at beginning of their evaluation.
I think it means that the WindowedProximityAnalyzer and WindowedTransactionCountAnalyzer operators both accept the PreWindowEvent produced by the previous operator (each operator's queue gets the same event).
pseudo code:
public class PreWindowAnalyzer extends Operator {
    private int instance;

    public PreWindowAnalyzer(String name, int parallelism, GroupingStrategy grouping) {
        super(name, parallelism, grouping);
    }

    @Override
    public void setupInstance(int instance) {
        this.instance = instance;
    }

    @Override
    public void apply(Event event, EventCollector eventCollector) {
        TransactionEvent e = ((TransactionEvent) event);
        eventCollector.add(new PreWindowEvent(new TransactionScoreEvent(e, 1F)));
    }
}

public class PreWindowEvent implements Event {
    public TransactionScoreEvent event;

    public PreWindowEvent(TransactionScoreEvent event) {
        this.event = event;
    }
}

Stream transactionOut = job.addSource(new TransactionSource("transaction source", 1, 9990));
Stream preOut = transactionOut.applyOperator(new PreWindowAnalyzer("pre window analyzer", 2,
    new UserAccountFieldsGrouping()));
Stream avgOut = transactionOut.applyOperator(new AvgTicketAnalyzer("avg ticket analyzer",
    2, new UserAccountFieldsGrouping()));
Stream out1 = preOut.applyOperator(new WindowedProximityAnalyzer("windowed proximity analyzer",
    2, new UserAccountFieldsGrouping()));
Stream out2 = preOut.applyOperator(new WindowedTransactionCountAnalyzer("windowed transaction count analyzer",
    2, new UserAccountFieldsGrouping()));
ScoreStorage store = new ScoreStorage();
Streams.of(out1, out2, avgOut)
    .applyOperator(new ScoreAggregator("score aggregator", 2, new GroupByTransactionId(), store));
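One consequence of this wiring is that the two windowed analyzers no longer receive a TransactionEvent, so their apply methods would need to unwrap the PreWindowEvent instead of casting straight to TransactionEvent. Here is a minimal sketch of that unwrapping, assuming nothing about the real ch04 API: the `Event`, `EventCollector`, and event classes below are hypothetical stand-ins so the example compiles on its own, and `ProximityAnalyzerSketch` is an illustrative name, not a class from the book.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the ch04 types, kept tiny so the sketch is self-contained.
interface Event {}

interface EventCollector {
    void add(Event event);
}

final class TransactionEvent implements Event {
    final String transactionId;
    TransactionEvent(String transactionId) { this.transactionId = transactionId; }
}

final class TransactionScoreEvent implements Event {
    final TransactionEvent transaction;
    final float score;
    TransactionScoreEvent(TransactionEvent transaction, float score) {
        this.transaction = transaction;
        this.score = score;
    }
}

final class PreWindowEvent implements Event {
    final TransactionScoreEvent event;
    PreWindowEvent(TransactionScoreEvent event) { this.event = event; }
}

// Illustrative downstream analyzer: it expects the wrapped event, not a raw transaction.
final class ProximityAnalyzerSketch {
    void apply(Event incoming, EventCollector collector) {
        if (!(incoming instanceof PreWindowEvent)) {
            throw new IllegalArgumentException(
                "expected PreWindowEvent, got " + incoming.getClass().getSimpleName());
        }
        PreWindowEvent pre = (PreWindowEvent) incoming;
        // Dummy scoring: forward the score prepared by the shared step.
        collector.add(new TransactionScoreEvent(pre.event.transaction, pre.event.score));
    }
}

class UnwrapDemo {
    public static void main(String[] args) {
        List<Event> out = new ArrayList<>();
        TransactionEvent txn = new TransactionEvent("t-1");
        new ProximityAnalyzerSketch()
            .apply(new PreWindowEvent(new TransactionScoreEvent(txn, 1F)), out::add);
        System.out.println("emitted events: " + out.size());
    }
}
```

The explicit type check is a design choice for the sketch: it fails with a readable message if the analyzer is accidentally wired to the source stream instead of the pre-window stream.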
Hey @bxb100 and @ldnicolasmay, I want to apologize for my delay. I'll have time to look at this today.
@nwangtw , if you have some time, could you review as well?
When I said "I'll have time to look at this today," I meant I'd forget to check in and then follow up the next week. 😬
In reference to this statement
My question is this: How do "two evaluators have the same type of calculation at beginning of their evaluation"? What am I missing?
I think we chose our words poorly. What we meant was more like "two evaluators have copies of the same event to evaluate."
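To make "copies of the same event" concrete, here is a minimal sketch of a stream fanning out to two downstream operators. It is not the book's engine: `Txn` and `FanOutStream` are hypothetical names, and for brevity each queue receives a reference to the same object, whereas a distributed engine would typically hand each operator its own serialized copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a transaction event; not the real ch04 class.
final class Txn {
    final String id;
    final double amount;
    Txn(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }
}

// A stream that any number of downstream operators can subscribe to.
final class FanOutStream {
    private final List<Queue<Txn>> downstreamQueues = new ArrayList<>();

    // Each subscriber gets its own incoming queue.
    Queue<Txn> subscribe() {
        Queue<Txn> queue = new ArrayDeque<>();
        downstreamQueues.add(queue);
        return queue;
    }

    // Emitting places the same event on every subscriber's queue.
    void emit(Txn event) {
        for (Queue<Txn> queue : downstreamQueues) {
            queue.add(event);
        }
    }
}

class FanOutDemo {
    public static void main(String[] args) {
        FanOutStream preOut = new FanOutStream();
        Queue<Txn> proximityIn = preOut.subscribe();
        Queue<Txn> countIn = preOut.subscribe();

        preOut.emit(new Txn("t-1", 42.00));

        System.out.println("proximity queue size: " + proximityIn.size());
        System.out.println("count queue size: " + countIn.size());
    }
}
```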
This is interesting. I don't remember if we included any solutions for the other exercises, but I don't see why not. I like it. @nwangtw, any chance you have some time to review this code?
Thanks for this and the diagram, @bxb100 💯 . @nwangtw , could you take a look at this code, please? What do you think?
Yeah. @bxb100 is correct.
@ldnicolasmay got the DAG correct, so technically it is also a correct solution for the problem itself. :) On the other hand, the power of having the extra layer is that the PreWindowOperator class can do some processing and prepare events in a more efficient structure for WindowedProximityAnalyzer and WindowedTransactionCountAnalyzer, for example by adding window-related data to the PreWindowEvent. I guess it could be called WindowedEvent in this example.
Thanks all for the interesting discussion. :)
Like @joshfischer1108 said, the wording is confusing. Maybe another way to explain the problem is to replace "Now" with "Assuming". Then, because the two components share the calculation at the beginning of their evaluation, it is reasonable to refactor them and move the shared logic into a new component in front of them, and a new event structure is then likely to emerge.
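As a rough illustration of that refactoring, the sketch below moves the window bookkeeping into one front step and lets two scoring functions read the prepared data. Everything here is an assumption made for the example: `WindowedEvent`, `PreWindowStep`, and `AnalyzerSketches` are hypothetical names, the scoring rules are invented, timestamps are assumed to arrive in non-decreasing order, and none of it uses the real ch04 API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical event emitted by the shared front component.
final class WindowedEvent {
    final String transactionId;
    final int countInWindow;     // transactions in the window, including this one
    final long msSincePrevious;  // -1 when there is no previous transaction

    WindowedEvent(String transactionId, int countInWindow, long msSincePrevious) {
        this.transactionId = transactionId;
        this.countInWindow = countInWindow;
        this.msSincePrevious = msSincePrevious;
    }
}

// Shared logic moved in front of both analyzers: the window is maintained once.
final class PreWindowStep {
    private final long windowMs;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    PreWindowStep(long windowMs) {
        this.windowMs = windowMs;
    }

    WindowedEvent apply(String transactionId, long timestampMs) {
        Long previous = timestamps.peekLast(); // null when nothing has been seen yet
        // Evict timestamps that have fallen out of the window.
        while (!timestamps.isEmpty() && timestampMs - timestamps.peekFirst() >= windowMs) {
            timestamps.pollFirst();
        }
        timestamps.addLast(timestampMs);
        long gap = (previous == null) ? -1 : timestampMs - previous;
        return new WindowedEvent(transactionId, timestamps.size(), gap);
    }
}

// The two analyzers now only read the prepared fields. Scoring rules are invented.
final class AnalyzerSketches {
    // Stand-in for a transaction count check: flag bursts within the window.
    static float countScore(WindowedEvent e, int maxPerWindow) {
        return e.countInWindow > maxPerWindow ? 1.0f : 0.0f;
    }

    // Stand-in for a proximity check: flag transactions that follow too quickly.
    static float proximityScore(WindowedEvent e, long minGapMs) {
        return (e.msSincePrevious >= 0 && e.msSincePrevious < minGapMs) ? 1.0f : 0.0f;
    }
}

class SharedStepDemo {
    public static void main(String[] args) {
        PreWindowStep pre = new PreWindowStep(1000);
        pre.apply("t-1", 0);
        WindowedEvent second = pre.apply("t-2", 400);
        System.out.println("count score: " + AnalyzerSketches.countScore(second, 1));
        System.out.println("proximity score: " + AnalyzerSketches.proximityScore(second, 500));
    }
}
```

The point of the design is that the window state lives in exactly one place, so the two analyzers cannot drift apart in how they define the window.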
