Spark Streaming: how to modify closure objects and update computation rules after start()

      After the Spark Streaming context has started, I wanted to change the computation rules, but the framework reports that this is not supported. The exception:

Exception in thread "Thread-14" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
	at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
	at org.apache.spark.streaming.dstream.FlatMappedDStream.<init>(FlatMappedDStream.scala:29)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)

My version is Spark 2.0. Our business requirement, however, is to aggregate reports according to user-defined report rules. After some analysis, the report computation rules can be wrapped into an object and updated on the driver, so the requirement can be met in a data-driven way.

       However, once Spark Streaming has started, the functions and objects captured in its closures can no longer be modified, so the source code has to be patched by hand. After the changes below, the rebuilt package passed testing.
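For reference, this is the kind of call that triggers the exception above on an unmodified Spark 2.0. A minimal sketch, assuming a plain socket source; the host, port and word-count logic are placeholders, not our actual job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReproduceIllegalState {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("repro"), Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).print()
    ssc.start()
    // Constructing a new DStream after start(): DStream.validateAtInit sees the
    // context in ACTIVE state and throws the IllegalStateException shown above.
    lines.flatMap(_.split(",")).print()
    ssc.awaitTermination()
  }
}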

     The changes are as follows:

Spark Streaming change list

**************************************************************************************************************

org.apache.spark.streaming.DStreamGraph

  // yunzhi.lyz: val -> var
  private var outputStreams = new ArrayBuffer[DStream[_]]()

  def start(time: Time) {
    this.synchronized {
      // yunzhi.lyz: deleted, so the graph can be started again after being rebuilt
      // require(zeroTime == null, "DStream graph computation already started")
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validateAtStart)
      inputStreams.par.foreach(_.start())
    }
  }

  // yunzhi.lyz: new method, drops all registered output streams so the graph can be rebuilt
  def clearOutputStreams() = this.synchronized {
    outputStreams = new ArrayBuffer[DStream[_]]()
  }

  **************************************************************************************************************

  org.apache.spark.streaming.StreamingContext

  // yunzhi.lyz: new accessors, expose the private graph and scheduler
  def getGraph = graph
  def getScheduler() = scheduler

  **************************************************************************************************************

  org.apache.spark.streaming.dstream.DStream

  // yunzhi.lyz: new accessor
  def getGraph() = graph

  private[streaming] def initialize(time: Time) {
    // yunzhi.lyz: deleted, so zeroTime can be re-initialized when the graph is restarted
    // if (zeroTime != null && zeroTime != time) {
    //   throw new SparkException("ZeroTime is already initialized to " + zeroTime
    //     + ", cannot initialize it again to " + time)
    // }
    zeroTime = time
    ......

  private def validateAtInit(): Unit = {
    ssc.getState() match {
      case StreamingContextState.INITIALIZED =>
        // good to go
      case StreamingContextState.ACTIVE =>
        // yunzhi.lyz: deleted, so new output operations may be added while the context is active
        // throw new IllegalStateException(
        //   "Adding new inputs, transformations, and output operations after " +
        //     "starting a context is not supported")
      case StreamingContextState.STOPPED =>
        throw new IllegalStateException(
          "Adding new inputs, transformations, and output operations after " +
            "stopping a context is not supported")
    }
  }

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      // yunzhi.lyz: deleted
      // if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
      // yunzhi.lyz: deleted
      // } else {
      //   None
      // }
    }
  }

  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      // yunzhi.lyz: deleted
      // case None => None
    }
  }

  private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    // yunzhi.lyz: change, skip null keys/values before comparing against the remember window
    val oldRDDs = generatedRDDs
      .filter(_._1 != null)
      .filter(_._2 != null)
      .filter(_._1 <= (time - rememberDuration))
    // val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
    ......

   **************************************************************************************************************

 org.apache.spark.streaming.receiver.ReceiverSupervisorImpl

  /** Report error to the receiver tracker */
  def reportError(message: String, error: Throwable) {
    // yunzhi.lyz: deleted, receiver errors are only logged instead of being sent to the tracker
    // val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
    // trackerEndpoint.send(ReportError(streamId, message, errorString))
    logWarning("Reported error " + message + " - " + error)
  }

  override protected def onReceiverStop(message: String, error: Option[Throwable]) {
    logInfo("Deregistering receiver " + streamId)
    // yunzhi.lyz: deleted, an empty error string is reported instead
    // val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, ""))
    logInfo("Stopped receiver " + streamId)
  }
   **************************************************************************************************************

 org.apache.spark.streaming.scheduler.JobGenerator

  // yunzhi.lyz: new
  private var nextTime = -1L
  def getTimer() = timer
  // yunzhi.lyz: removed the private modifier
  def restart() {
    // If manual clock is being used for testing, then
    // either set the manual clock to the last checkpointed time,
    // or if the property is defined set it to that time
    if (clock.isInstanceOf[ManualClock]) {
      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
    }
    ......

**************************************************************************************************************

     

  org.apache.spark.streaming.scheduler.JobScheduler

  // yunzhi.lyz: new accessor
  def getjobGenerator() = jobGenerator
**************************************************************************************************************

  org.apache.spark.streaming.util.RecurringTimer

  // yunzhi.lyz: val -> var
  private var thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  // yunzhi.lyz: new
  def getReStartTime(t: Int): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getNextTime(): Long = { nextTime }
  // yunzhi.lyz: new, set the next fire time and resume the suspended timer thread
  def resume(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.resume()
    logInfo("ReStarted timer for " + name + " at time " + nextTime)
    nextTime
  }
  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      // yunzhi.lyz: suspend instead of join, so the timer thread can be resumed later
      // (note: Thread.suspend/resume are deprecated JDK calls)
      // thread.join()
      thread.suspend()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    // prevTime
    nextTime
  }

    After the modifications, recompile and repackage.

    Test code. The core of the driver-side refresh logic looks like this (a fragment; TTObject is the holder object for the shared StreamingContext and input DStream, and workbatch is the mutable rule buffer):

def exchangeworkbatch(tp: String, st: String) {
  if (tp.equals("+")) workbatch += st
  if (tp.equals("-")) workbatch -= st
}

cnt = cnt + 1
val cntn = cnt
val wbArray = workbatch.toArray
workbatch.foreach(x => println("*************" + x))

// drop the old output streams, rebuild them against the new rules,
// then restart the graph at the timer's next batch time
TTObject.ssc.getGraph.clearOutputStreams

wordCounts5 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
  .map(x => ("wordCounts4** " + cntn + " **" + x, 1))
  .reduceByKey(_ + _)
wordCounts5.print

wordCounts3 = TTObject.lines.flatMap(x => flatmapchange(wbArray, x))
  .map(x => ("wordCounts3** " + cntn + " **" + x, 1))
  .reduceByKey(_ + _)
wordCounts3.print

TTObject.ssc.getGraph.start(new Time(TTObject.ssc.getScheduler.getjobGenerator.getTimer.getNextTime))

  

The full test program:

 package scalasrc

import org.apache.spark.streaming.ttstream.TTUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import scala.collection._

object TTWordCountFreshRule {
  val splitTag = new mutable.ArrayBuffer[String]
  def main(args: Array[String]) {
    val tagName = ""
    val subId = ""
    val accessKey = ""
    val sparkConf = new SparkConf().setAppName("TTWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //ssc.checkpoint("checkpoint")
    val numStreams = 3
    var offSet = 1478448000L
    if (offSet == 0) offSet = System.currentTimeMillis() / 1000
    val ttStreams = (1 to numStreams).map { i => TTUtils.createStream(ssc, tagName, subId, accessKey, offSet, i - 1).map(_._2) }
    val lines = ssc.union(ttStreams).filter(x => x != null)
    splitTag += "dd"
    val splitTagarray = splitTag.toArray
    val wordCounts5 = lines
      .flatMap(x => splitString(x, splitTagarray))
      .map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts5.count().print
    wordCounts5.print
    runJObThread
    ssc.start()
    ssc.awaitTermination()

    def runJObThread() {
      new Thread() {
        override def run {
          while (true) {
            try {
              println("runJObThread begin.....")
              Thread.sleep(20000)
              splitTag += "cc"
              // re-submitted repeatedly: the graph below is rebuilt and restarted on every iteration
              var cnt = 0
              splitTag.foreach(x => {
                println(s" cnt: $cnt  ,value : $x ");
                cnt += 1
              })
              ssc.getGraph.clearOutputStreams
              val splitTagarray = splitTag.toArray
              val wordCounts5 = lines.flatMap(x => splitString(x, splitTagarray)).map(x => (x, 1)).reduceByKey(_ + _)
              wordCounts5.count().print
              wordCounts5.print
              ssc.getGraph.start(new Time(ssc.getScheduler.getjobGenerator.getTimer.getNextTime))
            } catch {
              case ex: Exception => println(ex.getMessage)
            }
          }
        }
      }.start
    }
  }


  def splitString(st: String, splitTagarray: Array[String]) = {
    val splitArray = new mutable.ArrayBuffer[String]
    splitTagarray.foreach(x => splitArray ++= st.split(x))
    splitArray.toArray
  }

}

With this in place, a customized report no longer requires stopping the streaming job: the refresh Thread only needs to read the rules from storage, and this data-driven approach makes the computation rules fully dynamic.
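What that data-driven read could look like: a minimal sketch, assuming the split tags live in a plain text file with one tag per line; the path, the file format and the RuleStore helper are hypothetical, not part of the job above.

import scala.io.Source

object RuleStore {
  // Hypothetical: the path and the one-tag-per-line format are assumptions.
  def loadSplitTags(path: String = "/data/report_rules.txt"): Array[String] = {
    val src = Source.fromFile(path)
    try src.getLines().map(_.trim).filter(_.nonEmpty).toArray
    finally src.close()
  }
}

// Inside runJObThread's loop, replace the hard-coded  splitTag += "cc"  with:
//   splitTag.clear()
//   splitTag ++= RuleStore.loadSplitTags()

The refresh thread then rebuilds the output streams exactly as above, only with tags that came from storage instead of being hard-coded.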

Problem: with these modifications, checkpointing and stateful computations turn out to be broken.

************************************************************************************

In hindsight, such painful source changes are unnecessary: simply reference an object on the workers that starts a thread to periodically refresh the configuration from the database. The only possible issue is that the workers may fetch the metadata at slightly different times, which can hurt accuracy for a moment. But a newly created report has a debugging period anyway, so this does not affect the business.

Although Spark's map and reduce operators have no setup() hook, worker-side code can call methods on a Scala object, and that object can start a thread. The functions used in map, reduce and other operators can be functions of such an object; the first time one of its functions is called on a worker, all of the object's statements outside method definitions (its constructor body) are executed.

package streaming

import scala.collection._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, _}


object workerObjectGetMeta {
  def main(args: Array[String])
  {
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc  = new StreamingContext(sparkConf, Seconds(10))
    val lines = ssc.socketTextStream("192.168.0.140", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(x => CalUtils.splitS(x))
    val wordCounts = words.map(x => (x, 1))
    wordCounts.reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

object CalUtils {
  val splitArray = new mutable.ArrayBuffer[String]
  splitArray += "aa"

  // new Thread() {}.start() as a top-level statement: the thread starts automatically
  // the first time the object is referenced on a worker
  new Thread() {
    override def run {
      while (true) {
        try {
          println("runJObThread begin.....")
          Thread.sleep(5000)
          splitArray += "cc"
          // print the current tag list on every pass
          var cnt = 0
          splitArray.foreach(x => {
            println(s" cnt: $cnt  ,value : $x ")
            cnt += 1
          })
        } catch {
          case ex: Exception => println(ex.getMessage)
        }
      }
    }
  }.start()

  def splitS(s: String): Array[String] = {
    val rArray = new mutable.ArrayBuffer[String]
    splitArray.foreach(x => {
      rArray ++= s.split(x)
    })
    println(s"splitArray: ${splitArray.mkString(",")}  ")
    println(s"rArray: ${rArray.mkString(",")}  ")
    rArray.toArray
  }
}
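If the rules live in a database, as suggested above, the refresh loop inside CalUtils could pull them with plain JDBC. A minimal sketch; the connection URL, credentials, table name report_rules and column split_tag are assumptions, not part of the original setup.

import java.sql.DriverManager
import scala.collection.mutable

object RuleStoreJdbc {
  // Hypothetical: URL, credentials, table and column names are assumptions.
  def fetchSplitTags(): Array[String] = {
    val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/report", "user", "pass")
    try {
      val rs = conn.createStatement().executeQuery("SELECT split_tag FROM report_rules")
      val tags = mutable.ArrayBuffer[String]()
      while (rs.next()) tags += rs.getString("split_tag")
      tags.toArray
    } finally {
      conn.close()
    }
  }
}

// Inside CalUtils's refresh thread, the loop body could become:
//   val latest = RuleStoreJdbc.fetchSplitTags()
//   splitArray.clear()
//   splitArray ++= latest

Each executor JVM runs its own copy of this refresh loop, which is exactly where the slight metadata skew between workers mentioned above comes from.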
