第4章 分布式计算框架MapReduce_第1页
第4章 分布式计算框架MapReduce_第2页
第4章 分布式计算框架MapReduce_第3页
第4章 分布式计算框架MapReduce_第4页
第4章 分布式计算框架MapReduce_第5页
已阅读5页,还剩59页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第4章分布式计算框架MapReduce4.1认识MapReduce嵌入式系统基本概念为了降低软件开发人员的编程难度,MapReduce框架隐藏了很多内部功能的实现细节,实现了自动并行处理。软件开发人员可以在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上,完成大数据集的计算。4.1.1分布式并行编程嵌入式系统基本概念单个计算机的CPU、内存和硬盘等资源是有限的,单个计算机在处理海量数据时,无法高效地完成大量运算。一种有效的解决方案是借助于分布式并行编程来提高程序性能。分布式程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力。同时通过向集群中增加新的计算机,可以很容易地扩充集群的计算能力。MapReduce最早是由Google提出的一种面向大规模数据处理的分布式并行计算模型,HadoopMapReduce是它的开源实现。4.1.2MapReduce简介嵌入式系统基本概念用MapReduce来处理的数据集必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全独立地进行并行处理。MapReduce采用“分而治之”的核心思想,即将一个大数据集通过一定的数据划分方法,分成多个较小的数据集,各小的数据集之间不存在依赖关系,将这些小的数据集分配给不同的节点去处理,最后将处理的结果进行汇总,从而形成最终的结果。4.1.2MapReduce简介嵌入式系统基本概念

MapReduce设计的一个理念就是“计算向数据靠拢”,因为在大规模数据环境下,移动数据需要大量的网络传输开销。只要有可能,MapReduce就会将Map程序分配到数据所在的节点上运行,从而减少节点之间的数据传输开销。最简单的MapReduce应用程序至少包含3个部分:一个map函数、一个reduce函数和一个main函数。在运行一个MapReduce程序时,整个处理过程被分为Map阶段和Reduce阶段,每个阶段都是用键值对(key/value)作为输入和输出。main函数是MapReduce应用程序的入口,它将文件输入/输出和作业控制结合起来。4.1.3MapReduce的运行环境嵌入式系统基本概念

YARN(YetAnotherResourceNegotiator,另一种资源协调者)是Hadoop2.0中的资源管理和调度框架,YARN的目标就是实现“一个集群多个框架”,即在一个集群上部署一个统一的资源调度管理框架YARN,在YARN之上不但可以运行MapReduce,而且还可以运行Spark、Storm、Tez等计算框架。YARN的引入为集群在资源统一管理和数据共享等方面带来了很大方便。4.1.3MapReduce的运行环境嵌入式系统基本概念

YARN采用主从架构(Master/Slave),包括:ResourceManager、ApplicationMaster和NodeManager三个核心组件。ResourceManager运行在主节点,负责整个集群的资源管理和分配。每个应用程序拥有一个ApplicationMaster,ApplicationMaster管理一个在YARN内运行的应用程序实例,负责申请资源、任务调度和任务监控;NodeManager运行在从节点,整个集群有多个NodeManager,负责单节点资源的管理和使用。4.1.4Hadoop内置数据类型嵌入式系统基本概念1.ByteWritable:字节类型。2.IntWritable:整型类型。3.LongWritable:长整型类型。4.BooleanWritable:布尔类型。5.FloatWritable:单精度浮点数类型。6.DoubleWritable:双精度浮点数类型。7.Text:使用UTF8格式存储的文本类型。8.NullWritable:空对象,当<key,value>中的key或value为空时使用。4.1.4Hadoop内置数据类型嵌入式系统基本概念【例4-1】常见Hadoop数据类型的应用importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;4.1.4Hadoop内置数据类型嵌入式系统基本概念publicclassHadoopDataTypeTest{ /*使用hadoop的Text类型*/ publicstaticvoidtestText(){ System.out.println("testText:"); Texttext=newText("Helloworld!"); System.out.println(text.toString()); System.out.println(text.find("r")); System.out.println(text.getLength()); text.set("Hello,hadoop!"); System.out.println(text.toString()); }4.1.4Hadoop内置数据类型嵌入式系统基本概念/*使用hadoop的IntWritable类型*/ publicstaticvoidtestIntWritable(){ System.out.println("testIntWritable:"); IntWritableintWritable=newIntWritable(5); System.out.println("intWritable="+intWritable); inti=intWritable.get()+1; intWritable.set(i); System.out.println("intWritable="+intWritable); }4.1.4Hadoop内置数据类型嵌入式系统基本概念/*使用hadoop的DoubleWritable类型*/ publicstaticvoidtestDoubleWritable(){ System.out.println("testDoubleWritable:"); DoubleWritabledoubleWritable=newDoubleWritable(6.5); System.out.println("doubleWritable="+doubleWritable); doubled=doubleWritable.get()+1.3; doubleWritable.set(d); System.out.println("doubleWritable="+doubleWritable); }4.1.4Hadoop内置数据类型嵌入式系统基本概念publicstaticvoidmain(Stringargs[]){ testText(); testIntWritable(); testDoubleWritable(); }}4.1.4Hadoop内置数据类型嵌入式系统基本概念图4-1程序运行结果4.2.1MapReduce工作流程概述嵌入式系统基本概念图4-2MapReduce的工作流程4.2.2Shuffle过程分析嵌入式系统基本概念图4-3MapReduce的Shuffle过程4.3MapReduce入门示例:WordCount嵌入式系统基本概念本节以Hadoop自带的WordCount程序为例,分析WordCount的设计思路、介绍WordCount编程方法和运行步骤。4.3.1WordCount程序任务嵌入式系统基本概念

表4-1WordCount程序任务4.3.1WordCount程序任务嵌入式系统基本概念

表4-2WordCount的输入和输出实例4.3.2准备被统计的文件嵌入式系统基本概念创建f1.txt和f2.txt输入如下命令,创建test2目录,上传f1.txt和f2.txthadoopfs-mkdir/test2//在HDFS根目录下创建test2hadoopfs-putf1.txt/test2//上传f1.txt到HDFS的/test2下hadoopfs-putf2.txt/test2//上传f2.txt到HDFS的/test2下4.3.3WordCount的执行过程分析嵌入式系统基本概念图4-5拆分输入数据4.3.3WordCount的执行过程分析嵌入式系统基本概念图4-6执行用户定义的map()4.3.3WordCount的执行过程分析嵌入式系统基本概念图4-7使用Combine函数的Map端Shuffle过程4.3.3WordCount的执行过程分析嵌入式系统基本概念图4-8Reduce端的操作4.3.4WordCount编程实践嵌入式系统基本概念importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;4.3.4WordCount编程实践嵌入式系统基本概念importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;4.3.4WordCount编程实践嵌入式系统基本概念publicclassWordCount{/**自定义的TokenizerMapper类继承Mapper类。<Object,Text,Text,IntWritable>分别用来指定Map的输入key值类型、输入value值类型、输出key值类型、输出value值类型*/publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();4.3.4WordCount编程实践嵌入式系统基本概念/**key记录的数据的偏移量,value是每次split提供给程序读取的一行数据*/publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{/**StringTokenizer用于字符串分解,默认的分隔符是空格("")、制表符(\t)、换行符(\n)、回车符(\r)*/StringTokenizeritr=newStringTokenizer(value.toString());while(itr.hasMoreTokens()){word.set(itr.nextToken());context.write(word,one);//以<word,1>的形式输出

}}}4.3.4WordCount编程实践嵌入式系统基本概念

/**自定义的IntSumReducer类继承Reducer类。<Text,IntWritable,Text,IntWritable>分别用来指定Reduce的输入key值类型、输入value值类型、输出key值类型、输出value值类型*/publicstaticclassIntSumReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();4.3.4WordCount编程实践嵌入式系统基本概念/**reduce将输入的key值作为输出的key值,将key值对应的list各元素值加起来,作为value值输出*/publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{intsum=0;4.3.4WordCount编程实践嵌入式系统基本概念for(IntWritableval:values){sum+=val.get();//将相同的单词对应的值加一起

}result.set(sum);context.write(key,result);}}4.3.4WordCount编程实践嵌入式系统基本概念/**main()的输入参数决定了输入数据的文件位置,以及输出数据存储的位置;创建一个Configuration对象时,会获取Hadoop的配置信息;通过Job的对象可设置Hadoop程序运行时的环境变量*/publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();4.3.4WordCount编程实践嵌入式系统基本概念if(otherArgs.length<2){System.err.println("Usage:wordcount<in>[<in>...]<out>");System.exit(2);}Jobjob=Job.getInstance(conf,"wordcount");//实例化jobjob.setJarByClass(WordCount.class);//设置整个程序的类名

job.setMapperClass(TokenizerMapper.class);//为job设置Map类

job.setCombinerClass(IntSumReducer.class);//为job设置Combine类4.3.4WordCount编程实践嵌入式系统基本概念job.setReducerClass(IntSumReducer.class);//为job设置Reduce类

job.setOutputKeyClass(Text.class);//为job的输出数据设置Key类job.setOutputValueClass(IntWritable.class);//为job输出设置value类/**为job设置输入路径*/for(inti=0;i<otherArgs.length-1;++i){FileInputFormat.addInputPath(job,newPath(otherArgs[i]));}4.3.4WordCount编程实践嵌入式系统基本概念

/**为job设置输出路径*/FileOutputFormat.setOutputPath(job,newPath(otherArgs[otherArgs.length-1]));System.exit(job.waitForCompletion(true)?0:1);}}4.4.1求平均值嵌入式系统基本概念求平均值是数据分析中的常用操作,下面以求学生的平均成绩为例,介绍使用MapReduce求平均值的方法。【例4-2】现有某班学生English、Math、Chinese三门课程的成绩,每门课程的成绩记录在一个文本文件中,请编写MapReduce程序求每位学生的平均成绩。4.4.1求平均值嵌入式系统基本概念importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;4.4.1求平均值嵌入式系统基本概念importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;4.4.1求平均值嵌入式系统基本概念publicclassWordCount{

publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,IntWritable>{/**去掉finalstatic,将one修改为实例变量*/

privateIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{4.4.1求平均值嵌入式系统基本概念

/**将输入的数据首先按行进行分割*/

StringTokenizeritr=newStringTokenizer(value.toString(),"\n");/**分别对每行进行处理*/while(itr.hasMoreTokens()){/**将每行按默认分隔符进行分割*/

StringTokenizerlie=newStringTokenizer(itr.nextToken());/**获取学生姓名*/

StringstrName=lie.nextToken();4.4.1求平均值嵌入式系统基本概念

/**获取学生成绩*/

StringstrScore=lie.nextToken();

word.set(strName);one.set(Integer.parseInt(strScore));/**输出姓名和成绩*/context.write(word,one);}}}4.4.1求平均值嵌入式系统基本概念publicstaticclassIntSumReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();

publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{4.4.1求平均值嵌入式系统基本概念intsum=0;

intcount=0;for(IntWritableval:values){/**计算总分*/sum+=val.get();/**统计科目数*/

count++;}4.4.1求平均值嵌入式系统基本概念

/**计算平均值,并给result赋值*/

result.set((int)(sum/count));context.write(key,result);}}4.4.1求平均值嵌入式系统基本概念publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length<2){System.err.println("Usage:wordcount<in>[<in>...]<out>");System.exit(2);}4.4.1求平均值嵌入式系统基本概念jobjob=Job.getInstance(conf,"wordcount");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);4.4.1求平均值嵌入式系统基本概念

for(inti=0;i<otherArgs.length-1;++i){FileInputFormat.addInputPath(job,newPath(otherArgs[i]));}FileOutputFormat.setOutputPath(job,newPath(otherArgs[otherArgs.length-1]));System.exit(job.waitForCompletion(true)?0:1);}}4.4.2简单查询功能的实现嵌入式系统基本概念在下面的例子中,我们借助MapReduce模拟SQL语句,实现简单的查询功能,能够更好帮助读者理解第6章将要学习的Hive。【例4-3】employees.txt中保存某公司的员工信息,每位员工的信息占一行,每行数据包含员工编号(id)、姓名(name)、年龄(age)、月薪(salary)和部门编号(depts)五项信息。如果把employees.txt看作一张表,表名为employees。请完成查询功能:“select*fromemployeeswhereage<30”,即查询年龄少于30岁的员工信息。4.4.2简单查询功能的实现嵌入式系统基本概念//

Employees.javaimportjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.Writable;4.4.2简单查询功能的实现嵌入式系统基本概念publicclassEmployeesimplementsWritable{privateintid;//员工编号

privateStringname;//姓名

privateintage;//年龄

privatedoublesalary;//月薪

privateintdepts;//部门编号4.4.2简单查询功能的实现嵌入式系统基本概念//实现反序列化@OverridepublicvoidreadFields(DataInputin)throwsIOException{ id=in.readInt(); name=in.readUTF(); age=in.readInt(); salary=in.readDouble(); depts=in.readInt(); }4.4.2简单查询功能的实现嵌入式系统基本概念

//实现序列化

@Override publicvoidwrite(DataOutputout)throwsIOException{ out.writeInt(id); out.writeUTF(name); out.writeInt(age); out.writeDouble(salary); out.writeInt(depts); }4.4.2简单查询功能的实现嵌入式系统基本概念//各种getter和setter publicintgetId(){ returnid; } publicvoidsetId(intid){ this.id=id; } publicStringgetName(){ returnname; }4.4.2简单查询功能的实现嵌入式系统基本概念

publicvoidsetName(Stringname){ =name; } publicintgetAge(){ returnage; } publicvoidsetAge(intage){ this.age=age; } publicdoublegetSalary(){ returnsalary; }4.4.2简单查询功能的实现嵌入式系统基本概念publicvoidsetSalary(doublesalary){ this.salary=salary; } publicintgetDepts(){ returndepts; } publicvoidsetDepts(intdepts){ this.depts=depts; }}4.4.2简单查询功能的实现嵌入式系统基本概念//

WordCount.javaimportjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;4.4.2简单查询功能的实现嵌入式系统基本概念importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;4.4.2简单查询功能的实现嵌入式系统基本概念publicclassWordCount{publicstaticclassTokenizerMapperextendsMapper<Object,Text,IntWritable,Employees>{/**创建一个IntWritable对象,作为输出的key*/privateIntWritableid=newIntWritable();/**创建一个Employees对象,作为输出的value*/privateEmployeesemployees=newEmployees();4.4.2简单查询功能的实现嵌入式系统基本概念publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{/**将文本分割成行*/StringTokenizeritr=newStringTokenizer(value.toString(),"\n");while(itr.hasMoreTokens()){/**将每行以逗号为分隔符进行分割,分割成各字段*/ StringTokenizerlie=newStringTokenizer(itr.nextToken(),",");/**将分割后的各字段值保存到employees对象中*/ employees.setId(Integer.valueOf(lie.nextToken()));//转换成整数4.4.2简单查询功能的实现嵌入式系统基本概念

employees.setName(lie.nextToken()); employees.setAge(Integer.parseInt(lie.nextToken())); employees.setSalary(Double.valueOf(lie.nextToken()));//转换成浮点数

employees.setDepts(Integer.parseInt(lie.nextToken())); if(employees.getAge()<30){ id.set(employees.getId()); context.write(id,employees); }}}}4.4.2简单查询功能的实现嵌入式系统基本概念publicstaticclassIntSumReducerextendsReducer<IntWritable,Employees,IntWr

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论