streaminglens
Not able to run streaminglens in IntelliJ IDEA
I am running my code locally in IntelliJ IDEA with the streaminglens Maven dependency. I get the output below and no StreamingLens output. Please let me know what I am doing wrong here.
package com.manu.sstreaming;
import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConversions.*;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* @author Manu Jose
* create on : 16/04/20
*/
public class SStreamingNC {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);
        SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordCount")
            .master("local")
            .getOrCreate();
        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");
        scala.collection.immutable.Map<String, String> scalaMap =
            JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(Predef.conforms());
        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        streamingLens.registerListeners();
        // Create DataFrame representing the stream of input lines from connection to host:port
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");
        Dataset<Row> lines = spark
            .readStream()
            .format("socket")
            .option("host", host)
            .option("port", port)
            .load();
        // Split the lines into words
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
            (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
            Encoders.STRING());
        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();
        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
            .outputMode("update")
            .format("console")
            .queryName("Query_name")
            .trigger(Trigger.ProcessingTime(2 * 1000))
            .start();
        spark.streams().awaitAnyTermination();
    }
}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145
Any updates?
@mjose007
Sorry for the delayed response.
Are you able to see any StreamingLens-related logs (even errors or failures) in your Spark driver logs?
Also, you don't need to call registerListeners explicitly. This line is not needed:
streamingLens.registerListeners();
Only this should be sufficient:
StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
My guess is that it is throwing an exception because you are trying to register the listeners twice, based on this code:
  try {
    sparkSession.sparkContext.addSparkListener(streamingAppListener)
    logDebug("Successfully registered Spark Listener")
  } catch {
    case e: Exception =>
      throw new SparkException("Error in registering Spark Listener " +
        "Won't report StreamingLens Insights" + e.getMessage)
  }
  try {
    sparkSession.streams.addListener(queryProgressListener)
    logDebug("Successfully registered StreamingQuery Listener")
  } catch {
    case e: Exception =>
      sparkSession.sparkContext.removeSparkListener(streamingAppListener)
      throw new SparkException("Error in registering StreamingQuery Listener " +
        "Won't report StreamingLens Insights" + e.getMessage)
  }
}
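To make the control flow of the snippet above easier to follow, here is a minimal plain-Java sketch of the same register-then-roll-back pattern. It is only an illustration: `Registry`, `registerBoth`, and the listener names are hypothetical stand-ins, not Spark or StreamingLens classes, and the sketch makes no claim about whether Spark itself rejects a second registration.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerRollbackSketch {

    /** Hypothetical stand-in for a listener bus; NOT a Spark class. */
    static class Registry {
        private final List<String> listeners = new ArrayList<>();
        private final String rejected; // listener name this registry refuses, or null

        Registry(String rejected) {
            this.rejected = rejected;
        }

        void add(String name) {
            if (name.equals(rejected)) {
                throw new IllegalStateException("cannot register " + name);
            }
            listeners.add(name);
        }

        void remove(String name) {
            listeners.remove(name);
        }

        List<String> registered() {
            return new ArrayList<>(listeners);
        }
    }

    /**
     * Registers one listener on each bus. If the second registration fails,
     * the first listener is removed again before the error is rethrown,
     * mirroring the two try/catch blocks quoted in the thread.
     */
    static void registerBoth(Registry appBus, Registry queryBus) {
        try {
            appBus.add("appListener");
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                "Error in registering app listener: " + e.getMessage(), e);
        }
        try {
            queryBus.add("queryListener");
        } catch (RuntimeException e) {
            appBus.remove("appListener"); // roll back the first registration
            throw new IllegalStateException(
                "Error in registering query listener: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Registry appBus = new Registry(null);
        Registry failingQueryBus = new Registry("queryListener");
        try {
            registerBoth(appBus, failingQueryBus);
        } catch (IllegalStateException e) {
            // The first listener was removed, so nothing is left half-registered.
            System.out.println("failed: " + e.getMessage());
            System.out.println("app listeners after rollback: " + appBus.registered());
        }
    }
}
```

The point of the rollback is that a failure in the second step does not leave the first listener attached on its own, which would otherwise collect events that are never reported.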
@mjose007 Could you please share any suggestions on https://github.com/qubole/streaminglens/issues/5?
For me, the state always shows the same value: NONEWBATCH.