Spring-Shiro-Spark
Spring-Shiro-Spark copied to clipboard
Spring Boot与Spark的集成实践
Spring Boot与Spark的集成实践
Spark程序的入口
Spark与Spring Boot集成的关键是能够在Spring的容器中提供一个Spark的入口。
SparkContext是Spark的入口,是程序与Spark集群交互的桥梁。在Spark 2.0引入了SparkSession,为用户提供了一个更加统一的切入点来使用Spark的各项功能,并且允许用户通过调用DataFrame和DataSet相关API来编写Spark程序,在本例中我们采用SparkSession来作为Spark的入口。
SparkSession依赖于SparkContext,而SparkContext依赖于SparkConf。因此我们创建这三个对象,并以Bean的形式存放到Spring的容器之中。 以SparkSession为例
@Bean
public SparkSession sparkSession(){
return SparkSession
.builder()
.sparkContext(javaSparkContext().sc())
.appName("Java Spark SQL basic example")
.getOrCreate();
}
调用Spark Api
Spark提供了面向Java的Api,所以我们可以直接拿来用。
我们在SpringBoot的Configuration Class声明SparkSession、SparkContext、SparkConf这三个Bean,使得容器在启动时创建这三个对象。
我们在Service层注入SparkSession,就可以顺利使用Spark的各项功能了。
@Autowired
private SparkSession sparkSession;
WordCount示例
List<Word> wordList = Arrays.stream(tempWords).map(Word::new).collect(Collectors.toList());
dataFrame = sparkSession.createDataFrame(wordList,Word.class);
dataFrame.show();
创建DataFrame
RelationalGroupedDataset groupedDataset = dataFrame.groupBy(col("word"));
List<Row> rows = groupedDataset.count().collectAsList();
return rows.stream().map(new Function<Row, Count>() {
@Override
public Count apply(Row row) {
return new Count(row.getString(0),row.getLong(1));
}
}).sorted(new CountComparator()).collect(Collectors.toList());
Count
详情请见WordCount
有没有试过spark2.2和springboot2.0 集成呢,冲突的依赖太多了。又试了一下Spark2.2和Springboot1.5.6,也会有一样的依赖冲突。主要集中在netty这块
@2efPer 看一下mvn tree,把冲突的依赖exclude掉其中一个