
Question re: exercise 2 in Chapter 4

Open · ldnicolasmay opened this issue 1 year ago · 8 comments

Hi! 👋 Thanks for putting this awesome book together. I'm getting a lot out of it, and I enjoy your approach to communicating this material. Heckuva job!

I've got a question about a Ch. 4 exercise (print book p. 108), specifically exercise 2.

It reads as follows:

  2. Currently, each evaluator takes a transaction event from the transaction source component and creates a score. Now two evaluators have the same type of calculation at beginning of their evaluation. Could you change the job for this case? The result will look like the graph below:
  [Figure: Grokking_Streaming_Systems_Real_time_event_processing_ch4_exer2]

What's unclear is the second sentence:

Now two evaluators have the same type of calculation at beginning of their evaluation.

Looking at the three evaluators, the apply methods in AvgTicketAnalyzer, WindowedProximityAnalyzer, and WindowedTransactionCountAnalyzer all look pretty similar:

  // AvgTicketAnalyzer
  @Override
  public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
  }

  // WindowedProximityAnalyzer
  @Override
  public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
  }

  // WindowedTransactionCountAnalyzer
  @Override
  public void apply(Event transaction, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent)transaction);
    // Dummy analyzer. Allow all transactions.
    eventCollector.add(new TransactionScoreEvent(e, 0.0f));
  }

And their instantiations in FraudDetectionJob look pretty similar, too:

    // Create a stream from a source.
    Stream transactionOut = job.addSource(new TransactionSource("transaction source", 1, 9990));
    // One stream can have multiple channels. Different operators can be hooked up
    // to different channels to receive different events. When no channel is selected,
    // the default channel will be used.
    Stream evalResults1 = transactionOut.applyOperator(new AvgTicketAnalyzer("avg ticket analyzer",
        2, new UserAccountFieldsGrouping()));
    Stream evalResults2 = transactionOut.applyOperator(new WindowedProximityAnalyzer("windowed proximity analyzer",
        2, new UserAccountFieldsGrouping()));
    Stream evalResults3 = transactionOut.applyOperator(new WindowedTransactionCountAnalyzer("windowed transaction count analyzer",
        2, new UserAccountFieldsGrouping()));

They all have the same parallelism and the same event grouping.

My question is this: How do "two evaluators have the same type of calculation at beginning of their evaluation"? What am I missing?

Thanks again!

ldnicolasmay · Sep 14 '23

For the sake of wiring up a solution, I just created a dummy evaluator called WindowedPreAnalyzer whose apply method just passes along the event to WindowedProximityAnalyzer and WindowedTransactionCountAnalyzer.

All changes are in this PR, but for convenience here are the relevant snippets:

src/main/java/com/streamwork/ch04/job/WindowedPreAnalyzer.java

package com.streamwork.ch04.job;

import com.streamwork.ch04.api.Event;
import com.streamwork.ch04.api.EventCollector;
import com.streamwork.ch04.api.GroupingStrategy;
import com.streamwork.ch04.api.Operator;

class WindowedPreAnalyzer extends Operator {
  private static final long serialVersionUID = 302238920663638527L;
  private int instance;

  public WindowedPreAnalyzer(String name, int parallelism, GroupingStrategy grouping) {
    super(name, parallelism, grouping);
  }

  @Override
  public void setupInstance(int instance) {
    this.instance = instance;
  }

  @Override
  public void apply(Event transaction, EventCollector eventCollector) {
    // Pass-through: forward the transaction unchanged so that both windowed
    // analyzers downstream receive it.
    eventCollector.add(transaction);
  }
}

It's instantiated in FraudDetectionJob like this:

src/main/java/com/streamwork/ch04/job/FraudDetectionJob.java

package com.streamwork.ch04.job;

import com.streamwork.ch04.api.Job;
import com.streamwork.ch04.api.Stream;
import com.streamwork.ch04.api.Streams;
import com.streamwork.ch04.engine.JobStarter;

public class FraudDetectionJob {

  public static void main(String[] args) {
    Job job = new Job("fraud_detection_job");

    // Create a stream from a source.
    Stream transactionOut = job.addSource(new TransactionSource("transaction source", 2, 9990));
    // One stream can have multiple channels. Different operators can be hooked up
    // to different channels to receive different events. When no channel is selected,
    // the default channel will be used.
    Stream evalResults1 = transactionOut
        .applyOperator(
            new AvgTicketAnalyzer(
                "avg ticket analyzer",
                2,
                new UserAccountFieldsGrouping()
            )
        );
    Stream evalResults2 = transactionOut
        .applyOperator(
            new WindowedPreAnalyzer(
                "windowed pre analyzer",
                2,
                new UserAccountFieldsGrouping()
            )
        );
    Stream evalResults3a = evalResults2
        .applyOperator(
            new WindowedProximityAnalyzer(
                "windowed proximity analyzer",
                2,
                new UserAccountFieldsGrouping()
            )
        );
    Stream evalResults3b = evalResults2
        .applyOperator(
            new WindowedTransactionCountAnalyzer(
                "windowed transaction count analyzer",
                2,
                new UserAccountFieldsGrouping()
            )
        );


    ScoreStorage store = new ScoreStorage();
    Streams.of(evalResults1, evalResults3a, evalResults3b)
            .applyOperator(
                new ScoreAggregator(
                    "score aggregator",
                    2,
                    new GroupByTransactionId(),
                    store
                )
            );

    Logger.log("This is a streaming job that detects suspicious transactions. " +
        "Input needs to be in this format: {amount},{merchandiseId}. For example: 42.00,3. " +
        "Merchandises N and N + 1 are 1 second walking distance away from each other.");
    JobStarter starter = new JobStarter(job);
    starter.start();
  }
}
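Every stage in this job is wired with UserAccountFieldsGrouping and a parallelism of 2. As a rough illustration of what a fields grouping does, here is a minimal, self-contained sketch that routes each transaction to an instance by hashing its user account. The Txn class and the hash-modulo rule are assumptions for illustration; streamwork's own FieldsGrouping may pick the instance differently. The property that matters is the same: every transaction for one account reaches the same instance of each windowed analyzer, so per-account window state stays in one place.

```java
import java.util.List;

// Hypothetical sketch of fields grouping; not the streamwork API.
final class FieldsGroupingSketch {

  // Minimal stand-in for TransactionEvent (assumed fields).
  static final class Txn {
    final String transactionId;
    final String userAccount;

    Txn(String transactionId, String userAccount) {
      this.transactionId = transactionId;
      this.userAccount = userAccount;
    }
  }

  // Pick a downstream instance from the grouping key (here, the user account).
  // floorMod keeps the index in [0, parallelism) even for negative hash codes.
  static int instanceFor(Txn txn, int parallelism) {
    return Math.floorMod(txn.userAccount.hashCode(), parallelism);
  }

  public static void main(String[] args) {
    List<Txn> txns = List.of(new Txn("t1", "alice"), new Txn("t2", "bob"), new Txn("t3", "alice"));
    for (Txn t : txns) {
      // t1 and t3 share an account, so they always land on the same instance.
      System.out.println(t.transactionId + " (" + t.userAccount + ") -> instance " + instanceFor(t, 2));
    }
  }
}
```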

ldnicolasmay · Sep 14 '23

Hi @ldnicolasmay , thank you for the question and proposed solution! I'll make sure to take some time and review it this weekend.

+1 for not only reporting an issue but also proposing a solution. 💯

joshfischer1108 · Sep 15 '23

In my view, the whole flow looks like this:

graph TD
    A(Source)--TransactionEvent-->B1(PreWindowOperator);
    A--TransactionEvent-->B2(AvgTicketAnalyzer);

    B1--PreWindowEvent-->C1(WindowedProximityAnalyzer);
    B1--PreWindowEvent-->C2(WindowedTransactionCountAnalyzer);

    B2--TransactionScoreEvent-->D(ScoreAggregator);
    C1--TransactionScoreEvent-->D;
    C2--TransactionScoreEvent-->D;

Now two evaluators have the same type of calculation at beginning of their evaluation.

I think it means the WindowedProximityAnalyzer and WindowedTransactionCountAnalyzer operators both take the result of the preceding operator, a PreWindowEvent, as their input (each of their queues gets the same event).


Pseudocode:

// PreWindowAnalyzer: runs the shared first step once and wraps the result
// for the two windowed analyzers downstream.
public class PreWindowAnalyzer extends Operator {

  private int instance;

  public PreWindowAnalyzer(String name, int parallelism, GroupingStrategy grouping) {
    super(name, parallelism, grouping);
  }

  @Override
  public void setupInstance(int instance) {
    this.instance = instance;
  }

  @Override
  public void apply(Event event, EventCollector eventCollector) {
    TransactionEvent e = ((TransactionEvent) event);
    // Placeholder for the shared calculation; 1F is a placeholder score.
    eventCollector.add(new PreWindowEvent(new TransactionScoreEvent(e, 1F)));
  }
}

// PreWindowEvent: the event type both windowed analyzers consume.
public class PreWindowEvent implements Event {

  public TransactionScoreEvent event;

  public PreWindowEvent(TransactionScoreEvent event) {
    this.event = event;
  }
}

// Wiring in FraudDetectionJob.main():
    Stream transactionOut = job.addSource(new TransactionSource("transaction source", 1, 9990));

    Stream preOut = transactionOut.applyOperator(new PreWindowAnalyzer("pre window analyzer", 2,
      new UserAccountFieldsGrouping()));
    Stream avgOut = transactionOut.applyOperator(new AvgTicketAnalyzer("avg ticket analyzer",
      2, new UserAccountFieldsGrouping()));

    // Both windowed analyzers subscribe to preOut, so each receives every PreWindowEvent.
    // Their apply methods (and the grouping, if it also casts) must unwrap
    // PreWindowEvent instead of casting directly to TransactionEvent.
    Stream out1 = preOut.applyOperator(new WindowedProximityAnalyzer("windowed proximity analyzer",
      2, new UserAccountFieldsGrouping()));
    Stream out2 = preOut.applyOperator(new WindowedTransactionCountAnalyzer("windowed transaction count analyzer",
      2, new UserAccountFieldsGrouping()));

    // All three score streams are merged and grouped by transaction id for aggregation.
    ScoreStorage store = new ScoreStorage();
    Streams.of(out1, out2, avgOut)
      .applyOperator(new ScoreAggregator("score aggregator", 2, new GroupByTransactionId(), store));
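One detail worth checking in this wiring: the analyzers' apply methods quoted at the top of the thread cast their input straight to TransactionEvent, so they would throw a ClassCastException when handed a PreWindowEvent. A minimal sketch of one way around that is an unwrap helper that accepts either form. The Evt, Txn, and PreWindowEvt types are simplified, hypothetical stand-ins, and here the wrapper holds the transaction directly rather than through a TransactionScoreEvent as in the pseudocode.

```java
// Hypothetical sketch with simplified stand-in types; not the streamwork classes.
final class UnwrapSketch {
  interface Evt {}

  // Stand-in for TransactionEvent.
  static final class Txn implements Evt {
    final String transactionId;
    final String userAccount;

    Txn(String transactionId, String userAccount) {
      this.transactionId = transactionId;
      this.userAccount = userAccount;
    }
  }

  // Stand-in for PreWindowEvent, wrapping the transaction directly.
  static final class PreWindowEvt implements Evt {
    final Txn transaction;

    PreWindowEvt(Txn transaction) {
      this.transaction = transaction;
    }
  }

  // Accept either a raw transaction or one wrapped by the pre-window step.
  static Txn unwrap(Evt event) {
    if (event instanceof PreWindowEvt) {
      return ((PreWindowEvt) event).transaction;
    }
    if (event instanceof Txn) {
      return (Txn) event;
    }
    throw new IllegalArgumentException("Unexpected event type: " + event.getClass().getName());
  }

  public static void main(String[] args) {
    Txn raw = new Txn("t1", "alice");
    Evt wrapped = new PreWindowEvt(raw);
    // A bare cast, (Txn) wrapped, would throw ClassCastException here.
    System.out.println(unwrap(wrapped).transactionId); // prints t1
  }
}
```

Keeping the unwrap in one helper means the analyzers still work if they are wired directly to the source again, which makes it easy to compare both topologies.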

bxb100 · Sep 16 '23

Hey @bxb100 and @ldnicolasmay, I apologize for the delay. I'll have time to look at this today.

@nwangtw , if you have some time, could you review as well?

joshfischer1108 · Sep 28 '23


When I said "I'll have time to look at this today," I meant I'd forget to check in and would follow up the next week. 😬

In reference to this statement:

My question is this: How do "two evaluators have the same type of calculation at beginning of their evaluation"? What am I missing?

I think we made a not-so-great choice of words. What we meant was closer to "two evaluators have copies of the same event to evaluate."
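To make "copies of the same event" concrete: when two operators are applied to the same stream, every event emitted on that stream is delivered to each of them. Below is a minimal single-threaded sketch, assuming a hypothetical FanOutStream class that keeps one queue per downstream operator; streamwork's engine has its own classes for this and may differ in the details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, single-threaded model of fan-out; not streamwork's engine.
final class FanOutSketch {

  // One outgoing stream feeding several downstream operators.
  static final class FanOutStream<T> {
    private final List<BlockingQueue<T>> downstream = new ArrayList<>();

    // Each call registers one more downstream operator and returns its input queue.
    BlockingQueue<T> subscribe() {
      BlockingQueue<T> q = new LinkedBlockingQueue<>();
      downstream.add(q);
      return q;
    }

    // Every downstream queue receives the same event.
    void emit(T event) {
      for (BlockingQueue<T> q : downstream) {
        q.add(event);
      }
    }
  }

  public static void main(String[] args) {
    FanOutStream<String> preWindowOut = new FanOutStream<>();
    BlockingQueue<String> proximityIn = preWindowOut.subscribe();
    BlockingQueue<String> countIn = preWindowOut.subscribe();

    preWindowOut.emit("t1");
    preWindowOut.emit("t2");

    System.out.println("proximity analyzer sees: " + proximityIn); // [t1, t2]
    System.out.println("count analyzer sees: " + countIn); // [t1, t2]
  }
}
```

Both queues here hold references to the same immutable strings. With mutable event objects, sharing references this way would let one analyzer change what the other sees, so events that fan out are usually best treated as immutable.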

joshfischer1108 · Oct 04 '23


This is interesting. I don't remember whether we included solutions for the other exercises, but I don't see why not. I like it. @nwangtw, any chance you have some time to review this code?

joshfischer1108 · Oct 04 '23


Thanks for this and the diagram, @bxb100! 💯 @nwangtw, could you take a look at this code, please? What do you think?

joshfischer1108 · Oct 04 '23

Yeah. @bxb100 is correct.

@ldnicolasmay got the DAG correct, so technically that is also a correct solution to the problem itself. :) On the other hand, the real power of the extra layer is that the PreWindowOperator can do some processing up front and prepare events in a more efficient structure for WindowedProximityAnalyzer and WindowedTransactionCountAnalyzer, for example by adding window-related data to the PreWindowEvent. In this example, I guess it could be called WindowedEvent.
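A rough sketch of that idea, with the window calculation done once in the pre-window step and reused by both analyzers. Everything here is hypothetical: the 60-second tumbling window, the simplified Txn and WindowedEvent classes, and the two toy analyzers. The proximity check borrows the job's stated rule that merchandises N and N + 1 are 1 second of walking apart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: compute window data once upstream, reuse it downstream.
final class WindowedEventSketch {
  static final long WINDOW_MS = 60_000; // assumed 1-minute tumbling window

  // Simplified stand-in for TransactionEvent (assumed fields).
  static final class Txn {
    final String userAccount;
    final long timeMs;
    final int merchandiseId;

    Txn(String userAccount, long timeMs, int merchandiseId) {
      this.userAccount = userAccount;
      this.timeMs = timeMs;
      this.merchandiseId = merchandiseId;
    }
  }

  // The transaction plus the window data both analyzers need.
  static final class WindowedEvent {
    final Txn transaction;
    final long windowStartMs;
    final String windowKey; // account + window start, used as the state key downstream

    WindowedEvent(Txn transaction, long windowStartMs) {
      this.transaction = transaction;
      this.windowStartMs = windowStartMs;
      this.windowKey = transaction.userAccount + "@" + windowStartMs;
    }
  }

  // Shared first step, done once per transaction (the PreWindowOperator's job).
  static WindowedEvent preWindow(Txn t) {
    long windowStartMs = t.timeMs - Math.floorMod(t.timeMs, WINDOW_MS);
    return new WindowedEvent(t, windowStartMs);
  }

  // Analyzer 1: number of transactions for this account in this window.
  static final class TransactionCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    int apply(WindowedEvent e) {
      return counts.merge(e.windowKey, 1, Integer::sum);
    }
  }

  // Analyzer 2: flag a transaction if the previous one in the same window was at a
  // merchandise too far away to walk to in the elapsed time (N to N + 1 = 1 second).
  static final class ProximityChecker {
    private final Map<String, Txn> last = new HashMap<>();

    boolean apply(WindowedEvent e) {
      Txn prev = last.put(e.windowKey, e.transaction);
      if (prev == null) {
        return false;
      }
      long walkingMs = Math.abs(e.transaction.merchandiseId - prev.merchandiseId) * 1000L;
      long elapsedMs = e.transaction.timeMs - prev.timeMs;
      return walkingMs > elapsedMs;
    }
  }

  public static void main(String[] args) {
    TransactionCounter counter = new TransactionCounter();
    ProximityChecker proximity = new ProximityChecker();
    Txn[] txns = {
        new Txn("alice", 1_000, 3),
        new Txn("alice", 3_000, 10), // 7 s of walking in 2 s: suspicious
        new Txn("alice", 61_000, 10) // falls in the next window
    };
    for (Txn t : txns) {
      WindowedEvent e = preWindow(t); // computed once, consumed by both analyzers
      System.out.println(e.windowKey + " count=" + counter.apply(e) + " suspicious=" + proximity.apply(e));
    }
  }
}
```

Running main prints `alice@0 count=1 suspicious=false`, then `alice@0 count=2 suspicious=true`, then `alice@60000 count=1 suspicious=false`. The window key is built once per transaction instead of once per analyzer, which is the kind of saving the extra layer is meant to buy.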

Thanks all for the interesting discussion. :)

Like @joshfischer1108 said, the wording is confusing. Another way to explain the problem might be to replace "Now" with "Assuming." Then, because the two components share the same calculation at the beginning of their evaluation, it is reasonable to refactor them and move the shared logic into a new component in front of them, and a new event structure is then likely to follow.

nwangtw · Oct 04 '23