项目背景

Spark是目前大数据领域最火的开源项目,也是自己在2015年的主要方向,最近使用Spark重构了公司内部的一个实时数据统计系统,下面结合这个项目,简单聊聊自己在使用Spark过程中遇到的问题踩过的坑、以及一些经验

这个系统的主要功能就是实时流量的多维度计算(每5分钟),并绘制曲线展现,对数据的正确性稳定性及时性有较高要求。本次使用Spark主要重构后端的计算模块(之前用pig/hive写的)。提到Spark在实时计算领域的应用,Kafka + Spark Streaming应该算是业界标配了(也有用Storm的),但由于我们对Spark掌握能力还不够,暂时直接用Spark去搞原始日志。每5分钟为一个计算点,日志量其实并不大,Spark处理绰绰有余。

alt text

环境搭建

工欲善其事,必先利其器,把编译测试环境搭好,会极大提高开发效率。我在MBP上进行开发,编译环境JDK7Scala2.10.4SBT0.13.5,IDE采用Intellj IDEA14。由于我使用的Spark1.2默认采用Scala2.10编译,所以我的工程也采用Scala2.10.4进行编译,而当我使JDK8的时候,编译报兼容性问题,故降为JDK7。SBT是一个编译打包工具,类似Ant、Maven、Gradle,要玩Scala最好用它,用起来也挺简单,主要有个build.sbt文件,在里面配好项目基本信息、Scala版本、Remote/Local仓库、Jar依赖(主要依赖spark-core)就OK了。从Spark官网下载一个1.2版本的二进制压缩包,解压后即完成本地测试环境的搭建。

核心概念RDD

Spark有句口号就是“One stack to rule them all”,内含Spark CoreSpark SQLSpark StreamingMLlibGraphx等众多组件,我这次从最基本的Spark Core入手,使用原生Scala进行coding。Spark入门的话直接看官网入门文档即可,掌握基本原理、基本用法很简单。

Spark是RDD的世界,玩的主要就是RDD,官网对RDD的诠释如下:

  1. A collection of elements partitioned across the nodes of the cluster that can be operated on in parallel(集合、分布式,并行计算)
  2. Created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it(源自scala集合或外部存储,可以进行transform和action)
  3. Persist an RDD in memory, allowing it to be reused efficiently across parallel operations(持久化中间RDD,加速计算)
  4. RDDs automatically recover from node failures(基于lineage的容错)

RDD操作

然后我们开始编码。我的代码逻辑基于RDD主要分为三段,依次为RDD读入RDD转换RDD导出三步。

原始RDD的产生有两种方法,一种是通过sc.parellize将一个scala集合转化为RDD;一种将外部存储的数据转化为RDD,例如sc.textFile、sc.sequenceFile、sc.hadoopRDD(可以自定义InputFormat)等等。这里我直接采用sc.textFile(URL)生成原始RDD。

接着进行各种RDD转换,这块是重点逻辑。RDD主要有两种操作transformationaction,transformation就是有一个RDD经过转换生成另一个RDD,action就是在一个RDD上进行汇总最后将结果返回Driver。我使用的主要转换函数主要是map、filter、union和reduceByKey,基本满足我的需求。

下面主要说下reduceByKey。首先,reduceByKey与reduce不同,reduce是action返回一个数值,而reduceByKey是transformation返回一个RDD;其次,reduceByKey与map也不同,map作用于普通RDD,而reduceByKey、groupByKey这类shuffle函数作用于PairRDD,所以要求所处理的RDD的每个元素是一个Tuple2;最后,想要使用reduceByKey,需要import org.apache.spark.SparkContext._这句话才OK。

我拿WordCount举个例子: rdd.map(word => (word, 1)).reduceByKey((v1, v2) => v1 + v2) 注意,map内的函数必须返回一个Tuple2,此处是(word, 1);reduceByKey中的key就是Tuple2的第一个元素,即word;reudceByKey内的函数的参数还是一个Tuple2,但是是由两个value组成的,计算后返回一个value,这个value可以接着继续迭代。

把这点理解清楚了,就不难写出代码。有一点比较痛苦,当tuple存在嵌套时,就不得不用_1、_2来取第一个元素、第二元素,写起来代码可读性比较差,这里我创建了一些内部类来进行包装,可以一定程度上增强代码可读性。

最后进行RDD导出,直接使用rdd.saveAsTextFile即可把数据导出至HDFS或Local了。

以上三步RDD的读入、转换、导出完成后,代码主体架子便完成了。

经验

下面说说我踩过的一些坑,和一些小经验。

直接启用最新的Spark1.2,这个也是去年12月份Beijing Meetup,来自PayPal的黄涧石Spark经验分享的第一页PPT。spark之前的版本或多或少均有不同程度的问题,Spark1.2各方面均有大幅提升,详见Spark1.2 Release强烈建议直接1.2搞起。

使用JDK7、Scala2.10.4编译环境,spark-core_2.10-1.2.0依赖,人生苦短,不要浪费太多时间在环境搭建上。(还有个spark-core_2.11,是用Scala2.11编译的,当然也完全可以用Scala2.11重新编译spark,官网链接

不要启用spark.speculation,这个是指启用Spark推测执行机制,是Spark的一个优化点,Hadoop也有类似机制,但是有BUG,影响统计流程稳定性,当然你可以去修复它。JIRA链接在此,之前陈超微博也有提到。

Cache中间结果,提高统计效率。Spark牛逼的就是内存,把一些中间结果cache在内存中,后续基于这个cache计算,效率大幅提升。我的一个例子:

// Step-1: generate baseRDD
val baseRDD = generateBaseRDD(sc)
// Step-2: cache baseRDD
baseRDD.cache()
// Step-3: process subtasks parallel
val threadList = Array(
  new Thread(new SubTask1(baseRDD)),
  new Thread(newSubTask2(baseRDD)),
  new Thread(newSubTask3(baseRDD))
)
threadList.map(_.start)
threadList.map(_.join)

Spark有stage的概念,一些shuffle操作会触发新的stage,stage内的map、filter使用pipe line进行执行,效率较高。但是我的一个stage内的map、filter、union等操作过多,导致一个stage内的task数过多。这时我的最大并发task总数(executor num * per core num)成为瓶颈,所以我需要降低task总数。这时你可以选择repartition(partitionNum)coalesce(partitionNum)降低partition个数,进而减少task个数(一个partition对应一个task)。当然这两个操作是有代价的,repartition会触发一次shuffle,也就是一个新的stage,coalesce默认只是不同task间进行数据拷贝,也可以进行shuffle,这个需要自己权衡。我的一个例子,最后一次reduceByKey后就是最终结果,数据量很少,但是有一千多个partition,最后数据导出时生成过多小文件,耗费过多时间。我设置coalesce(5),最后只生成5个文件,而且由于每个文件很小,所以task间数据copy的代价很小,这个优化提速较明显。

当与RDD操作有关的对象需要进行序列化,因此需要实现Serializable接口(scala叫做trait,不用implements,用with)。

Scala本身不像Java,没有受检异常一说,写代码的时候很爽,但是需要注意些catch些已知的异常,至少关键代码最外层catch一下,不要让程序挂掉。

每个Spark程序都需要创建SparkContext对象,最后需要调用sc.stop关闭,但是我遇到一个问题,极少情况下会遇到关闭sc后程序仍然卡住不动的情况,JIRA链接,具体还咋研究。我在最外层用了一个超时处理,至少卡住一段时间后,程序能够失败退出。

val future: FutureTask[Int] = new FutureTask[Int](new RealTimeTask)
val executor: ExecutorService = Executors.newFixedThreadPool(1)
executor.execute(future)
var exitCode = 1
try {
	exitCode = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
} catch {
	case e: TimeoutException => println("RealTimeTask run time exceed timeout limit : " + TIMEOUT_SECONDS)
	case e: Exception => e.printStackTrace
} finally {
	executor.shutdownNow
}
System.exit(exitCode)

进过一些简单的优化后,我的统计代码最快可以在3分钟内完成多维度统计。但是还有一个因素,就是资源申请,这个在我们集群也很耗时,一方面要看能不能获取较多的资源执行任务,另一方面也要看能不能最快地时间获取所需的资源。我们的应用跑在YARN上,需要对相关队列进行配置,当然也可以Spark也可以跑在Standalone和Mesos上。

还有一些就是Scala语言需要注意的地方,这个我单独再写一篇博客吧。Scala号称比C++复杂的语言,语法相当丰富,用法非常灵活,建议学完基础语法后,再补一补最佳实践。

一些资料: