这篇文章主要介绍“Hadoop MapReduce是什么”,在日常操作中,相信很多人在Hadoop MapReduce是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hadoop MapReduce是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
一、MapReduce是什么?
1. mapreduce的定义
2.mapreduce的核心思想
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。
还有一个比较形象的语言解释MapReduce:
二、MapReduce编程
1.MapReduce编程模型
1. Map阶段
map阶段有一个关键的map()函数;
此函数的输入是键值对
输出是一系列键值对,输出写入本地磁盘。
2. Reduce阶段
Map&Reduce

2.Mapreduce编程指导思想(八个步骤)
1. Map阶段2个步骤
2. shuffle阶段4个步骤
第三步:对输出的key,value对进行分区。(相同key的数据属于同一分区)
第四步:对不同分区的数据按照相同的key进行排序
第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
第六步:对排序后的数据进行分组,分组的过程中,将相同key的value放到一个集合当中(每组数据调用一次reduce方法)
3. reduce阶段2个步骤
3.Hadoop当中常用的数据类型
Java类型 | Hadoop Writable类型 |
---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
byte[] | BytesWritable |
4.MapReduce编程入门之单词统计

5.mapreduce编程入门案例之单词计数统计实现
hello,hello
world,world
hadoop,hadoop
hello,world
hello,flume
hadoop,hive
hive,kafka
flume,storm
hive,oozie
第一步:创建maven工程并导入以下jar包
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh6.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh6.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh6.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh6.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
第二步:定义mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 自定义mapper类需要继承Mapper,有四个泛型,
* keyin: k1 行偏移量 Long
* valuein: v1 一行文本内容 String
* keyout: k2 每一个单词 String
* valueout : v2 1 int
* 在hadoop当中没有沿用Java的一些基本类型,使用自己封装了一套基本类型
* long ==>LongWritable
* String ==> Text
* int ==> IntWritable
*
*/
public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
/**
* 继承mapper之后,覆写map方法,每次读取一行数据,都会来调用一下map方法
* @param key:对应k1
* @param value:对应v1
* @param context 上下文对象。承上启下,承接上面步骤发过来的数据,通过context将数据发送到下面的步骤里面去
* @throws IOException
* @throws InterruptedException
* k1 v1
* 0;hello,world
*
* k2 v2
* hello 1
* world 1
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取我们的一行数据
String line = value.toString();
String[] split = line.split(",");
Text text = new Text();
IntWritable intWritable = new IntWritable(1);
for (String word : split) {
//将每个单词出现都记做1次
//key2 Text类型
//v2 IntWritable类型
text.set(word);
//将我们的key2 v2写出去到下游
context.write(text,intWritable);
}
}
}
第三步:定义reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
//第三步:分区 相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合
/**
* 继承Reducer类之后,覆写reduce方法
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int result = 0;
for (IntWritable value : values) {
//将我们的结果进行累加
result += value.get();
}
//继续输出我们的数据
IntWritable intWritable = new IntWritable(result);
//将我们的数据输出
context.write(key,intWritable);
}
}
第四步:组装main程序
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
这个类作为mr程序的入口类,这里面写main方法
*/
public class WordCount extends Configured implements Tool{
/**
* 实现Tool接口之后,需要实现一个run方法,
* 这个run方法用于组装我们的程序的逻辑,其实就是组装八个步骤
* @param args
* @return
* @throws Exception
*/
@Override
public int run(String[] args) throws Exception {
//获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类
Configuration conf = super.getConf();
Job job = Job.getInstance(conf, "mrdemo1");
//实际工作当中,程序运行完成之后一般都是打包到集群上面去运行,打成一个jar包
//如果要打包到集群上面去运行,必须添加以下设置
job.setJarByClass(WordCount.class);
//第一步:读取文件,解析成key,value对,k1:行偏移量 v1:一行文本内容
job.setInputFormatClass(TextInputFormat.class);
//指定我们去哪一个路径读取文件
TextInputFormat.addInputPath(job,new Path("文件位置"));
//第二步:自定义map逻辑,接受k1 v1 转换成为新的k2 v2输出
job.setMapperClass(MyMapper.class);
//设置map阶段输出的key,value的类型,其实就是k2 v2的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//第三步到六步:分区,排序,规约,分组都省略
//第七步:自定义reduce逻辑
job.setReducerClass(MyReducer.class);
//设置key3 value3的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//第八步:输出k3 v3 进行保存
job.setOutputFormatClass(TextOutputFormat.class);
//一定要注意,输出路径是需要不存在的,如果存在就报错
TextOutputFormat.setOutputPath(job,new Path("输出文件位置"));
//提交job任务
boolean b = job.waitForCompletion(true);
return b?0:1;
/***
* 第一步:读取文件,解析成key,value对,k1 v1
* 第二步:自定义map逻辑,接受k1 v1 转换成为新的k2 v2输出
* 第三步:分区。相同key的数据发送到同一个reduce里面去,key合并,value形成一个集合
* 第四步:排序 对key2进行排序。字典顺序排序
* 第五步:规约 combiner过程 调优步骤 可选
* 第六步:分组
* 第七步:自定义reduce逻辑接受k2 v2 转换成为新的k3 v3输出
* 第八步:输出k3 v3 进行保存
*
*
*/
}
/*
作为程序的入口类
*/
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.set("hello","world");
//提交run方法之后,得到一个程序的退出状态码
int run = ToolRunner.run(configuration, new WordCount(), args);
//根据我们 程序的退出状态码,退出整个进程
System.exit(run);
}
}
到此,关于“Hadoop MapReduce是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!