Spark Streaming 整合 Flume
package com.bawei.stream
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamFlume {
/**
 * State-update function passed to `updateStateByKey` for the running word count.
 *
 * Adds the sum of the values seen for a key in the current batch to the
 * total carried over from earlier batches.
 *
 * @param newValues    the values collected for the key in the current batch
 * @param runningCount the total accumulated so far, or None if there is none yet
 * @return the updated total; always a Some, never None
 */
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // A key without previous state starts from zero.
  val previousTotal = runningCount match {
    case Some(count) => count
    case None        => 0
  }
  val batchTotal = newValues.foldLeft(0)(_ + _)
  Some(previousTotal + batchTotal)
}
/**
 * Entry point: runs a Spark Streaming word count over events pulled from Flume.
 *
 * Builds a local StreamingContext with a 5-second batch interval, polls the
 * configured Flume address, decodes each event body as text, splits it into
 * space-separated words and keeps a running count per word through
 * `updateStateByKey`. Blocks until the streaming context terminates.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Local import keeps the charset dependency scoped to this method.
  import java.nio.charset.StandardCharsets

  // Configure the application name and the local master
  val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
  // Build the SparkContext and reduce log noise
  val sc: SparkContext = new SparkContext(sparkConf)
  sc.setLogLevel("WARN")
  // Build the StreamingContext; each batch covers 5 seconds
  val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
  // Set the checkpoint directory (the stateful updateStateByKey below relies on checkpointing)
  scc.checkpoint("C:\\Users\\Desktop\\checkpoint2")
  // Flume address(es) to poll; more than one may be listed
  val address = Seq(new InetSocketAddress("192.168.182.147", 8888))
  // Pull data from Flume
  val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
    FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)
  // The payload lives in the event body. Decode it with an explicit charset
  // instead of the JVM default, which differs between platforms.
  // NOTE(review): assumes the Flume events carry UTF-8 text -- confirm against the Flume source config.
  val lineStream: DStream[String] =
    flumeStream.map(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8))
  // Running word count; empty tokens (from blank lines or consecutive spaces)
  // are dropped so the empty string is not counted as a word.
  val result: DStream[(String, Int)] = lineStream
    .flatMap(_.split(" "))
    .filter(_.nonEmpty)
    .map((_, 1))
    .updateStateByKey(updateFunction)
  result.print()
  scc.start()
  scc.awaitTermination()
}
} 相关推荐
chenguangchun 2020-07-26
myt0 2020-07-18
IT影风 2020-07-18
chenguangchun 2020-06-28
jiaomrswang 2020-06-26
myt0 2020-06-16
xiaoxiaojavacsdn 2020-06-08
zzjmay 2020-06-07
strongyoung 2020-06-04
ErixHao 2020-05-20
啦啦啦啦啦 2020-05-15
wanfuchun 2020-05-14
xiaoxiaojavacsdn 2020-05-01
chenguangchun 2020-04-18
QAnyang 2020-03-14
wsong 2020-03-13
硅步至千里 2020-02-22