
Feature kafka source and sink mk 002

Open · kamir opened this pull request 1 year ago · 10 comments

This is a draft pull request.

I wanted to test the added function readKafkaTopic(topicName: String) which is new in the JavaPlanBuilder.

Here is my test code:

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;

import java.util.Arrays;
import java.util.Collection;

public class KafkaTopicWordCount {

public static void main(String[] args){

    
    System.out.println( ">>> Apache Wayang Test #01");
    System.out.println( "    We use a Kafka topic and a 'Java Context'.");
    int i = 0;
    for (String arg : args) {
        String line = String.format( "  %d    - %s", i,arg);
        System.out.println(line);
        i=i+1;
    }

    // Settings: the first CLI argument is the topic name.
    String topicName = args[0];

    // Get a plan builder.
    WayangContext wayangContext = new WayangContext(new Configuration())
            .withPlugin(Java.basicPlugin());
    //        .withPlugin(Spark.basicPlugin());
    JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
            .withJobName(String.format("WordCount (%s)", topicName))
            .withUdfJarOf(KafkaTopicWordCount.class);

    // Start building the WayangPlan.
    Collection<Tuple2<String, Integer>> wordcounts = planBuilder
            // Read records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
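            // Estimate the output at roughly 1% of the input cardinality; 0.9 is the
            // estimator's certainty (see the DefaultCardinalityEstimator constructor).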
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and collect the results.
            .collect();

    System.out.println(wordcounts);
    System.out.println( "### Done. ###" );
     
}

}

kamir · Feb 05 '24 19:02

The visibility issue has been solved. My end-to-end test, as shown in the comment above, can now be started. It finds the function, but it still fails with an error because no execution plan can be found.
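For reference, I start the test via the exec-maven-plugin, roughly like this (the main class and the topic argument are specific to my local test project wayang-test-01):

    mvn compile exec:java -Dexec.mainClass="KafkaTopicWordCount" -Dexec.args="banking-tx-small-csv"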

[INFO] --- exec:3.0.0:java (default-cli) @ wayang-test-01 ---

>>> Apache Wayang Test #01
    We use a Kafka topic and a 'Java Context'.
*** Use default topic name: banking-tx-small-csv
16:39:45.821 [KafkaTopicWordCount.main()] ERROR org.apache.wayang.core.optimizer.enumeration.PlanEnumerator - No comprehensive PlanEnumeration.
16:39:45.829 [KafkaTopicWordCount.main()] ERROR org.apache.wayang.core.optimizer.enumeration.PlanEnumerator - Pending enumerations: []
16:39:45.829 [KafkaTopicWordCount.main()] ERROR org.apache.wayang.core.optimizer.enumeration.PlanEnumerator - Pending concatenations: []
[WARNING] org.apache.wayang.core.api.exception.WayangException: Could not find a single execution plan.
    at org.apache.wayang.core.optimizer.enumeration.PlanEnumerator.enumerate (PlanEnumerator.java:305)
    at org.apache.wayang.core.api.Job.createInitialExecutionPlan (Job.java:417)
    at org.apache.wayang.core.api.Job.doExecute (Job.java:290)
    at org.apache.wayang.core.util.OneTimeExecutable.tryExecute (OneTimeExecutable.java:41)
    at org.apache.wayang.core.util.OneTimeExecutable.execute (OneTimeExecutable.java:54)
    at org.apache.wayang.core.api.Job.execute (Job.java:244)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:120)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:108)
    at org.apache.wayang.api.PlanBuilder.buildAndExecute (PlanBuilder.scala:105)
    at org.apache.wayang.api.DataQuanta.collect (DataQuanta.scala:758)
    at org.apache.wayang.api.DataQuantaBuilder.collect (DataQuantaBuilder.scala:369)
    at org.apache.wayang.api.DataQuantaBuilder.collect$ (DataQuantaBuilder.scala:367)
    at org.apache.wayang.api.BasicDataQuantaBuilder.collect (DataQuantaBuilder.scala:448)
    at KafkaTopicWordCount.main (KafkaTopicWordCount.java:75)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.777 s
[INFO] Finished at: 2024-02-06T16:39:45+01:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project wayang-test-01: An exception occured while executing the Java class. Could not find a single execution plan. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]

kamir · Feb 06 '24 15:02

I think you missed the mapping from the KafkaSourceOperator, which is a platform-agnostic Wayang operator, to its Java-specific implementation. See, for example, the mapping for the FlatMapOperator: https://github.com/apache/incubator-wayang/blob/main/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/FlatMapMapping.java
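For orientation, here is a minimal sketch of what such a mapping could look like, modeled on FlatMapMapping. The class and constructor names for the Kafka operators (KafkaTopicSource, JavaKafkaTopicSource) are assumptions about this PR's code, not verified against it:

import java.util.Collection;
import java.util.Collections;

import org.apache.wayang.basic.operators.KafkaTopicSource;    // assumed package of the new source operator
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.java.operators.JavaKafkaTopicSource; // assumed Java-specific implementation
import org.apache.wayang.java.platform.JavaPlatform;

/**
 * Hypothetical mapping from the platform-agnostic KafkaTopicSource to the
 * Java platform, following the structure of FlatMapMapping.
 */
public class KafkaTopicSourceMapping implements Mapping {

    @Override
    public Collection<PlanTransformation> getTransformations() {
        return Collections.singleton(new PlanTransformation(
                this.createSubplanPattern(),
                this.createReplacementSubplanFactory(),
                JavaPlatform.getInstance()
        ));
    }

    private SubplanPattern createSubplanPattern() {
        // Match any KafkaTopicSource in the plan; the constructor argument is a placeholder.
        final OperatorPattern<KafkaTopicSource> operatorPattern = new OperatorPattern<>(
                "source", new KafkaTopicSource((String) null), false
        );
        return SubplanPattern.createSingleton(operatorPattern);
    }

    private ReplacementSubplanFactory createReplacementSubplanFactory() {
        // Swap the matched operator for the Java-specific implementation.
        return new ReplacementSubplanFactory.OfSingleOperators<KafkaTopicSource>(
                (matchedOperator, epoch) -> new JavaKafkaTopicSource(matchedOperator).at(epoch)
        );
    }
}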

Then this mapping needs to be registered in the Java platform. https://github.com/apache/incubator-wayang/blob/main/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java

Hope with these it works.

zkaoudi · Feb 06 '24 20:02

Yes, very good. Thanks for that pointer. I added the mapping, updated my credentials, and voilà ...

We get a new error related to the "Load Estimation procedure":

[INFO] --- exec:3.0.0:java (default-cli) @ wayang-test-01 ---

>>> Apache Wayang Test #01
    We use a Kafka topic and a 'Java Context'.
*** Use default topic name: banking-tx-small-csv

7 ...

Create consumer from DEFAULT PROPERTIES.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/mkaempf/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/mkaempf/.m2/repository/org/slf4j/slf4j-simple/1.7.13/slf4j-simple-1.7.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
KafkaTopicSource isInitialized=true

8 ...

[WARNING] org.apache.wayang.core.api.exception.WayangException: Job execution failed.
    at org.apache.wayang.core.api.Job.doExecute (Job.java:330)
    at org.apache.wayang.core.util.OneTimeExecutable.tryExecute (OneTimeExecutable.java:41)
    at org.apache.wayang.core.util.OneTimeExecutable.execute (OneTimeExecutable.java:54)
    at org.apache.wayang.core.api.Job.execute (Job.java:244)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:120)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:108)
    at org.apache.wayang.api.PlanBuilder.buildAndExecute (PlanBuilder.scala:105)
    at org.apache.wayang.api.DataQuanta.collect (DataQuanta.scala:758)
    at org.apache.wayang.api.DataQuantaBuilder.collect (DataQuantaBuilder.scala:369)
    at org.apache.wayang.api.DataQuantaBuilder.collect$ (DataQuantaBuilder.scala:367)
    at org.apache.wayang.api.BasicDataQuantaBuilder.collect (DataQuantaBuilder.scala:463)
    at KafkaTopicWordCount.main (KafkaTopicWordCount.java:78)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.apache.wayang.core.platform.AtomicExecution.estimateLoad (AtomicExecution.java:59)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.lambda$estimateLoad$0 (AtomicExecutionGroup.java:90)
    at java.util.stream.ReferencePipeline$3$1.accept (ReferencePipeline.java:195)
    at java.util.LinkedList$LLSpliterator.forEachRemaining (LinkedList.java:1239)
    at java.util.stream.AbstractPipeline.copyInto (AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:474)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential (ReduceOps.java:913)
    at java.util.stream.AbstractPipeline.evaluate (AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.reduce (ReferencePipeline.java:558)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.estimateLoad (AtomicExecutionGroup.java:91)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.estimateExecutionTime (AtomicExecutionGroup.java:108)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.estimateExecutionTime (AtomicExecutionGroup.java:117)
    at org.apache.wayang.core.platform.PartialExecution.lambda$getOverallTimeEstimate$3 (PartialExecution.java:173)
    at java.util.stream.ReferencePipeline$3$1.accept (ReferencePipeline.java:195)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining (ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto (AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:474)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential (ReduceOps.java:913)
    at java.util.stream.AbstractPipeline.evaluate (AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.reduce (ReferencePipeline.java:553)
    at org.apache.wayang.core.platform.PartialExecution.getOverallTimeEstimate (PartialExecution.java:174)
    at org.apache.wayang.core.profiling.PartialExecutionMeasurement.<init> (PartialExecutionMeasurement.java:62)
    at org.apache.wayang.core.api.Job.logExecution (Job.java:705)
    at org.apache.wayang.core.api.Job.doExecute (Job.java:325)
    at org.apache.wayang.core.util.OneTimeExecutable.tryExecute (OneTimeExecutable.java:41)
    at org.apache.wayang.core.util.OneTimeExecutable.execute (OneTimeExecutable.java:54)
    at org.apache.wayang.core.api.Job.execute (Job.java:244)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:120)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:108)
    at org.apache.wayang.api.PlanBuilder.buildAndExecute (PlanBuilder.scala:105)
    at org.apache.wayang.api.DataQuanta.collect (DataQuanta.scala:758)
    at org.apache.wayang.api.DataQuantaBuilder.collect (DataQuantaBuilder.scala:369)
    at org.apache.wayang.api.DataQuantaBuilder.collect$ (DataQuantaBuilder.scala:367)
    at org.apache.wayang.api.BasicDataQuantaBuilder.collect (DataQuantaBuilder.scala:463)
    at KafkaTopicWordCount.main (KafkaTopicWordCount.java:78)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)

kamir · Feb 08 '24 15:02

Hmmm, that's interesting, but at least we're moving forward :)

I think it throws the exception because you added the following without adding these keys to the properties file:

    @Override
    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
        return Arrays.asList("wayang.java.kafkatopicsource.load.prepare", "wayang.java.kafkatopicsource.load.main");
    }

Maybe try to add a cost function for the kafkatopicsource in the Java properties file: https://github.com/apache/incubator-wayang/blob/cd4936e665c2978394943117b44853f7ebabbec8/wayang-platforms/wayang-java/src/main/resources/wayang-java-defaults.properties You can copy the one for the collectionsource for now.
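For illustration only, the entries could look roughly like the sketch below; the exact JSON fields and the cost formulas are placeholders and should be copied and adapted from the existing collectionsource entry in that file:

    wayang.java.kafkatopicsource.load.prepare = {"in":0, "out":1, "cpu":"${500}", "ram":"${10}", "p":0.9}
    wayang.java.kafkatopicsource.load.main = {"in":0, "out":1, "cpu":"${100*out0 + 500}", "ram":"${10*out0}", "p":0.9}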

To check it quickly, you could also try not overriding the getLoadProfileEstimatorConfigurationKeys() method at all.

zkaoudi · Feb 09 '24 08:02

By not overriding getLoadProfileEstimatorConfigurationKeys() it works. And with the right definitions in the wayang-java-defaults.properties file it works as well.

KafkaWordCount Example

KafkaTopicSource isInitialized=true
[(address, 9), (city, 9), (orderid, 9), (18, 9), (94041, 9), (ordertime, 9), (zipcode, 9), (itemid, 9), (view, 9), (mountain, 9), (item_184, 9), (1497014222380, 9), (state, 9), (ca, 9)]
Done.

kamir · Feb 09 '24 16:02

That's awesome! The best way is to provide the keys and add the cost function in the properties file.

Now there are some conflicts in some files. I will check them out and see if I can resolve them over the weekend.

zkaoudi · Feb 09 '24 18:02

KafkaSource and KafkaSink are ready.

I will add tests later this week, or maybe next weekend.

But my external KafkaWordCountDemo works.

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;

import java.util.Arrays;
import java.util.Collection;

public class KafkaTopicWordCount {

// Define the lambda function for formatting the output
private static final FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf = tuple -> {
    return tuple.getField0() + ": " + tuple.getField1();
};

public static void main(String[] args){

    System.out.println( ">>> Apache Wayang Test #01");
    System.out.println( "    We use a Kafka topic and a 'Java Context'.");

    // Default topic name
    String topicName = "banking-tx-small-csv";

    // Check if at least one argument is provided
    if (args.length > 0) {
        // Assuming the first argument is the topic name
        topicName = args[0];

        int i = 0;
        for (String arg : args) {
            String line = String.format( "  %d    - %s", i,arg);
            System.out.println(line);
            i=i+1;
        }

    }
    else {
        System.out.println( "*** Use default topic name: " + topicName );
    }

    Configuration configuration = new Configuration();
    // Get a plan builder.
    WayangContext wayangContext = new WayangContext(configuration)
            .withPlugin(Java.basicPlugin());
    //        .withPlugin(Spark.basicPlugin());
    JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
            .withJobName(String.format("WordCount (%s)", topicName))
            .withUdfJarOf(KafkaTopicWordCount.class);

/*  // Start building the WayangPlan.
    Collection<Tuple2<String, Integer>> wordcounts_collection = planBuilder
            // Read records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and collect the results.
            .collect();

    System.out.println( wordcounts_collection );
    System.out.println( "### Done. ###" );
 */

    // Start building the WayangPlan.
    planBuilder
            // Read records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and store the results in Kafka.
            //.writeKafkaTopic("file:///Users/mkaempf/GITHUB.private/open-sustainability-data/bin/test_23456.txt", d -> String.format("%.2f, %d", d.getField1(), d.getField0()), "job_test_1",
            .writeKafkaTopic("test_23456", d -> String.format("%d, %s", d.getField1(), d.getField0()), "job_test_1",
                    LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration) );


}

}
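Note: since the sink estimator is created from the key "wayang.java.kafkatopicsink.load", a matching entry has to exist in wayang-java-defaults.properties as well. A placeholder sketch, analogous to the source entries above (the formulas are made up):

    wayang.java.kafkatopicsink.load = {"in":1, "out":0, "cpu":"${100*in0 + 500}", "ram":"${10*in0}", "p":0.9}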

kamir · Feb 12 '24 20:02

For testing the Spark/Kafka integration I used this test class:

import org.apache.wayang.api.DataQuantaBuilder;
import org.apache.wayang.api.FilterDataQuantaBuilder;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.api.ReduceByDataQuantaBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;

public class KafkaTopicWordCountSpark implements Serializable {

    public KafkaTopicWordCountSpark() {}

    // Define the lambda function for formatting the output
    private static final FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf = tuple -> {
        return tuple.getField0() + ": " + tuple.getField1();
    };

    public static void main(String[] args){

        System.out.println( ">>> Apache Wayang Test #02");
        System.out.println( "    We use a Kafka topic and an 'Apache Spark Context'.");

        // Default topic name
        String topicName = "banking-tx-small-csv";

        // Check if at least one argument is provided
        if (args.length > 0) {
            // Assuming the first argument is the topic name
            topicName = args[0];

            int i = 0;
            for (String arg : args) {
                String line = String.format( "  %d    - %s", i,arg);
                System.out.println(line);
                i=i+1;
            }
        }
        else {
            System.out.println( "*** Use default topic name: " + topicName );
        }

        Configuration configuration = new Configuration();
        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(configuration)
                .withPlugin(Spark.basicPlugin());

        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount via Spark on Kafka topic (%s)", topicName))
                .withUdfJarOf(KafkaTopicWordCountSpark.class);

        // Start building the WayangPlan.
        //Collection<Tuple2<String, Integer>> wordcounts_collection =

        planBuilder
                // Read the records from a Kafka topic.
                .readKafkaTopic(topicName).withName("Load-data-from-topic")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split-words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")
                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To-lower-case, add-counter")

                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")
                // .collect();

                // Execute the plan and store the results in Kafka.
                //.writeKafkaTopic("file:///Users/mkaempf/GITHUB.private/open-sustainability-data/bin/test_23456789.txt", d -> String.format("%.2f, %d", d.getField1(), d.getField0()), "job_test_1",
                .writeKafkaTopic("test_23456", d -> Util.formatData( d.getField0(), d.getField1() ), "job_test_2",
                        LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration) );
 
    }

}

With this utility class:

import java.io.Serializable;

public class Util implements Serializable {
    // Note: the format specifiers must match the argument types (f1 is a String, f2 an Integer).
    public static String formatData( String f1, Integer f2 ) {
        return String.format("%s, %d", f1, f2);
    }
}

Currently I still run into NotSerializableExceptions.
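A hedged guess at the cause: UDF lambdas defined inline can capture a non-serializable enclosing scope, which Spark then fails to ship to the workers. A minimal sketch of the usual workaround, moving the UDFs into static methods of a small holder class (all names here are illustrative, not from this PR):

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Illustrative holder class: static methods capture no enclosing instance,
// so method references to them serialize cleanly for Spark execution.
public class WordCountUdfs implements Serializable {

    // Replacement for the inline lambda: line -> Arrays.asList(line.split("\\W+"))
    public static List<String> splitWords(String line) {
        return Arrays.asList(line.split("\\W+"));
    }

    // Replacement for: token -> !token.isEmpty()
    public static boolean isNotEmpty(String token) {
        return !token.isEmpty();
    }
}

These would then be used as method references in the plan, e.g. .flatMap(WordCountUdfs::splitWords) and .filter(WordCountUdfs::isNotEmpty), which should serialize as long as the class is on the workers' classpath (withUdfJarOf should take care of that).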

More details have been shared on the dev mailing list.

I pushed the code so that others can see where I made changes so far, and maybe someone has an idea how to remove the ugly blocker.

kamir · Mar 10 '24 11:03

@zkaoudi @kamir - can you please review?

2pk03 · Apr 10 '24 12:04

@2pk03 for some reason there are conflicts which must be resolved before merging.

zkaoudi · Apr 11 '24 12:04