Spring-Shiro-Spark icon indicating copy to clipboard operation
Spring-Shiro-Spark copied to clipboard

Spring Boot与Spark的集成实践

Open ZhuXS opened this issue 7 years ago • 2 comments

Spring Boot与Spark的集成实践

Spark程序的入口

Spark与Spring Boot集成的关键是能够在Spring的容器中提供一个Spark的入口。

SparkContext是Spark的入口,是程序与Spark集群交互的桥梁。在Spark 2.0引入了SparkSession,为用户提供了一个更加统一的切入点来使用Spark的各项功能,并且允许用户通过调用DataFrame和DataSet相关API来编写Spark程序,在本例中我们采用SparkSession来作为Spark的入口。

SparkSession依赖于SparkContext,而SparkContext依赖于SparkConf。因此我们创建这三个对象,并以Bean的形式存放到Spring的容器之中。 以SparkSession为例

@Bean
public SparkSession sparkSession(){
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName("Java Spark SQL basic example")
                .getOrCreate();
}

调用Spark Api

Spark提供了面向Java的Api,所以我们可以直接拿来用。

我们在SpringBoot的Configuration Class声明SparkSession、SparkContext、SparkConf这三个Bean,使得容器在启动时创建这三个对象。

我们在Service层注入SparkSession,就可以顺利使用Spark的各项功能了。

@Autowired
private SparkSession sparkSession;

WordCount示例

 List<Word> wordList = Arrays.stream(tempWords).map(Word::new).collect(Collectors.toList());
            dataFrame = sparkSession.createDataFrame(wordList,Word.class);
            dataFrame.show();

创建DataFrame

RelationalGroupedDataset groupedDataset = dataFrame.groupBy(col("word"));
            List<Row> rows = groupedDataset.count().collectAsList();
return rows.stream().map(new Function<Row, Count>() {
                @Override
                public Count apply(Row row) {
                    return new Count(row.getString(0),row.getLong(1));
                }
            }).sorted(new CountComparator()).collect(Collectors.toList());

Count

详情请见WordCount

项目地址

ZhuXS avatar Oct 25 '17 10:10 ZhuXS

有没有试过spark2.2和springboot2.0 集成呢,冲突的依赖太多了。又试了一下Spark2.2和Springboot1.5.6,也会有一样的依赖冲突。主要集中在netty这块

2efPer avatar Mar 28 '18 08:03 2efPer

@2efPer 看一下mvn tree,把冲突的依赖exclude掉其中一个

ZhuXS avatar May 07 '18 09:05 ZhuXS