什么是mapreduce_mapreduce工作原理_mapreduce_mapreduce逻辑模型图

Mapreduce概况

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

1. MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。

2. MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。

MapReduce执行流程

 什么是mapreduce_mapreduce工作原理_mapreduce_mapreduce逻辑模型图

MapReduce原理

 什么是mapreduce_mapreduce工作原理_mapreduce_mapreduce逻辑模型图

MapReduce的执行步骤:

1、Map任务处理

1.1 读取HDFS中的文件。每一行解析成一个《k,v》。每一个键值对调用一次map函数。 《0,hello you》 《10,hello me》

1.2 覆盖map(),接收1.1产生的《k,v》,进行处理,转换为新的《k,v》输出。          《hello,1》 《you,1》 《hello,1》 《me,1》

1.3 对1.2输出的《k,v》进行分区。默认分为一个区。详见《ParTITIoner》

1.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同key的value放到一个集合中。 排序后:《hello,1》 《hello,1》 《me,1》 《you,1》 分组后:《hello,{1,1}》《me,{1}》《you,{1}》

1.5 (可选)对分组后的数据进行归约。详见《Combiner》

2、Reduce任务处理

2.1 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。(shuffle)详见《shuffle过程分析》

2.2 对多个map的输出进行合并、排序。覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑, 《hello,2》 《me,1》 《you,1》

处理后,产生新的《k,v》输出。

2.3 对reduce输出的《k,v》写到HDFS中。

Java代码实现

注:要导入org.apache.hadoop.fs.FileUTIl.java。

1、先创建一个hello文件,上传到HDFS中

Java代码实现

注:要导入org.apache.hadoop.fs.FileUTIl.java。

1、先创建一个hello文件,上传到HDFS中

图三

2、然后再编写代码,实现文件中的单词个数统计(代码中被注释掉的代码,是可以省略的,不省略也行)

1package mapreduce;
       
        2

3 import java.net.URI;

4 import org.apache.hadoop.conf.Configuration;

5 import org.apache.hadoop.fs.FileSystem;

6 import org.apache.hadoop.fs.Path;

7 import org.apache.hadoop.io.LongWritable;

8 import org.apache.hadoop.io.Text;

9 import org.apache.hadoop.mapreduce.Job;

10 import org.apache.hadoop.mapreduce.Mapper;

11 import org.apache.hadoop.mapreduce.Reducer;

12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

16

17 public class WordCountApp {

18 static final String INPUT_PATH = “hdfs://chaoren:9000/hello”;

19 static final String OUT_PATH = “hdfs://chaoren:9000/out”;

20

21 public static void main(String[] args) throws Exception {

22 Configuration conf = new Configuration();

23 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);

24 Path outPath = new Path(OUT_PATH);

25 if (fileSystem.exists(outPath)) {

26 fileSystem.delete(outPath, true);

27 }

28

29 Job job = new Job(conf, WordCountApp.class.getSimpleName());

30

31 // 1.1指定读取的文件位于哪里

32 FileInputFormat.setInputPaths(job, INPUT_PATH);

33 // 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对

34 //job.setInputFormatClass(TextInputFormat.class);

35

36 // 1.2指定自定义的map类

37 job.setMapperClass(MyMapper.class);

38 // map输出的《k,v》类型。如果《k3,v3》的类型与《k2,v2》类型一致,则可以省略

39 //job.setOutputKeyClass(Text.class);

40 //job.setOutputValueClass(LongWritable.class);

41

42 // 1.3分区

43 //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);

44 // 有一个reduce任务运行

45 //job.setNumReduceTasks(1);

46

47 // 1.4排序、分组

48

49 // 1.5归约

50

51 // 2.2指定自定义reduce类

52 job.setReducerClass(MyReducer.class);

53 // 指定reduce的输出类型

54 job.setOutputKeyClass(Text.class);

55 job.setOutputValueClass(LongWritable.class);

56

57 // 2.3指定写出到哪里

58 FileOutputFormat.setOutputPath(job, outPath);

59 // 指定输出文件的格式化类

60 //job.setOutputFormatClass(TextOutputFormat.class);

61

62 // 把job提交给jobtracker运行

63 job.waitForCompletion(true);

64 }

65

66 /**

67 *

68 * KEYIN 即K1 表示行的偏移量

69 * VALUEIN 即V1 表示行文本内容

70 * KEYOUT 即K2 表示行中出现的单词

71 * VALUEOUT 即V2 表示行中出现的单词的次数,固定值1

72 *

73 */

74 static class MyMapper extends

75 Mapper《LongWritable, Text, Text, LongWritable》 {

76 protected void map(LongWritable k1, Text v1, Context context)

77 throws java.io.IOException, InterruptedException {

78 String[] splited = v1.toString().split(“\t”);

79 for (String word : splited) {

80 context.write(new Text(word), new LongWritable(1));

81 }

82 };

83 }

84

85 /**

86 * KEYIN 即K2 表示行中出现的单词

87 * VALUEIN 即V2 表示出现的单词的次数

88 * KEYOUT 即K3 表示行中出现的不同单词

89 * VALUEOUT 即V3 表示行中出现的不同单词的总次数

90 */

91 static class MyReducer extends

92 Reducer《Text, LongWritable, Text, LongWritable》 {

93 protected void reduce(Text k2, java.lang.Iterable《LongWritable》 v2s,

94 Context ctx) throws java.io.IOException,

95 InterruptedException {

96 long times = 0L;

97 for (LongWritable count : v2s) {

98 times += count.get();

99 }

100 ctx.write(k2, new LongWritable(times));

101 };

102 }

103 }

3、运行成功后,可以在Linux中查看操作的结果

图四

MapReduce主要功能

1)数据划分和计算任务调度:

系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并 负责Map节点执行的同步控制。

2)数据/代码互定位:

为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向 数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻 找可用节点以减少通信延迟。

3)系统优化:

为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个 Map节点,为了避免Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。

4)出错检测和恢复:

以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据。

 

技术专区

  • mybatis动态sql详解
  • 用VHDL语言设计数据传输系统中的HDB3编码器
  • 裸机程序如何驱动硬件?看前辈是怎么说的
  • 应用面向对象编程SoC原则的典型示例
  • 嵌入式开发之java常用开发工具介绍
  • 什么是mapreduce_mapreduce工作原理_mapreduce_mapreduce逻辑模型图已关闭评论
    A+
发布日期:2019年07月14日  所属分类:物联网