teeyog

Results 30 issues of teeyog

Shuffle Write 请看 [Shuffle Write解析](http://www.jianshu.com/p/72da913f1407)。 本文将讲解shuffle Reduce部分,shuffle的下游Stage的第一个rdd是ShuffleRDD,通过其compute方法来获取上游Stage Shuffle Write溢写到磁盘文件数据的一个迭代器: ``` override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index +...

spark

### 需求 > spark应用程序中,只要task失败就发送邮件,并携带错误原因。 ### 背景 在spark程序中,task有失败重试机制(根据 ```spark.task.maxFailures ``` 配置,默认是4次),当task执行失败时,并不会直接导致整个应用程序down掉,只有在重试了 ```spark.task.maxFailures``` 次后任然失败的情况下才会使程序down掉。另外,spark on yarn模式还会受yarn的重试机制去重启这个spark程序,根据 ```yarn.resourcemanager.am.max-attempts``` 配置(默认是2次)。 即使spark程序task失败4次后,受yarn控制重启后在第4次执行成功了,一切都好像没有发生,我们只有通过spark的监控UI去看是否有失败的task,若有还得去查找看是哪个task由于什么原因失败了。基于以上原因,我们需要做个task失败的监控,只要失败就带上错误原因通知我们,及时发现问题,促使我们的程序更加健壮。 ### 捕获Task失败事件 顺藤摸瓜,task在Executor中执行,跟踪源码看task在失败后都干了啥? 1. 在executor中task执行完不管成功与否都会向execBackend报告task的状态; ``` execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) ``` 2. 在CoarseGrainedExecutorBackend中会向driver发送StatusUpdate状态变更信息; ```...

spark
apm

## 背景 目前 spark 对 MySQL 的操作只有 Append,Overwrite,ErrorIfExists,Ignore几种表级别的模式,有时我们需要对表进行行级别的操作,比如update。即我们需要构造这样的语句出来:```insert into tb (id,name,age) values (?,?,?) on duplicate key update id=?,name =? ,age=?;``` 需求:我们的目的是既不影响以前写的代码,又不引入新的API,只需新加一个配置如:```savemode=update```这样的形式来实现。 ## 实践 要满足以上需求,肯定是要改源码的,首先创建自己的saveMode,只是新加了一个Update而已: ``` public enum I4SaveMode { Append,...

spark
MySQL

> 本文在spark2.1以Standalone Cluster模式下解析 ## 概述 spark应用程序可以以Client模式和Cluster启动,区别在于Client模式下的Driver是在执行spark-submit命令节点上启动的,而Cluster模式下是Master随机选择的一台Worker通过DriverWrapper来启动Driver的。 大概流程为: - 通过spark-submit提交会调用SparkSubmit类,SparkSubmit类里通过反射调用Client,Client与Master通信来SubmitDriver,收到成功回复后退出JVM(SparkSubmit进程退出)。 - Master收到SubmitDriver后会随机选择一台能满足driver资源需求的Worker,然后与对应Worker通信发送启动driver的消息。Worker收到消息后根据driver的信息等来拼接成linux命令来启动DriverWrapper,在该类里面再启动driver,最后将Driver执行状态返回给Master。 - driver启动后接下来就是注册APP,在SparkContext启动过程中会通过创建AppClient并与Master通信要求注册application。 - Master收到消息后会去调度执行这个application,通过调度算法获取该application需要在哪些Worker上启动executor,接着与对应的Worker通信发送启动Executor的消息。 - Worker 收到消息后通过拼接linux命令,启动了CoarseGrainedExecutorBackend进程,接着向Driver通信进行Executor的注册,成功注册后会在CoarseGrainedExecutorBackend中创建Executor对象。 - 接着就是job的执行了,可以参看前面的文章…… ## Submit Driver 通过shell命令spark-submit提交一个自己编写的application,最终实际是通过java -cp调用的类是: ``` org.apache.spark.deploy.SparkSubmit ``` 在该类的main方法中,在Cluster模式下不使用Rest,会通过反射调用Client类: ```...

spark

## IQL (项目地址:https://github.com/teeyog/IQL) ![](https://upload-images.jianshu.io/upload_images/3597066-e19cdef507fd77a7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) [README-EN](https://github.com/teeyog/IQL/blob/master/docs/en/README-EN.md) 基于SparkSQL实现了一套即席查询服务,具有如下特性: - 优雅的交互方式,支持多种datasource/sink,多数据源混算 - spark常驻服务,基于zookeeper的引擎自动发现 - 负载均衡,多个引擎随机执行 - 多session模式实现并行查询 - 采用spark的FAIR调度,避免资源被大任务独占 - 基于spark的动态资源分配,在无任务的情况下不会占用executor资源 - 支持Cluster和Client模式启动 - 基于Structured Streaming实现SQL动态添加流 - 类似SparkShell交互式数据分析功能 - 高效的script管理,配合import/include语法完成各script的关联 - 对数据源操作的权限验证 支持的数据源:hdfs、hive、hbase、kafka、mysql、es、mongo...

SparkSql

## 背景 监控是Spark非常重要的一部分。Spark的运行情况是由ListenerBus以及MetricsSystem 来完成的。通过Spark的Metrics系统,我们可以把Spark Metrics的收集到的信息发送到各种各样的Sink,比如HTTP、JMX以及CSV文件。 目前支持的Sink包括: - ConsoleSink - CSVSink - JmxSink - MetricsServlet - GraphiteSink - GangliaSink 有时我们需要实时获取metrics数据通过spark分析展示等需求,这个时候若有个KafkaSink将metrics指标数据实时往kafka发送那就太方便了,故有了这篇博文。 ## 实践 所有的Sink都需要继承Sink这个特质: ``` private[spark] trait Sink { def start(): Unit...

spark
apm
Kafka

## 背景 Spark支持多种数据源,但是Spark对HBase 的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的DataSource API自己实现了一套比较方便操作HBase的API。 ## 写 HBase 写HBase会根据Dataframe的schema写入对应数据类型的数据到Hbase,先上使用示例: ``` import spark.implicits._ import org.apache.hack.spark._ val df = spark.createDataset(Seq(("ufo", "play"), ("yy", ""))).toDF("name", "like") // 方式一 val options = Map( "hbase.table.rowkey.field" ->...

spark
Hbase

### 前言 由前面博客我们知道了SparkSql整个解析流程如下: - sqlText 经过 SqlParser 解析成 Unresolved LogicalPlan; - analyzer 模块结合catalog进行绑定,生成 resolved LogicalPlan; - optimizer 模块对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan; - SparkPlan 将 LogicalPlan 转换成PhysicalPlan; - prepareForExecution()将...

SparkSql

### 前言 由前面博客我们知道了SparkSql整个解析流程如下: - sqlText 经过 SqlParser 解析成 Unresolved LogicalPlan; - analyzer 模块结合catalog进行绑定,生成 resolved LogicalPlan; - optimizer 模块对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan; - SparkPlan 将 LogicalPlan 转换成PhysicalPlan; - prepareForExecution()将...

SparkSql

### 前言 由上篇博客我们知道了SparkSql整个解析流程如下: - sqlText 经过 SqlParser 解析成 Unresolved LogicalPlan; - analyzer 模块结合catalog进行绑定,生成 resolved LogicalPlan; - optimizer 模块对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan; - SparkPlan 将 LogicalPlan 转换成PhysicalPlan; - prepareForExecution()将...

SparkSql