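Custom Accumulators in Spark
You can implement a custom accumulator by extending AccumulatorV2 and overriding its abstract members: isZero, copy, reset, add, merge and value.
For the official example, see: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
Below is an example I wrote that totals the quantities for each card type.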
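To show how these methods fit together during one job, here is a plain-Scala sketch with no Spark dependency. It rests on simplifying assumptions:

* Each task receives a zeroed copy of the driver's accumulator, which is the idea behind Spark's copyAndReset.
* Each task calls add for every element of its partition.
* The driver then merges every task's copy into its own instance.

MiniAccumulator, SumAcc and LifecycleSim are hypothetical names for illustration only. Real Spark also serializes the copies and deals with task retries.

```scala
// Hypothetical stand-in that mirrors the AccumulatorV2 contract; not Spark's class.
abstract class MiniAccumulator[IN, OUT] {
  def isZero: Boolean
  def copy(): MiniAccumulator[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: MiniAccumulator[IN, OUT]): Unit
  def value: OUT

  // Same idea as AccumulatorV2.copyAndReset: a fresh, zeroed copy for one task
  def copyAndReset(): MiniAccumulator[IN, OUT] = {
    val c = copy()
    c.reset()
    c
  }
}

// Simple integer sum used to exercise the lifecycle
class SumAcc extends MiniAccumulator[Int, Int] {
  private var sum = 0
  def isZero: Boolean = sum == 0
  def copy(): MiniAccumulator[Int, Int] = {
    val c = new SumAcc
    c.sum = sum // independent copy: Int is immutable, so nothing is shared
    c
  }
  def reset(): Unit = { sum = 0 }
  def add(v: Int): Unit = { sum += v }
  def merge(other: MiniAccumulator[Int, Int]): Unit = { sum += other.value }
  def value: Int = sum
}

object LifecycleSim {
  // One simulated job: every partition gets its own zeroed copy ("task side"),
  // then the driver merges each partial result into its own instance.
  def run[IN, OUT](driver: MiniAccumulator[IN, OUT], partitions: Seq[Seq[IN]]): OUT = {
    val partials = partitions.map { part =>
      val taskAcc = driver.copyAndReset()
      part.foreach(v => taskAcc.add(v))
      taskAcc
    }
    partials.foreach(p => driver.merge(p))
    driver.value
  }
}
```

Running LifecycleSim.run(new SumAcc, Seq(Seq(1, 3), Seq(7, 5, 2))) yields 18, even though every task copy starts from zero. This is also why copy() must return an object that shares no mutable state with the original. If it did share state, the reset() inside copyAndReset would clear the driver's value as well.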
package com.shuai7boy.myscalacode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
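// Running totals for the two card types
case class Card(var card1Count: Int, var card2Count: Int)

// IN = Card (type passed to add), OUT = Card (type returned by value)
class CalcCardCount extends AccumulatorV2[Card, Card] {
  var result = Card(0, 0)

  /**
   * Whether this accumulator holds its zero value; must be consistent with the state reset() produces.
   *
   * @return true if both counts are 0
   */
  override def isZero: Boolean = {
    result.card1Count == 0 && result.card2Count == 0
  }

  /**
   * Create a new accumulator holding a copy of the current state.
   * A fresh Card is allocated so the copy does not share mutable state with this instance;
   * otherwise resetting the copy would also reset the original.
   *
   * @return an independent copy
   */
  override def copy(): AccumulatorV2[Card, Card] = {
    val newCalcCardCount = new CalcCardCount()
    newCalcCardCount.result = Card(result.card1Count, result.card2Count)
    newCalcCardCount
  }

  /**
   * Reset to the zero value (Spark resets each per-task copy before the partition is processed).
   */
  override def reset(): Unit = {
    result.card1Count = 0
    result.card2Count = 0
  }

  /**
   * Add one input value; called for each element within a partition.
   *
   * @param v the counts to add
   */
  override def add(v: Card): Unit = {
    result.card1Count += v.card1Count
    result.card2Count += v.card2Count
  }

  /**
   * Merge another accumulator's partial result into this one to obtain the total.
   *
   * @param other a per-task accumulator
   */
  override def merge(other: AccumulatorV2[Card, Card]): Unit = other match {
    case o: CalcCardCount =>
      result.card1Count += o.result.card1Count
      result.card2Count += o.result.card2Count
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Return the result
  override def value: Card = result
}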
object CardCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("calcCardCountDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val cc = new CalcCardCount
    sc.register(cc)
    val cardList = sc.parallelize(List[String]("card1 1", "card1 3", "card1 7", "card2 5", "card2 2"), 2)
    // foreach is an action, so it triggers the accumulation. Spark only guarantees that each
    // task's accumulator update is applied once when the update happens inside an action;
    // updates made in a transformation such as map may be repeated if a task is re-executed.
    cardList.foreach(card => {
      val fields = card.split(" ")
      val cardInfo = fields(0) match {
        case "card1" => Card(fields(1).toInt, 0)
        case "card2" => Card(0, fields(1).toInt)
        case _ => Card(0, 0) // unknown card type contributes nothing
      }
      cc.add(cardInfo)
    })
    println("Total card1 count: " + cc.value.card1Count + ", total card2 count: " + cc.value.card2Count)
    sc.stop()
  }
}
The printed output is:
Total card1 count: 11, total card2 count: 7
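The totals can be checked by hand. The sketch below is plain Scala with no Spark dependency. It splits the input into 2 slices, computes each slice's partial (card1, card2) counts as a task-side accumulator copy would, and then merges them the way the driver does.

The slicing formula assumes parallelize splits a local Seq evenly by index. Under that assumption the partitions are ["card1 1", "card1 3"] and ["card1 7", "card2 5", "card2 2"], with partial results (4, 0) and (7, 7). The name PartitionCheck is hypothetical.

```scala
object PartitionCheck {
  // Assumed to match how parallelize splits a local Seq into k slices:
  // slice i covers indices [i * n / k, (i + 1) * n / k).
  def slices[T](data: Seq[T], k: Int): Seq[Seq[T]] =
    (0 until k).map(i => data.slice(i * data.length / k, (i + 1) * data.length / k))

  // What one task's accumulator copy would hold after its partition: (card1, card2)
  def partial(part: Seq[String]): (Int, Int) =
    part.foldLeft((0, 0)) { case ((c1, c2), line) =>
      line.split(" ") match {
        case Array("card1", n) => (c1 + n.toInt, c2)
        case Array("card2", n) => (c1, c2 + n.toInt)
        case _                 => (c1, c2)
      }
    }

  // Driver-side merge: add the partial results component-wise
  def totals(data: Seq[String], k: Int): (Int, Int) =
    slices(data, k).map(partial).foldLeft((0, 0)) { case ((a1, a2), (b1, b2)) =>
      (a1 + b1, a2 + b2)
    }

  def main(args: Array[String]): Unit = {
    val data = List("card1 1", "card1 3", "card1 7", "card2 5", "card2 2")
    println(slices(data, 2).map(partial))
    println(totals(data, 2))
  }
}
```

Because merge only adds counts, the final totals (11, 7) do not depend on how the data is partitioned. totals(data, 3) gives the same result.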
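With the code above, two values are counted at the same time. If you need more, extend the Card class and the matching logic in isZero, reset, add and merge. Spark's built-in numeric accumulators (such as LongAccumulator and DoubleAccumulator) each track only a single value.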
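One way to extend the idea without adding a field per card type is to key the counts by card name. The class below is a minimal sketch under these assumptions:

* The input is a (cardType, quantity) pair.
* The value is an immutable Map snapshot.
* spark-core is on the classpath.

CardTypeCounter is a hypothetical name, not part of Spark. Only AccumulatorV2 and its methods (including copyAndReset) come from Spark itself.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical generalization: one running total per card type, keyed by name,
// so new card types need no code changes.
class CardTypeCounter extends AccumulatorV2[(String, Long), Map[String, Long]] {
  private val counts = mutable.HashMap.empty[String, Long]

  override def isZero: Boolean = counts.isEmpty

  // Independent copy: the new instance gets its own HashMap
  override def copy(): CardTypeCounter = {
    val c = new CardTypeCounter
    c.counts ++= counts
    c
  }

  override def reset(): Unit = counts.clear()

  // Add the quantity to the running total for that card type
  override def add(v: (String, Long)): Unit = {
    val (cardType, n) = v
    counts(cardType) = counts.getOrElse(cardType, 0L) + n
  }

  // Combine another counter's per-type totals into this one
  override def merge(other: AccumulatorV2[(String, Long), Map[String, Long]]): Unit =
    other match {
      case o: CardTypeCounter =>
        o.counts.foreach { case (k, n) => counts(k) = counts.getOrElse(k, 0L) + n }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
    }

  // Immutable snapshot so callers cannot modify the internal state
  override def value: Map[String, Long] = counts.toMap
}
```

In a job it would be used like CalcCardCount: register it with sc.register(counter), then call counter.add((cardType, quantity.toLong)) inside foreach. For the sample data, counter.value should then contain card1 -> 11 and card2 -> 7.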