小编给大家分享一下hadoop如何自定义GroupComparator实现求最大值,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
* 如下是测试数据,第一个字段为订单号,第二个字段为id,第三个字段为价格,数据以tab键分开
* 要求求出订单号相同的记录中价格最大的那条记录
O00001 123 1234
O00002 124 3435
O00003 125 132.78
O00004 126 334
O00004 127 8976
O00003 128 635
O00002 129 23
O00001 130 980
O00001 131 111
O00002 132 66
O00003 133 42
O00004 134 88
O00005 135 900
结果如下:
O00001 123 1234.0
O00002 124 3435.0
O00003 128 635.0
O00004 127 8976.0
O00005 135 900.0
*
*/
/**
 * Driver for the "maximum price per order" MapReduce job.
 *
 * <p>Configures a job whose map output key is {@link OrderBean}, whose value is
 * {@link NullWritable}, and whose reduce-side grouping is done by
 * {@link CustGroupComparator}. The input path is taken from {@code args[0]} and the
 * output path from {@code args[1]}.
 */
public class GroupComparatorMian {
    static final Log LOG = LogFactory.getLog(GroupComparatorMian.class);

    /**
     * Builds, submits and waits for the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved during submission
     * @throws InterruptedException   if the thread is interrupted while waiting for the job
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: GroupComparatorMian <input path> <output path>");
            System.exit(2);
            return;
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupComparatorMian.class);

        job.setMapperClass(GroupComparatorMapper.class);
        job.setReducerClass(GroupComparatorReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Group reduce input by order id only, so one reduce() call sees a whole order.
        job.setGroupingComparatorClass(CustGroupComparator.class);

        String jobName = "'Customize groupcomparator test'";
        job.setJobName(jobName);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            LOG.info("Job " + jobName + " is done.");
        } else {
            LOG.error("Job " + jobName + " is going wrong,now exit.");
            // A failed job must be visible to the calling shell/scheduler: exit non-zero.
            System.exit(1);
        }
    }
}
/**
 * Grouping comparator that treats two {@link OrderBean} keys as belonging to the same
 * reduce group whenever their order ids are equal, regardless of id or price.
 */
class CustGroupComparator extends WritableComparator {

    /**
     * Registers {@link OrderBean} as the key class; the {@code true} flag is passed to
     * the {@code WritableComparator} constructor as its second argument.
     */
    public CustGroupComparator() {
        super(OrderBean.class, true);
    }

    /**
     * Compares two keys by order id only.
     *
     * @param a first key, expected to be an {@link OrderBean}
     * @param b second key, expected to be an {@link OrderBean}
     * @return the result of comparing the two order ids lexicographically
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean left = (OrderBean) a;
        OrderBean right = (OrderBean) b;
        String leftOrderId = left.getOrder_id();
        String rightOrderId = right.getOrder_id();
        return leftOrderId.compareTo(rightOrderId);
    }
}
/**
 * Composite MapReduce key holding one order line: order id, item id and price.
 *
 * <p>Sort order ({@link #compareTo}) is order id ascending, then price descending, so
 * that within one order the highest-priced record comes first.
 *
 * <p>{@link #hashCode()} depends on the order id only. Hadoop's default
 * {@code HashPartitioner} routes keys by {@code hashCode()}, so this keeps every record
 * of one order on the same reducer when more than one reducer is configured.
 */
class OrderBean implements WritableComparable<OrderBean> {
    private String order_id;
    private String id;
    private double prise;

    /** No-arg constructor; fields are populated later via setters or {@link #readFields}. */
    public OrderBean() {
    }

    /**
     * @param order_id order number
     * @param id       record id within the data set
     * @param prise    price of this record
     */
    public OrderBean(String order_id, String id, double prise) {
        this.order_id = order_id;
        this.id = id;
        this.prise = prise;
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public double getPrise() {
        return prise;
    }

    public void setPrise(double prise) {
        this.prise = prise;
    }

    /** Serializes the fields in the order: order id, id, price. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(id);
        out.writeDouble(prise);
    }

    /** Deserializes the fields in the same order as {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readUTF();
        this.id = in.readUTF();
        this.prise = in.readDouble();
    }

    /**
     * Orders by order id ascending, then price descending, then id ascending.
     *
     * <p>The price comparison uses {@link Double#compare} rather than casting the
     * difference to {@code int}: a cast truncates any difference smaller than 1.0 to 0,
     * which would make e.g. 100.5 and 100.0 compare as equal and let a non-maximum
     * record sort first. The final id comparison only breaks exact price ties, keeping
     * the ordering deterministic and consistent with {@link #equals}.
     *
     * @param o the key to compare against
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(OrderBean o) {
        int cnt = this.order_id.compareTo(o.getOrder_id());
        if (cnt != 0) {
            return cnt;
        }
        // Arguments swapped on purpose: higher price sorts first.
        cnt = Double.compare(o.getPrise(), this.prise);
        if (cnt != 0) {
            return cnt;
        }
        String otherId = o.getId();
        if (this.id == null || otherId == null) {
            // Null ids sort before non-null ids; two nulls are equal.
            return (this.id == null ? 0 : 1) - (otherId == null ? 0 : 1);
        }
        return this.id.compareTo(otherId);
    }

    /** Two beans are equal when order id, id and price are all equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof OrderBean)) {
            return false;
        }
        OrderBean other = (OrderBean) obj;
        return Double.compare(this.prise, other.prise) == 0
                && (this.order_id == null ? other.order_id == null : this.order_id.equals(other.order_id))
                && (this.id == null ? other.id == null : this.id.equals(other.id));
    }

    /**
     * Hashes on the order id only, so that all records of one order share a hash value
     * (and therefore a partition under hash-based partitioning). Equal beans have equal
     * order ids, so this is consistent with {@link #equals}.
     */
    @Override
    public int hashCode() {
        return order_id == null ? 0 : order_id.hashCode();
    }

    /** Tab-separated {@code order_id, id, price}; this is what the job writes as output. */
    @Override
    public String toString() {
        return order_id + "\t" + id + "\t" + prise;
    }
}
/**
 * Mapper that parses each tab-separated input line into an {@link OrderBean} key and
 * emits it with a {@link NullWritable} value.
 */
class GroupComparatorMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    NullWritable nul = NullWritable.get();
    // A single key instance is reused for every record; its fields are overwritten per call.
    OrderBean ob = new OrderBean();

    /**
     * Splits the line on tab characters and uses the first three fields as order id,
     * id and price (the price is parsed as a double).
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of input text
     * @param context sink for the emitted (OrderBean, NullWritable) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        ob.setOrder_id(fields[0]);
        ob.setId(fields[1]);
        double price = Double.parseDouble(fields[2]);
        ob.setPrise(price);

        context.write(ob, nul);
    }
}
/**
 * Reducer that emits exactly one record per reduce group: the key it is handed,
 * paired with a {@link NullWritable} value. The values iterable is never consumed.
 */
class GroupComparatorReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    NullWritable nul = NullWritable.get();

    /**
     * Writes the group's key as-is, without iterating over the values.
     *
     * @param bean    the key passed in for this reduce group
     * @param iter    the group's values (ignored)
     * @param context sink for the emitted (OrderBean, NullWritable) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(OrderBean bean, Iterable<NullWritable> iter, Context context)
            throws IOException, InterruptedException {
        // Only the key as received is emitted; the remaining records of the group are skipped.
        context.write(bean, nul);
    }
}
以上是“hadoop如何自定义GroupComparator实现求最大值”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注天达云行业资讯频道!