使用SPARK进行实时数据统计
项目背景
Spark是目前大数据领域最火的开源项目,也是自己在2015年的主要方向,最近使用Spark重构了公司内部的一个实时数据统计系统,下面结合这个项目,简单聊聊自己在使用Spark过程中遇到的问题、踩过的坑、以及一些经验。
这个系统的主要功能就是实时流量的多维度计算(每5分钟),并绘制曲线展现,对数据的正确性、稳定性、及时性有较高要求。本次使用Spark主要重构后端的计算模块(之前用pig/hive写的)。提到Spark在实时计算领域的应用,Kafka + Spark Streaming应该算是业界标配了(也有用Storm的),但由于我们对Spark掌握能力还不够,暂时直接用Spark去搞原始日志。每5分钟为一个计算点,日志量其实并不大,Spark处理绰绰有余。
环境搭建
工欲善其事,必先利其器,把编译测试环境搭好,会极大提高开发效率。我在MBP上进行开发,编译环境JDK7、Scala2.10.4、SBT0.13.5,IDE采用Intellj IDEA14。由于我使用的Spark1.2默认采用Scala2.10编译,所以我的工程也采用Scala2.10.4进行编译,而当我使JDK8的时候,编译报兼容性问题,故降为JDK7。SBT是一个编译打包工具,类似Ant、Maven、Gradle,要玩Scala最好用它,用起来也挺简单,主要有个build.sbt文件,在里面配好项目基本信息、Scala版本、Remote/Local仓库、Jar依赖(主要依赖spark-core)就OK了。从Spark官网下载一个1.2版本的二进制压缩包,解压后即完成本地测试环境的搭建。
核心概念RDD
Spark有句口号就是“One stack to rule them all”,内含Spark Core、Spark SQL、Spark Streaming、MLlib、Graphx等众多组件,我这次从最基本的Spark Core入手,使用原生Scala进行coding。Spark入门的话直接看官网入门文档即可,掌握基本原理、基本用法很简单。
Spark是RDD的世界,玩的主要就是RDD,官网对RDD的诠释如下:
- A collection of elements partitioned across the nodes of the cluster that can be operated on in parallel(集合、分布式,并行计算)
- Created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it(源自scala集合或外部存储,可以进行transform和action)
- Persist an RDD in memory, allowing it to be reused efficiently across parallel operations(持久化中间RDD,加速计算)
- RDDs automatically recover from node failures(基于lineage的容错)
RDD操作
然后我们开始编码。我的代码逻辑基于RDD主要分为三段,依次为RDD读入、RDD转换、RDD导出三步。
原始RDD的产生有两种方法,一种是通过sc.parellize将一个scala集合转化为RDD;一种将外部存储的数据转化为RDD,例如sc.textFile、sc.sequenceFile、sc.hadoopRDD(可以自定义InputFormat)等等。这里我直接采用sc.textFile(URL)生成原始RDD。
接着进行各种RDD转换,这块是重点逻辑。RDD主要有两种操作transformation和action,transformation就是有一个RDD经过转换生成另一个RDD,action就是在一个RDD上进行汇总最后将结果返回Driver。我使用的主要转换函数主要是map、filter、union和reduceByKey,基本满足我的需求。
下面主要说下reduceByKey。首先,reduceByKey与reduce不同,reduce是action返回一个数值,而reduceByKey是transformation返回一个RDD;其次,reduceByKey与map也不同,map作用于普通RDD,而reduceByKey、groupByKey这类shuffle函数作用于PairRDD,所以要求所处理的RDD的每个元素是一个Tuple2;最后,想要使用reduceByKey,需要import org.apache.spark.SparkContext._这句话才OK。
我拿WordCount举个例子: rdd.map(word => (word, 1)).reduceByKey((v1, v2) => v1 + v2) 注意,map内的函数必须返回一个Tuple2,此处是(word, 1);reduceByKey中的key就是Tuple2的第一个元素,即word;reudceByKey内的函数的参数还是一个Tuple2,但是是由两个value组成的,计算后返回一个value,这个value可以接着继续迭代。
把这点理解清楚了,就不难写出代码。有一点比较痛苦,当tuple存在嵌套时,就不得不用_1、_2来取第一个元素、第二元素,写起来代码可读性比较差,这里我创建了一些内部类来进行包装,可以一定程度上增强代码可读性。
最后进行RDD导出,直接使用rdd.saveAsTextFile即可把数据导出至HDFS或Local了。
以上三步RDD的读入、转换、导出完成后,代码主体架子便完成了。
经验
下面说说我踩过的一些坑,和一些小经验。
直接启用最新的Spark1.2,这个也是去年12月份Beijing Meetup,来自PayPal的黄涧石Spark经验分享的第一页PPT。spark之前的版本或多或少均有不同程度的问题,Spark1.2各方面均有大幅提升,详见Spark1.2 Release,强烈建议直接1.2搞起。
使用JDK7、Scala2.10.4编译环境,spark-core_2.10-1.2.0依赖,人生苦短,不要浪费太多时间在环境搭建上。(还有个spark-core_2.11,是用Scala2.11编译的,当然也完全可以用Scala2.11重新编译spark,官网链接。
不要启用spark.speculation,这个是指启用Spark推测执行机制,是Spark的一个优化点,Hadoop也有类似机制,但是有BUG,影响统计流程稳定性,当然你可以去修复它。JIRA链接在此,之前陈超微博也有提到。
Cache中间结果,提高统计效率。Spark牛逼的就是内存,把一些中间结果cache在内存中,后续基于这个cache计算,效率大幅提升。我的一个例子:
// Step-1: generate baseRDD
val baseRDD = generateBaseRDD(sc)
// Step-2: cache baseRDD
baseRDD.cache()
// Step-3: process subtasks parallel
val threadList = Array(
new Thread(new SubTask1(baseRDD)),
new Thread(newSubTask2(baseRDD)),
new Thread(newSubTask3(baseRDD))
)
threadList.map(_.start)
threadList.map(_.join)
Spark有stage的概念,一些shuffle操作会触发新的stage,stage内的map、filter使用pipe line进行执行,效率较高。但是我的一个stage内的map、filter、union等操作过多,导致一个stage内的task数过多。这时我的最大并发task总数(executor num * per core num)成为瓶颈,所以我需要降低task总数。这时你可以选择repartition(partitionNum)或coalesce(partitionNum)降低partition个数,进而减少task个数(一个partition对应一个task)。当然这两个操作是有代价的,repartition会触发一次shuffle,也就是一个新的stage,coalesce默认只是不同task间进行数据拷贝,也可以进行shuffle,这个需要自己权衡。我的一个例子,最后一次reduceByKey后就是最终结果,数据量很少,但是有一千多个partition,最后数据导出时生成过多小文件,耗费过多时间。我设置coalesce(5),最后只生成5个文件,而且由于每个文件很小,所以task间数据copy的代价很小,这个优化提速较明显。
当与RDD操作有关的对象需要进行序列化,因此需要实现Serializable接口(scala叫做trait,不用implements,用with)。
Scala本身不像Java,没有受检异常一说,写代码的时候很爽,但是需要注意些catch些已知的异常,至少关键代码最外层catch一下,不要让程序挂掉。
每个Spark程序都需要创建SparkContext对象,最后需要调用sc.stop关闭,但是我遇到一个问题,极少情况下会遇到关闭sc后程序仍然卡住不动的情况,JIRA链接,具体还咋研究。我在最外层用了一个超时处理,至少卡住一段时间后,程序能够失败退出。
val future: FutureTask[Int] = new FutureTask[Int](new RealTimeTask)
val executor: ExecutorService = Executors.newFixedThreadPool(1)
executor.execute(future)
var exitCode = 1
try {
exitCode = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
} catch {
case e: TimeoutException => println("RealTimeTask run time exceed timeout limit : " + TIMEOUT_SECONDS)
case e: Exception => e.printStackTrace
} finally {
executor.shutdownNow
}
System.exit(exitCode)
进过一些简单的优化后,我的统计代码最快可以在3分钟内完成多维度统计。但是还有一个因素,就是资源申请,这个在我们集群也很耗时,一方面要看能不能获取较多的资源执行任务,另一方面也要看能不能最快地时间获取所需的资源。我们的应用跑在YARN上,需要对相关队列进行配置,当然也可以Spark也可以跑在Standalone和Mesos上。
还有一些就是Scala语言需要注意的地方,这个我单独再写一篇博客吧。Scala号称比C++复杂的语言,语法相当丰富,用法非常灵活,建议学完基础语法后,再补一补最佳实践。
一些资料:
- Spark官网:http://spark.apache.org/
- Spark1.2 Release:http://spark.apache.org/releases/spark-release-1-2-0.html
- Scala官网:http://www.scala-lang.org/
- Scala入门教程:http://twitter.github.io/scala_school/
- Scala入门书籍:《Scala for the Impatient》
- 一个不错的SBT入门教程
Comments