import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; public class JavaSparkWC { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("JavaSparkWC").setMaster("local[1]"); //提交任务入口类 JavaSparkContext jsc = new JavaSparkContext(conf); //获取数据 JavaRDD<String> lines = jsc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt"); //切分数据 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) throws Exception { List<String> splited = Arrays.asList(s.split(" ")); //生成list return splited; } }); //生成元祖 //一对一组 ,(输入单词,输出单词,输出1) JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //聚合 //2个相同key的value,聚合 JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //此前key为String类型,没有办法排序 //Java api并没有提供sortBy算子,此时需要把两个值位置调换,排序完成后,在换回来 final JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception { // return new Tuple2<Integer, String>(tup._2, tup._1); return tup.swap(); //swap(),交换方法 } }); //降序排序 JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false); //再次交换 JavaPairRDD<String, Integer> res = sorted.mapToPair( new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)throws Exception { return tup.swap(); } }); System.out.println(res.collect()); jsc.stop();//释放资源 } } |