博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming流计算框架的运行源码
阅读量:6353 次
发布时间:2019-06-22

本文共 19012 字,大约阅读时间需要 63 分钟。

hot3.png

Spark Streaming的Job到底是如何运行的,我们下面以一个例子来解析一下:

package com.dt.spark.streaming import com.dt.spark.common.ConnectPoolimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}  /** *  * 以网站热词排名为例,将处理结果写到MySQL中 * Created by dinglq on 2016/5/3. */object WriteDataToMySQL {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("WriteDataToMySQL")    val ssc = new StreamingContext(conf,Seconds(5))    // 假设socket输入的数据格式为:searchKeyword,time    val ItemsStream = ssc.socketTextStream("local[2]",9999)    // 将输入数据变成(searchKeyword,1)    var ItemPairs = ItemsStream.map(line =>(line.split(",")(0),1))      val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,Seconds(60),Seconds(10))    //ssc.checkpoint("/user/checkpoints/")    // val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))    /**     * 接下来需要对热词的频率进行排序,而DStream没有提供sort的方法。那么我们可以实现transform函数,用RDD的sortByKey实现     */    val hottestWord = ItemCount.transform(itemRDD => {      val top3 = itemRDD.map(pair => (pair._2, pair._1))        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)      ssc.sparkContext.makeRDD(top3)    })     hottestWord.foreachRDD(rdd => {      rdd.foreachPartition(partitionOfRecords =>{        val conn = ConnectPool.getConnection        conn.setAutoCommit(false);  //设为手动提交        val  stmt = conn.createStatement();         partitionOfRecords.foreach( record => {           stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')");        })         stmt.executeBatch();        conn.commit();  //提交事务       })    })     ssc.start()    ssc.awaitTermination()  }}

将代码提交至Spark 集群运行:

1.程序最初会初始化StreamingContext

def this(conf: SparkConf, batchDuration: Duration) = {  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)}

 注意我们的spark Streaming程序是长时间运行的,所以我们需要阻塞主线程不让jvm退出,保证作业长时间运行:

/** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. */def awaitTermination() {  waiter.waitForStopOrError()}

 注意这里一旦出现异常线程会被中断,计算过程中需要小心处理异常!

StreamingContext初始化的过程中会做如下事情

2.构造DStreamGraph

private[streaming] val graph: DStreamGraph = {  if (isCheckpointPresent) {    cp_.graph.setContext(this)    cp_.graph.restoreCheckpointData()    cp_.graph  } else {    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")    val newGraph = new DStreamGraph()    newGraph.setBatchDuration(batchDur_)    newGraph  }}

3.构造JobScheduler对象

private[streaming] val scheduler = new JobScheduler(this)

而在JobScheduler对象初始化的过程会构造如下对象:JobGenerator、StreamingListenerBus

4.构造JobGenerator对象(JobScheduler.scala的第50行)

private val jobGenerator = new JobGenerator(this)

5.而JobGenerator在实例化时,则会构造一个RecurringTimer(JobGenerator.scala的第58行)

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

6.构造StreamingListenerBus对象(JobScheduler.scala的第52行)

val listenerBus = new StreamingListenerBus()

到此,StreamingContext实例化的工作完成

7.定义输入流

val ItemsStream = ssc.socketTextStream("local[2]",9999)

注意本地模式需要两个线程,一个用于接收数据,一个线程用于计算!

8.此方法会生成一个SocketInputDStream

def socketTextStream(    hostname: String,    port: Int,    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)} def socketStream[T: ClassTag](    hostname: String,    port: Int,    converter: (InputStream) => Iterator[T],    storageLevel: StorageLevel  ): ReceiverInputDStream[T] = {  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)}

9.在InputDStream的构造过程中,会将此输入流SocketInputDStream添加到DStreamGraph的inputStreams数据结构中(InputDStream.scala的第47行

ssc.graph.addInputStream(this)

备注:这里最终调用:

DStreamGraph(83行)def addInputStream(inputStream: InputDStream[_]) {  this.synchronized {    inputStream.setGraph(this)    inputStreams += inputStream  }}

也就是说DStreamGraph 记录了Dstream的输入,构成了RDD的DAG模板

10.在ReceiverInputDStream构建的过程中会初始化一个ReceiverRateController

override protected[streaming] val rateController: Option[RateController] = {  if (RateController.isBackPressureEnabled(ssc.conf)) {    Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))  } else {    None  }}

在DStreamGraph中有个outputStreams,表示SparkStreaming程序的输出流,在需要数据输出时,例如print(最终也会调用foreachRDD方法),foreachRDD等都会讲此DStream注册给outputStreams。(DStream.scala的第684行)

private def foreachRDD(    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean): Unit = {  new ForEachDStream(this,    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}

11.将DStream注册给DStreamGraph(DStream.scala的第969行)

private[streaming] def register(): DStream[T] = {  ssc.graph.addOutputStream(this)  this}

Streaming程序的整个业务代码,就是将InputDStream经过各种转换计算变成OutputDStream的过程。

12. StreamingContext启动

/** * :: Experimental :: * * Either get the currently active StreamingContext (that is, started but not stopped), * OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data * does not exist in the provided, then create a new StreamingContext by calling the provided * `creatingFunc`. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param creatingFunc   Function to create a new StreamingContext * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the *                       file system * @param createOnError  Optional, whether to create a new StreamingContext if there is an *                       error in reading checkpoint data. By default, an exception will be *                       thrown on error. */@Experimentaldef getActiveOrCreate(    checkpointPath: String,    creatingFunc: () => StreamingContext,    hadoopConf: Configuration = SparkHadoopUtil.get.conf,    createOnError: Boolean = false  ): StreamingContext = {  ACTIVATION_LOCK.synchronized {    getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) }  }}

注意这里我们可以设置checkpoint来回复StreamingContext对象,保证接着上次任务继续运行。

启动过程中,会判断StreamingContext的状态,它有三个状态INITIALIZED、ACTIVE、STOP。只有状态为INITAILIZED才允许启动。代码如下

def start(): Unit = synchronized {  state match {    case INITIALIZED =>      startSite.set(DStream.getCreationSite())      StreamingContext.ACTIVATION_LOCK.synchronized {        StreamingContext.assertNoOtherContextIsActive()        try {          validate()           // Start the streaming scheduler in a new thread, so that thread local properties          // like call sites and job groups can be reset without affecting those of the          // current thread.          ThreadUtils.runInNewThread("streaming-start") {            sparkContext.setCallSite(startSite.get)            sparkContext.clearJobGroup()            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")            scheduler.start()          }          state = StreamingContextState.ACTIVE        } catch {          case NonFatal(e) =>            logError("Error starting the context, marking it as stopped", e)            scheduler.stop(false)            state = StreamingContextState.STOPPED            throw e        }        StreamingContext.setActiveContext(this)      }      shutdownHookRef = ShutdownHookManager.addShutdownHook(        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)      // Registering Streaming Metrics at the start of the StreamingContext      assert(env.metricsSystem != null)      env.metricsSystem.registerSource(streamingSource)      uiTab.foreach(_.attach())      logInfo("StreamingContext started")    case ACTIVE =>      logWarning("StreamingContext has already been started")    case STOPPED =>      throw new IllegalStateException("StreamingContext has already been stopped")  }}

13.调用JobScheduler的start方法(scheduler.start())

JobScheduler.scala的第62行

def start(): Unit = synchronized {  if (eventLoop != null) return // scheduler has already been started   logDebug("Starting JobScheduler")  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)     override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)  }  eventLoop.start()   // attach rate controllers of input streams to receive batch completion updates  for {    inputDStream <- ssc.graph.getInputStreams    rateController <- inputDStream.rateController  } ssc.addStreamingListener(rateController)   listenerBus.start(ssc.sparkContext)  receiverTracker = new ReceiverTracker(ssc)  inputInfoTracker = new InputInfoTracker(ssc)  receiverTracker.start()  jobGenerator.start()  logInfo("Started JobScheduler")}

14.在上段代码中,首先会构造一个EventLoop[JobSchedulerEvent]对象,并调用其start方法

eventLoop.start()

15.让JobScheduler的StreamingListenerBus对象监听输入流的ReceiverRateController对象

 for {    inputDStream <- ssc.graph.getInputStreams    rateController <- inputDStream.rateController  } ssc.addStreamingListener(rateController)

StreamingContext.scala的第536行

def 
addStreamingListener(streamingListener
: 
StreamingListener) {
  
scheduler.listenerBus.addListener(streamingListener)
}

17.实例化receiverTracker和InputInfoTracker,并调用receiverTracker的start方法

receiverTracker = new ReceiverTracker(ssc)inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start()

18.在receiverTracker的start方法中,会构造一个ReceiverTrackerEndpoint对象(ReceiverTracker.scala的第149行)

/** Start the endpoint and receiver execution thread. */def start(): Unit = synchronized {  if (isTrackerStarted) {    throw new SparkException("ReceiverTracker already started")  }   if (!receiverInputStreams.isEmpty) {    endpoint = ssc.env.rpcEnv.setupEndpoint(      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))    if (!skipReceiverLaunch) launchReceivers()    logInfo("ReceiverTracker started")    trackerState = Started  }}

19.获取各个InputDStream的receiver,并且在相应的worker节点启动Receiver 。ReceiverTracker.scala的第413行

/** * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. */private def launchReceivers(): Unit = {  val receivers = receiverInputStreams.map(nis => {    val rcvr = nis.getReceiver()    rcvr.setReceiverId(nis.id)    rcvr  })   runDummySparkJob()   logInfo("Starting " + receivers.length + " receivers")  endpoint.send(StartAllReceivers(receivers))}

20.ReceiverTrackerEndpoint接收到StartAllReceivers消息,并做如下处理

ReceiverTracker.scala的第449行

case StartAllReceivers(receivers) =>  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)  for (receiver <- receivers) {    val executors = scheduledLocations(receiver.streamId)    updateReceiverScheduledExecutors(receiver.streamId, executors)    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation    startReceiver(receiver, executors)  }

在Executor上启动receiver,此处可以得知,receiver可以有多个

21.然后回到13步的代码,调用JobGenerator.start()

JobGenerator.scala的第78行

/** Start generation of jobs */def start(): Unit = synchronized {  if (eventLoop != null) return // generator has already been started   // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.  // See SPARK-10125  checkpointWriter   eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)     override protected def onError(e: Throwable): Unit = {      jobScheduler.reportError("Error in job generator", e)    }  }  eventLoop.start()   if (ssc.isCheckpointPresent) {    restart()  } else {    startFirstTime()  }}

22.构造EventLoop[JobGeneratorEvent],并调用其start方法

1 eventLoop.start()

23.判断当前程序启动时,是否使用Checkpoint数据做恢复,来选择调用restart或者startFirstTime方法。我们的代码将调用

startFirstTime()

JobGenerator.scala的第190行

private def startFirstTime() {  val startTime = new Time(timer.getStartTime())  graph.start(startTime - graph.batchDuration)  timer.start(startTime.milliseconds)  logInfo("Started JobGenerator at " + startTime)}

24.调用DStreamGraph的start方法

def start(time: Time) {  this.synchronized {    require(zeroTime == null, "DStream graph computation already started")    zeroTime = time    startTime = time    outputStreams.foreach(_.initialize(zeroTime))    outputStreams.foreach(_.remember(rememberDuration))    outputStreams.foreach(_.validateAtStart)    inputStreams.par.foreach(_.start())  }}

此时,InputDStream启动,并开始接收数据。

InputDStream和ReceiverInputDStream的start方法都是空的。

InputDStream.scala的第110行

/** Method called to start receiving data. Subclasses must implement this method. */def start()

ReceiverInputDStream.scala的第63行

// Nothing to start or stop as both taken care of by the ReceiverTracker.def start() {}

而SocketInputDStream没有定义start方法,所以

1inputStreams.par.foreach(_.start())

并没有做任何的事情,那么输入流到底是怎么被触发并开始接收数据的呢?

我们再看上面的第20步:

startReceiver(receiver, executors)

代码的具体实现在ReceiverTracker.scala的545行

private def startReceiver(    receiver: Receiver[_],    scheduledLocations: Seq[TaskLocation]): Unit = {  def shouldStartReceiver: Boolean = {    // It's okay to start when trackerState is Initialized or Started    !(isTrackerStopping || isTrackerStopped)  }   val receiverId = receiver.streamId  if (!shouldStartReceiver) {    onReceiverJobFinish(receiverId)    return  }   val checkpointDirOption = Option(ssc.checkpointDir)  val serializableHadoopConf =    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)   // Function to start the receiver on the worker node  val startReceiverFunc: Iterator[Receiver[_]] => Unit =    (iterator: Iterator[Receiver[_]]) => {      if (!iterator.hasNext) {        throw new SparkException(          "Could not start receiver as object not found.")      }      if (TaskContext.get().attemptNumber() == 0) {        val receiver = iterator.next()        assert(iterator.hasNext == false)        val supervisor = new ReceiverSupervisorImpl(          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)        supervisor.start()        supervisor.awaitTermination()      } else {        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.      }    }   // Create the RDD using the scheduledLocations to run the receiver in a Spark job  val receiverRDD: RDD[Receiver[_]] =    if (scheduledLocations.isEmpty) {      ssc.sc.makeRDD(Seq(receiver), 1)    } else {      val preferredLocations = scheduledLocations.map(_.toString).distinct      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))    }  receiverRDD.setName(s"Receiver $receiverId")  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))   val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())  // We will keep restarting the receiver job until ReceiverTracker is stopped  future.onComplete {    case Success(_) =>      if (!shouldStartReceiver) {        onReceiverJobFinish(receiverId)      } else {        logInfo(s"Restarting Receiver $receiverId")        self.send(RestartReceiver(receiver))      }    case Failure(e) =>      if (!shouldStartReceiver) {        onReceiverJobFinish(receiverId)      } else {        logError("Receiver has been stopped. Try to restart it.", e)        logInfo(s"Restarting Receiver $receiverId")        self.send(RestartReceiver(receiver))      }  }(submitJobThreadPool)  logInfo(s"Receiver ${receiver.streamId} started")}

它会将Receiver封装成RDD,以Job的方式提交到Spark集群中。submitJob的第二个参数,是一个函数,它的功能是在worker节点上启动receiver

注意这里是以任务的方式接受数据,当任务失败怎么办:

(1) 可插拔的 ReceiverSchedulingPolicy

ReceiverSchedulingPolicy 的主要目的,是在 Spark Streaming 层面添加对 Receiver 的分发目的地的计算,相对于之前版本依赖 Spark Core 的 TaskScheduler 进行通用分发,新的 ReceiverSchedulingPolicy 会对 Streaming 应用的更好的语义理解,也能计算出更好的分发策略。ReceiverSchedulingPolicy 有两个方法,分别用于:

  • 在 Streaming 程序首次启动时:

  • 收集所有 InputDStream 包含的所有 Receiver 实例 —— receivers

  • 收集所有的 executor —— executors —— 作为候选目的地

  • 然后就调用 ReceiverSchedulingPolicy.scheduleReceivers(receivers, executors) 来计每个个 Receiver 的目的地 executor 列表

  • 在 Streaming 程序运行过程中,如果需要重启某个 Receiver:

  • 将首先看一看之前计算过的目的地 executor 还没有还 alive 的

  • 如果没有,就需要 ReceiverSchedulingPolicy.rescheduleReceiver(receiver, ...) 来重新计算每个 Receiver 的目的地 executor 列表

(2) 每个 Receiver 分发有单独的 Job 负责

对于这仅有个一个 Task,只在第 1 次执行时,才尝试启动 Receiver;如果该 Task 因为失效而被调度到其它 executor 执行时,就不再尝试启动 Receiver、只做一个空操作,从而导致本 Job 的状态是成功执行已完成。ReceiverTracker 会另外调起一个 Job —— 有可能会重新计算 Receiver 的目的地 —— 来继续尝试 Receiver 分发……如此直到成功为止。另外,由于 Spark Core 的 Task 下发时只会参照并大部分时候尊重 Spark Streaming 设置的 preferredLocation 目的地信息,还是有一定可能该分发 Receiver 的 Job 并没有在我们想要调度的 executor 上运行。此时,在第 1 次执行 Task 时,会首先向ReceiverTracker 发送 RegisterReceiver 消息,只有得到肯定的答复时,才真正启动 Receiver,否则就继续做一个空操作,导致本 Job 的状态是成功执行已完成。当然,ReceiverTracker 也会另外调起一个 Job,来继续尝试 Receiver 分发……如此直到成功为止,这样就保证的数据的任务一直运行,为我们的集群提供源源不断的数据。

val supervisor = new ReceiverSupervisorImpl(  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)supervisor.start()supervisor.awaitTermination()

在supervisor.start方法中会调用如下代码

ReceiverSupervisor.scala的127行

/** Start the supervisor */def start() {  onStart()  startReceiver()}

onStart()方法是在ReceiverSupervisorImpl中实现的(ReceiverSupervisorImpl.scala的172行)

override protected def onStart() {  registeredBlockGenerators.foreach { _.start() }}

在startReceiver中,会调用receiver的Onstart方法,启动receiver。

注:这里要弄清楚ReceiverInputDStream和Recevier的区别。Receiver是具体接收数据的,而ReceiverInputDStream是对Receiver做了一成封装,将数据转换成DStream 。

我们本例中的Receiver是通过SocketInputDStream的getReceiver方法获取的(在第19步的时候被调用)。

ReceiverInputDStream.scala的42行

def getReceiver(): Receiver[T] = {  new SocketReceiver(host, port, bytesToObjects, storageLevel)}

而SocketReceiver会不断的从Socket中获取数据。

我们看看SocketReceiver的onStart方法:

def onStart() {  // Start the thread that receives data over a connection  new Thread("Socket Receiver") {    setDaemon(true)    override def run() { receive() }  }.start()}
/** Create a socket connection and receive data until receiver is stopped */def receive() {  var socket: Socket = null  try {    logInfo("Connecting to " + host + ":" + port)    socket = new Socket(host, port)    logInfo("Connected to " + host + ":" + port)    val iterator = bytesToObjects(socket.getInputStream())    while(!isStopped && iterator.hasNext) {      store(iterator.next)    }    if (!isStopped()) {      restart("Socket data stream had no more data")    } else {      logInfo("Stopped receiving")    }  } catch {    case e: java.net.ConnectException =>      restart("Error connecting to " + host + ":" + port, e)    case NonFatal(e) =>      logWarning("Error receiving data", e)      restart("Error receiving data", e)  } finally {    if (socket != null) {      socket.close()      logInfo("Closed socket to " + host + ":" + port)    }  }}

转载于:https://my.oschina.net/u/1253652/blog/671336

你可能感兴趣的文章
Eclipse Debug Android Native Application
查看>>
java动态代理
查看>>
node.js原型继承
查看>>
揭露让Linux与Windows隔阂消失的奥秘(1)
查看>>
我的友情链接
查看>>
Mysql备份和恢复策略
查看>>
linux17-邮件服务器
查看>>
AS开发JNI步骤
查看>>
Android NDK开发:JNI基础篇
查看>>
使用Maven命令快速建立项目结构
查看>>
二分查找,php
查看>>
python面试题-django相关
查看>>
Python——eventlet.greenthread
查看>>
记大众点评之面试经历
查看>>
第三章:基本概念
查看>>
Jersey+mybatis实现web项目第一篇
查看>>
C++形参中const char * 与 char * 的区别
查看>>
espresso 2.0.4 Apple Xcode 4.4.1 coteditor 价格
查看>>
Object-C中emoji与json的问题
查看>>
一、Lambda表达式
查看>>