streaminglens

Not able to run StreamingLens in IntelliJ IDEA

Status: Open. mjose007 opened this issue 4 years ago • 3 comments

I am running my code locally in IntelliJ IDEA with the StreamingLens Maven dependency. I am getting the error below and no output. Let me know what I am doing wrong here.


package com.manu.sstreaming;

import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConverters;


/**
 * @author Manu Jose
 * create on : 16/04/20
 */
public class SStreamingNC {
    public static void main(String[] args) throws Exception {


        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);


        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local")
                .getOrCreate();


        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");

        scala.collection.immutable.Map<String, String> scalaMap = JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(
                Predef.conforms());
        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        streamingLens.registerListeners();




        // Create DataFrame representing the stream of input lines from connection to host:port
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        // Split the lines into words
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();


        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .queryName("Query_name")
                .trigger(Trigger.ProcessingTime(2 * 1000))
                .start();

        spark.streams().awaitAnyTermination();

    }

}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145

mjose007, Apr 30 '20 19:04

Any updates?

mjose007, May 03 '20 12:05

@mjose007

Sorry for the delayed response.

Are you able to see any StreamingLens-related logs (even errors or failures) in your Spark driver logs?

Also, you don't need to call registerListeners explicitly; this line is not needed: streamingLens.registerListeners();

Only this should be sufficient: StreamingLens streamingLens = new StreamingLens(spark, scalaMap);

My guess is that it throws an exception because you are trying to register the listeners twice, based on this code:


    try {
      sparkSession.sparkContext.addSparkListener(streamingAppListener)
      logDebug("Successfully registered Spark Listener")
    } catch {
      case e: Exception =>
        throw new SparkException("Error in registering Spark Listener " +
          "Won't report StreamingLens Insights" + e.getMessage)
    }
    try {
      sparkSession.streams.addListener(queryProgressListener)
      logDebug("Successfully registered StreamingQuery Listener")
    } catch {
      case e: Exception =>
        sparkSession.sparkContext.removeSparkListener(streamingAppListener)
        throw new SparkException("Error in registering StreamingQuery Listener " +
          "Won't report StreamingLens Insights" + e.getMessage)
    }

  }
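The quoted `registerListeners()` code follows a register-then-roll-back pattern: it adds the Spark listener first, then the streaming query listener, and if the second step fails it removes the first listener before throwing, so the session is not left half-instrumented. The stdlib-only Java sketch below restates that pattern under stated assumptions: `Bus`, its `rejected` field and the string listeners are hypothetical stand-ins for Spark's listener buses (not Spark or StreamingLens APIs), and `IllegalStateException` replaces `SparkException` so the example runs without Spark on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, stdlib-only sketch of the "register, then roll back on failure"
 * pattern in the quoted registerListeners() code. Bus and the listener objects
 * are illustrative stand-ins, NOT Spark or StreamingLens APIs.
 */
public class RegistrationSketch {

    /** Minimal stand-in for a listener bus; can be told to reject one listener. */
    static class Bus {
        final List<Object> listeners = new ArrayList<>();
        Object rejected; // hypothetical: registering this listener will fail

        void add(Object listener) {
            if (listener == rejected) {
                throw new IllegalStateException("bus rejected " + listener);
            }
            listeners.add(listener);
        }

        void remove(Object listener) {
            listeners.remove(listener); // List.remove(Object), not remove(int)
        }
    }

    /**
     * Registers appListener on appBus, then queryListener on queryBus.
     * If the second registration fails, the first one is undone before rethrowing.
     */
    static void registerListeners(Bus appBus, Bus queryBus,
                                  Object appListener, Object queryListener) {
        try {
            appBus.add(appListener);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                "Error in registering app listener; won't report insights: " + e.getMessage(), e);
        }
        try {
            queryBus.add(queryListener);
        } catch (RuntimeException e) {
            appBus.remove(appListener); // roll back the first registration
            throw new IllegalStateException(
                "Error in registering query listener; won't report insights: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Object app = "appListener";
        Object query = "queryListener";

        // Happy path: both listeners end up registered.
        Bus appBus = new Bus();
        Bus queryBus = new Bus();
        registerListeners(appBus, queryBus, app, query);
        System.out.println(appBus.listeners.size() + " " + queryBus.listeners.size()); // 1 1

        // Failure path: the query bus rejects, so the app listener is rolled back.
        Bus appBus2 = new Bus();
        Bus queryBus2 = new Bus();
        queryBus2.rejected = query;
        try {
            registerListeners(appBus2, queryBus2, app, query);
        } catch (IllegalStateException e) {
            System.out.println(appBus2.listeners.isEmpty()); // true
        }
    }
}
```

Running `main` prints `1 1` for the happy path and `true` for the failure path, showing that a failed second registration leaves no listener behind. Note that this plain `Bus` silently accepts duplicates, so the sketch says nothing about how Spark itself behaves when the same listener is added twice.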

abhishekd0907, May 14 '20 12:05

@mjose007 Could you kindly suggest something for https://github.com/qubole/streaminglens/issues/5?

For me, the state always shows the same value: NONEWBATCH.

rpatid10, Oct 01 '21 16:10