版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、MapReduce实例浅析2014/12/19 · IT技术 · MapReduce分享到:11· Android面试解密-Layout_weight· Spring MVC拦截器· Python错误和异常· MongoDB 在线讲座之如何测试、调整及监控MongoDB性能原文出处: codingwu 欢迎分享原创到伯乐头条在文章MapReduce原理与设计思想中,详细剖析了MapReduce的原理,这篇文章则通过实例重点剖析MapReduce1.MapRedu
2、ce概述Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就
3、是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的
4、参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。虽然Hadoop框架是用Java实现的,但Map/Reduce应用程序则不一定要用 Java来写 。2.样例分析:单词计数1、WordCount源码分析单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到单词计数主要
5、完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:(1)Map过程Map过程需要继承org.apache.hadoop.mapreduce包中的Mapper类,并重写map方法通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中的value值存储的是文本文件中的一行(以回车符作为行结束标记),而key值为该行的首字符相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交由MapReduce框架处理。其中IntWritable和Te
6、xt类是Hadoop对int和string类的封装,这些类能够被串行化,以方便在分布式环境中进行数据交换。TokenizerMapper的实现代码如下:1234567891011121314public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Te
7、xt word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException System.out.println("key = " + key.toString();/添加查看key值
8、60; System.out.println("value = " + value.toString();/添加查看value值 StringTokenizer itr = new StringTokenizer(value.toString(); while (itr.hasMoreTokens()
9、60; word.set(itr.nextToken(); context.write(word, one); (2)Reduce过程Reduce过程需要继承org.apache.had
10、oop.mapreduce包中的Reducer类,并重写reduce方法reduce方法的输入参数key为单个单词,而values是由各Mapper上对应单词的计数值所组成的列表,所以只要遍历values并求和,即可得到某个单词的出现总次数IntSumReduce类的实现代码如下:123456789101112public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> private IntWritable result =
11、new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException int sum = 0; for (IntWritable val : values)
12、160; sum += val.get(); result.set(sum); context.write(key, result); (3)执行MapReduce任务在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相
13、关的设置。此处设置了使用TokenizerMapper完成Map过程和使用的IntSumReduce完成Combine和Reduce过程。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输入和输出路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务,主函数实现如下:12345678910111213141516171819public static void main(String
14、args) throws Exception Configuration conf = new Configuration(); String otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) System.err.println("U
15、sage: wordcount <in> <out>"); System.exit(2); Job job = new Job(conf, "word count"); job.setJarByClass(wordCount.class); job.setMapperClass(Token
16、izerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Fi
17、leInputFormat.addInputPath(job, new Path(otherArgs0); FileOutputFormat.setOutputPath(job, new Path(otherArgs1); System.exit(job.waitForCompletion(true) ? 0 : 1); 运行结果如下:14/12/17 05:53:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with proce
18、ssName=JobTracker, sessionId=14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 214/12/17 05:53:26 INFO mapred.JobClient: Running job: job_local_000114/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 214/12/17 05:53:26 INFO mapred.MapTask: io.sort.
19、mb = 10014/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680key = 0value = Hello Worldkey = 12value = Bye World14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output14/12/17 05:53:27 INFO mapred.MapTas
20、k: Finished spill 014/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting14/12/17 05:53:27 INFO mapred.LocalJobRunner: 14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_m_000000_0 done.14/12/17 05:53:27 INFO mapre
21、d.MapTask: io.sort.mb = 10014/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/32768014/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map outputkey = 0value = Hello Hadoopkey = 13value = Bye Hadoop14/12/17 05:53:
22、27 INFO mapred.MapTask: Finished spill 014/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting14/12/17 05:53:27 INFO mapred.LocalJobRunner: 14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_m_000001_0 done.14/12/1
23、7 05:53:27 INFO mapred.LocalJobRunner: 14/12/17 05:53:27 INFO mapred.Merger: Merging 2 sorted segments14/12/17 05:53:27 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 73 bytes14/12/17 05:53:27 INFO mapred.LocalJobRunner: 14/12/17 05:53:27 INFO mapred.T
24、askRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting14/12/17 05:53:27 INFO mapred.LocalJobRunner: 14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now14/12/17 05:53:27 INFO output.FileOutputCommitter: Saved ou
25、tput of task attempt_local_0001_r_000000_0 to out14/12/17 05:53:27 INFO mapred.LocalJobRunner: reduce > reduce14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 done.14/12/17 05:53:27 INFO mapred.JobClient: map 100% reduce 100%14/12/17 05:53:27 INFO mapred.JobClient: Job
26、 complete: job_local_000114/12/17 05:53:27 INFO mapred.JobClient: Counters: 1414/12/17 05:53:27 INFO mapred.JobClient: FileSystemCounters14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_READ=1788614/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_READ=5293214/12/17 05:53:27 INFO mapred.JobClient
27、: FILE_BYTES_WRITTEN=5423914/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=7143114/12/17 05:53:27 INFO mapred.JobClient: Map-Reduce Framework14/12/17 05:53:27 INFO mapred.JobClient: Reduce input groups=414/12/17 05:53:27 INFO mapred.JobClient: Combine output records=614/12/17 05:53:27 INF
28、O mapred.JobClient: Map input records=414/12/17 05:53:27 INFO mapred.JobClient: Reduce shuffle bytes=014/12/17 05:53:27 INFO mapred.JobClient: Reduce output records=414/12/17 05:53:27 INFO mapred.JobClient: Spilled Records=1214/12/17 05:53:27 INFO mapred.JobClient: Map output bytes=7814/12/17 05:53:
29、27 INFO mapred.JobClient: Combine input records=814/12/17 05:53:27 INFO mapred.JobClient: Map output records=814/12/17 05:53:27 INFO mapred.JobClient: Reduce input records=62、WordCount处理过程上面给出了WordCount的设计思路和源码,但是没有深入细节,下面对WordCount进行更加详细的分析:(1)将文件拆分成splits,由于测试用的文件较小,所以每一个文件为一个split,并将文件按行分割成<ke
30、y, value>对,如图,这一步由Mapreduce框架自动完成,其中偏移量包括了回车所占的字符(2)将分割好的<key, value>对交给用户定义的map方法进行处理,生成新的<key, value>对(3)得到map方法输出的<key, value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果,如图:(4)Reduce先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key, value>对,并作为
31、WordCount的输出结果,如图:3.MapReduce,你够了解吗?MapReduce框架在幕后默默地完成了很多的事情,如果不重写map和reduce方法,会出现什么情况呢?下面来实现一个简化的MapReduce,新建一个LazyMapReduce,该类只对任务进行必要的初始化及输入/输出路径的设置,其余的参数均保持默认代码如下:123456789101112131415public class LazyMapReduce public static void main(String args) throws Exception
32、 / TODO Auto-generated method stub Configuration conf = new Configuration(); String otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); &
33、#160; if(otherArgs.length != 2) System.err.println("Usage:wordcount<in><out>"); System.exit(2);
34、; Job job = new Job(conf, "LazyMapReduce"); FileInputFormat.addInputPath(job, new Path(args0); FileOutputFormat.s
35、etOutputPath(job, new Path(args1); System.exit(job.waitForCompletion(true)? 0:1); 运行结果为:14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=14/12/17 23:04:14 INFO input.FileInputF
36、ormat: Total input paths to process : 214/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_000114/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 214/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 10014/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%14/
37、12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/9961472014/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/32768014/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 014/12/17 23:04:19 INFO mapred.TaskRunner: T
38、ask:attempt_local_0001_m_000000_0 is done. And is in the process of commiting14/12/17 23:04:19 INFO mapred.LocalJobRunner: 14/12/17 23:04:19 INFO mapred.TaskRunner: Task attempt_local_0001_m_000000_0 done.14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 10014/12/17 23:04:20 INFO mapred.MapTa
39、sk: data buffer = 79691776/9961472014/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/32768014/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 014/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_
40、0 is done. And is in the process of commiting14/12/17 23:04:20 INFO mapred.LocalJobRunner: 14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_m_000001_0 done.14/12/17 23:04:20 INFO mapred.LocalJobRunner: 14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments14/12/17
41、 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes14/12/17 23:04:20 INFO mapred.LocalJobRunner: 14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting14/12/17 23:04:20 INFO map
42、red.LocalJobRunner: 14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task attempt_local_0001_r_000000_0 to out14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce14
43、/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 done.14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_000114/12/17 23:04:20 INFO mapred.JobClient: Counters: 1414/12/17 23:04:20 INFO mapred.JobClie
44、nt: FileSystemCounters14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_READ=4604014/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_READ=5147114/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=5280814/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=9813214/12/17 23:04:20 INFO m
45、apred.JobClient: Map-Reduce Framework14/12/17 23:04:20 INFO mapred.JobClient: Reduce input groups=314/12/17 23:04:20 INFO mapred.JobClient: Combine output records=014/12/17 23:04:20 INFO mapred.JobClient: Map input records=414/12/17 23:04:20 INFO mapred.JobClient: Reduce shuffle bytes=014/12/17 23:0
46、4:20 INFO mapred.JobClient: Reduce output records=414/12/17 23:04:20 INFO mapred.JobClient: Spilled Records=814/12/17 23:04:20 INFO mapred.JobClient: Map output bytes=7814/12/17 23:04:20 INFO mapred.JobClient: Combine input records=014/12/17 23:04:20 INFO mapred.JobClient: Map output records=414/12/
47、17 23:04:20 INFO mapred.JobClient: Reduce input records=4可见在默认情况下,MapReduce原封不动地将输入<key, value>写到输出下面介绍MapReduce的部分参数及其默认设置:(1)InputFormat类该类的作用是将输入的数据分割成一个个的split,并将split进一步拆分成<key, value>对作为map函数的输入(2)Mapper类实现map函数,根据输入的<key, value>对生产中间结果(3)Combiner实现combine函数,合并中间结果中具有相同key值的键
48、值对。(4)Partitioner类实现getPartition函数,用于在Shuffle过程按照key值将中间数据分成R份,每一份由一个Reduce负责(5)Reducer类实现reduce函数,将中间结果合并,得到最终的结果(6)OutputFormat类该类负责输出最终的结果上面的代码可以改写为:123456789101112131415161718192021222324252627public class LazyMapReduce public static void main(String args) throws Exceptio
49、n / TODO Auto-generated method stub Configuration conf = new Configuration(); String otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length != 2)
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 光伏储能一体式充电基础设施项目可行性研究报告模板-立项备案
- 酒吧供货合同范本
- 开发商安装门窗的合同范本
- 2024年隔热、隔音人造矿物材料及其制品项目合作计划书
- 围挡装修合同范本
- 工程合同范本2017
- 莆田房屋买卖合同范本
- 财务决策流程设计计划
- 大厦建设工程施工合同范本
- 中环地产合同范本
- Specification-原材料规格书模板
- 实验室课外向学生开放计划
- 科技特派员工作调研报告
- 2021年电力公司创一流工作会议讲话
- 中波广播发送系统概述
- 县疾控中心中层干部竞聘上岗实施方案
- 急性心肌梗死精美PPt完整版
- 毕业设计(论文)基于三菱PLC的交通灯模拟控制
- (完整版)offer模板范本.docx
- 物业日常巡查记录表.doc
- 门技术参数[图文借鉴]
评论
0/150
提交评论