最近在做基于Kafka + Spark Streaming的实时计算,今天研究了下Spark Streaming源码,在此记录下。主要以WordCount为例,具体讲解Spark Streaming的实现细节。

从WordCount说起

一个最简单的基于Spark Streaming的WordCount,代码如下:

object SocketWordCount extends App {
  val conf = new SparkConf().
      setMaster("local[*]").setAppName("WordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  val lines = ssc.socketTextStream("localhost", 9999)
  val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  wordCounts.print
  ssc.start
  ssc.awaitTermination
}

这个WordCount小程序很简单。首先创建一个SparkContext对象(与创建SparkContext不同,需要指定一个时间间隔);然后通过ssc.socketTextStream创建InputDStream,然后对DStream进行各种transformation,调用print将结果输出;最后调用ssc.start启动程序即可。

更多Spark Streaming资料,详见官网教程 Spark Streaming Programming Guide

创建StreamingContext

val ssc = new StreamingContext(conf, Seconds(10))

StreamingContext内部包含一个SparkContext,可以直接传入构造函数,或者通过传入的SparkConf新建;如果设置Checkpoint,可以通过Checkpoint.sparkConf新建。(注:为简化流程,后续解读均不涉及check point,write ahead log等细节)

除SparkContext,如下初始化组件需要注意:

DSreamGraph:主要含有一个inputStreams数组和一个outputStreams数组。

JobScheduler:调度SparkSteaming任务,主要包含一个ReceiverTracker(Receiver跟踪器),一个JobGenerator(JobGenerator生成器),以及一个JobScheduler Actor。

创建InputDSteam

val lines = ssc.socketTextStream(“localhost”, 9999)

socketTextStream函数新建并返回了一个SocketInputDStream对象,其继承关系依次为:SocketInputDStream val wordCounts = lines.flatMap(.split(“ “)).map((, 1)).reduceByKey(_ + _)

wordCounts.print

Spark Core运算的基本单位是RDD,而Spark Streaming则是在DStream之上进行计算;RDD有一系列的transformation和action,DStream也有很多transformationoutput operatioin。DStream是一个RDD的时间序列,其实最终计算还是会转移到RDD的计算上。

我们首先来看下DStream的构造(其实上节已经看其子类InputDSteam的实现):

下面我们分别选一个transformation和output operation实现来看下。

首先我们看下flatMap函数实现:

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

很简单,返回了一个FlatMappedDStream。FlatMappedDStream实现也很简单,分别实现了slideDuration、dependencies、compute这三个函数。

override def dependencies = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
 parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}

通过Compute函数,可见其会调用getOrCompute,获取parent DStream在某个时间点的RDD,然后对RDD信息转换,生成新的RDD。

接下来,我们再来看下print函数的实现:

def print() {
  def foreachFunc = (rdd: RDD[T], time: Time) => {
    val first11 = rdd.take(11)
    first11.take(10).foreach(println)
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

print()最后新建并返回了一个ForEachDStream,而所有output operation均是如此,我们再来看下ForEachDStream的实现:

override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        ssc.sparkContext.setCallSite(creationSite)
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

其compute函数返回None,但是多了一个generateJob函数,生成new Job(time, jobFunc)对象,而Job之后会被调度。

启动StreamingContext

ssc.start

很简单,启动JobScheduler,而JobScheduler接着启动了ReceiverTrackerJobGenerator

ReceiverTracker主要负责原始数据的读入,而JobGenerator主要负责具体Job的触发与执行。下面我将分三个小节来分别讲解这两个核心组件,ReceiverTracker内容较多,分receiver启动和外部数据读取两个小节讲解。

ReceiverTracker源码分析(一) receiver启动

ReceiverTracker启动后,会创建ReceiverTrackerActor,响应RegisterReceiver、AddBlock、ReportError、DeregisterReceiver事件。

接着启动ReceiverLauncher线程,这个线程将通过startReceivers函数,启动这个集群上的所有receivers。

我们看下startReceivers函数:

下面重点庄转移到startReceiver函数,其新建了一个ReceiverSupervisorImpl,对receiver的一个包装类,然后启动supervisor,之后awaitTermination阻塞。

接着看ReceiverSupervisorImpl启动做了什么。调用两个函数onStart()和startReceiver()

至此,Receiver的启动过程完毕!!!

ReceiverTracker源码分析(二) 外部数据读取

数据的读入由receiver触发,receiver启动后会读取外部数据源的消息,有两种方法将其存储:

可见使用store(dataItem: T)无需自己生成Block,且有自动流控措施,但是当receiver挂掉的时候currentBuffer中的messages和blocksForPushing中的blocks均有可能会丢失。所以Unreliable Receivers可以使用,而对于Reliable Receivers,必须使用store(dataBuffer: ArrayBuffer[T])。详见官网 Custom Receiver Guide

最后再来说一说pushAndReportBlock方法:

至此,外部数据读取过程完毕!!!

JobGenerator源码分析

JobGenerator启动后,创一个JobGeneratorActor,响应GenerateJobs(time)、ClearMetadata(time)、DoCheckpoint(time)、ClearCheckpointData(time)等事件。

然后调用startFirstTime函数,依次启动graph(就是inputStreams和outputStreams的一些初始化,不讲了),并且根据配置的batchDuration定时向JobGeneratorActor发送GenerateJobs消息。

OK,来看看GenerateJobs做了什么:

至此,Job调度过程完毕!!!

总结:Spark Streaming与Spark Core的联系

总体来说,Spark Streaming的实现以Spark Core为基础,通过ReceiverTracker来读取外部数据,通过JobGenerator定期生成计算任务,整体结构实现清晰明确。

Spark Streaming用到Spark Core的地方在总结下:

  1. Receiver分发至各个节点并执行,使用了Spark Core提交RDD任务的过程,很巧妙;
  2. 外部数据源读入的数据存入BlockManager;
  3. 对于InputDStream,每隔batchDuration切分的RDD,DStream间的transformation,即为RDD的transformation;
  4. 提交的任务最终转化为一个Spark Core的RDD计算任务。