假设,我们有这样的输入文件:
cate-a spu-1 1 cate-a spu-1 2 cate-a spu-2 3 cate-a spu-2 4 cate-a spu-3 5 cate-a spu-3 6 cate-a spu-1 7 cate-a spu-4 8 cate-a spu-4 9 cate-a spu-1 8 ...
我们希望得到分cate,分spu的总和,并且取分cate分spu的TOP3

如上图示,大致描述了MAP/REDUCE的运行流程:
MAP输出后,会进行分区操作,也就是决定KEY2/VALUE2发到哪些reduce上
分区由job.setPartitionerClass决定
在同一个分区内,会对KEY2进行排序,依据是job.setSortComparatorClass, 如果没有设置则根据KEY的compareTo方法
接下来进入分组阶段,会构造KEY3和VALUE迭代器
分组的依据是job.setGroupingComparatorClass,只要比较器比较的相同就在同一组
KEY3/VALUE迭代器交给reduce方法处理
步骤:
KEY应该是可序列化,可比较的,只需要注意实现WritableComparable即可。 重点关注compareTo方法。
@Override
public int compareTo(Cate2SpuKey that) {
System.out.println("开始对KEY进行排序...");
if(cate2.equals(that.getCate2())){
return spu.compareTo(that.getSpu());
}
return cate2.compareTo(that.getCate2());
}
分区,是KEY的第一次比较,extends Partitioner 并提供getPartition即可。 这里根据cate分区。
需要注意的是,分组类必须提供构造方法,并且重载 public int compare(WritableComparable w1, WritableComparable w2) 。这里根据cate,spu分组。
通过上述的,就可以取得分cate分spu的SUM(counts)值了。
通过eclipse hadoop插件,可以方便我们上传测试文件到HDFS,可以浏览,删除HDFS文件,更加方便的是,就像运行普通JAVA程序一样的运行/调试MR程序(不在需要打成JAR包),让我们可以追踪MR的每一步,非常方便进行逻辑性测试~

那么怎么取分cate分spu的TOP3呢?
我们只需要把上一个MR的输出文件,作为另一个MR的输入,并且以cate+counts 为KEY ,以spu为VALUE,根据cate分区,分组,排序的话:cate相同情况下,根据counts倒序; 最后在reduce阶段取TOP3即可。
@Override
protected void reduce(Cate2CountsKey key, Iterable<Text> values,
Reducer<Cate2CountsKey, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
System.out.println("reduce...");
System.out.println("VALUES迭代前... key:" + key.toString());
System.out.println("VALUES迭代前... key:" + key.getCounts());
int top = 3;
for(Text t : values){
if(top > 0){
System.out.println("VALUES迭代中... key:" + key.toString());
System.out.println("VALUES迭代中... key:" + key.getCounts());
context.write(new Text(key.getCate2() + "\t" + t.toString()),
new Text(key.getCounts()
+ ""));
top--;
}
}
System.out.println("reduce over...");
} 
那么到现在,分组取TOP就完成了。
|