再识spark

一.示例

1.统计PV和UV

1.1统计PV

// 1.1 PV (page views): count how many times each URL appears in the log.
// Each log line is whitespace-separated; field 5 is the URL.
// Fix: removed the stray '?' after textFile(...) — an encoding artifact
// from the original page that made the snippet non-compiling.
val conf = new SparkConf()
conf.setMaster("local").setAppName("pvuv")
val sc = new SparkContext(conf)
val lineRDD = sc.textFile("./pvuv.txt")
lineRDD.map(x => {
  val sp = x.split("\\s")
  (sp(5), 1)                 // emit (url, 1) for every record
}).reduceByKey(_ + _)        // sum the 1s per url => PV per page
  .foreach(println)

1.2统计UV

// 1.2 UV (unique visitors): for every URL, count the number of distinct
// visitor IPs. Field 0 is the IP, field 5 the URL; distinct() removes
// repeat visits by the same IP before counting.
lineRDD
  .map { line =>
    val fields = line.split("\\s")
    (fields(5), fields(0))   // (url, ip)
  }
  .distinct()                // keep one record per (url, ip) pair
  .countByKey()              // Map[url -> distinct visitor count]
  .foreach(println)

2.二次排序

// 2. Secondary sort: order records by the first column and, on ties, by the
// second column, using a composite key (SecondSortKey).
// Fixes: the '?' characters scattered through the original were encoding
// artifacts (lost line breaks) and have been removed; the SecondSortKey
// class was truncated mid-method in the article and has been completed
// below with the canonical members of the standard Spark secondary-sort
// example (constructor, getters/setters, compareTo).
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");

// Map each "<first> <second>" line to (SecondSortKey(first, second), line)
// so that sortByKey can order on both columns at once.
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(
        new PairFunction<String, SecondSortKey, String>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<SecondSortKey, String> call(String line) throws Exception {
        String[] splited = line.split(" ");
        int first = Integer.valueOf(splited[0]);
        int second = Integer.valueOf(splited[1]);
        SecondSortKey secondSortKey = new SecondSortKey(first, second);
        return new Tuple2<SecondSortKey, String>(secondSortKey, line);
    }
});

// sortByKey(false) => descending order on the composite key;
// print the original line of each sorted record.
pairSecondRDD.sortByKey(false).foreach(
        new VoidFunction<Tuple2<SecondSortKey, String>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
        System.out.println(tuple._2);
    }
});

/**
 * Composite key for the secondary sort: compares on {@code first} and
 * tie-breaks on {@code second}. Implements Serializable because Spark
 * ships keys across the cluster, and Comparable so sortByKey can order it.
 */
public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {

    private static final long serialVersionUID = 1L;
    private int first;   // primary sort column
    private int second;  // secondary sort column (tie-breaker)

    public SecondSortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int compareTo(SecondSortKey that) {
        // Order by first; only when the first columns are equal does the
        // second column decide.
        if (this.first != that.first) {
            return this.first - that.first;
        }
        return this.second - that.second;
    }
}

相关推荐