Handling Data Skew with Spark DataFrames
Crawler traffic and similar abuse can leave a single ID with a huge number of log entries. In Spark, all logs with the same ID are shuffled to the same node for processing, so one hot key can slow the whole job down.
Since these users' visits are invalid anyway, the simplest fix is to filter them out entirely.
Without further ado, here are the output and the Scala DataFrame code (reference links are in the code comments):
Original DataFrame (with fake users):
+---------+------+
|       id| movie|
+---------+------+
|       u1|WhoAmI|
|       u2|Zoppia|
|       u2|  Lost|
|FakeUserA|Zoppia|
|FakeUserA|  Lost|
|FakeUserA|Zoppia|
|FakeUserA|  Lost|
|FakeUserA|Zoppia|
|FakeUserA|  Lost|
|FakeUserB|  Lost|
|FakeUserB|  Lost|
|FakeUserB|  Lost|
|FakeUserB|  Lost|
+---------+------+

Fake Users with count (threshold=2):
+---------+-----+
|       id|count|
+---------+-----+
|FakeUserA|    6|
|FakeUserB|    4|
+---------+-----+

Fake Users:
Set(FakeUserA, FakeUserB)

Valid users after filter:
+---+------+
| id| movie|
+---+------+
| u1|WhoAmI|
| u2|Zoppia|
| u2|  Lost|
+---+------+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
/**
* Created by colinliang on 2017/8/14.
*/
case class IDMovie(id: String, movie: String)
object BroadcastTest {
def main(args: Array[String]): Unit = {
Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
val sc = new SparkContext(conf)
println("spark version: " + sc.version)
sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val spark = new SQLContext(sc)
val idvids = List(
IDMovie("u1", "WhoAmI")
, IDMovie("u2", "Zoppia")
, IDMovie("u2", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
);
val df = spark
.createDataFrame(idvids)
.repartition(col("id"))
println("Original DataFrame (with fake users): ")
df.show()
// val df_fakeUsers_with_count = df.sample(false, 0.1).groupBy(col("id")).count().filter(col("count") > 2).limit(10000) // in practice you can sample just a fraction of the data as needed
val df_fakeUsers_with_count = df.groupBy(col("id")).count().filter(col("count") > 2)
/** DataFrame's groupBy runs as an aggregation: partial (map-side) aggregation happens before the shuffle, so it is fast, unlike RDD groupByKey, which shuffles every raw record. See: https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html
More aggregate functions: https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.functions$
You can also aggregate multiple columns of the grouped data at once with agg(); see the sketch below.
*/
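// A minimal agg() sketch (illustration only, not part of this pipeline; count and
// countDistinct come from the already-imported org.apache.spark.sql.functions):
// df.groupBy(col("id"))
//   .agg(count("movie").as("cnt"), countDistinct("movie").as("distinct_movies"))
//   .show()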
println("Fake Users with count (threshold=2):")
df_fakeUsers_with_count.show()
val set_fakeUsers = df_fakeUsers_with_count.select("id").collect().map(_.getString(0)).toSet
println("Fake Users:")
println(set_fakeUsers)
val set_fakeUsers_broadcast=sc.broadcast(set_fakeUsers)
/** Broadcast tutorial: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
* Official docs: http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
*/
val udf_isValidUser = udf((id: String) => !set_fakeUsers_broadcast.value.contains(id)) // using set_fakeUsers.contains(id) directly also works, but is less efficient: the Set is then shipped inside every task's closure and may be deserialized many times; see http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
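// The non-broadcast variant, for comparison (a sketch; correct, but ships the whole Set with every task):
// val udf_isValidUser = udf((id: String) => !set_fakeUsers.contains(id))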
val df_filtered = df.filter(udf_isValidUser(col("id"))) // drop the fake users
/** If instead you want to keep a small set of users rather than filter them out, no UDF is needed:
* https://stackoverflow.com/questions/39234360/filter-spark-scala-dataframe-if-column-is-present-in-set
* val validValues = Set("A", "B", "C")
* data.filter($"myColumn".isin(validValues.toSeq: _*))
*/
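// The same idiom can also drop users, as needed here (a sketch, assuming the fake-user
// set is small enough to collect to the driver, as in this example):
// val df_filtered = df.filter(!col("id").isin(set_fakeUsers.toSeq: _*))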
/** If you want to keep a fairly large set of users, you can broadcast a DataFrame and join:
* https://stackoverflow.com/questions/33824933/spark-dataframe-filtering-retain-element-belonging-to-a-list
* import org.apache.spark.sql.functions.broadcast
* initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
*/
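// Conversely, dropping a large fake-user set without collecting it to the driver can be
// done with a broadcast anti-join (a sketch; the "left_anti" join type requires Spark 2.0+):
// import org.apache.spark.sql.functions.broadcast
// val df_filtered2 = df.join(broadcast(df_fakeUsers_with_count.select("id")), Seq("id"), "left_anti")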
println("\nValid users after filter:")
df_filtered.show()
}
}