版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
《大数据平台部署与运维》单元5MapReduce实现电商销售数据统计任务5.1认识MapReduce01理解MapReduce的原理与体系架构02掌握YARN运行机制学习目标任务5.1认识MapReduce【任务场景】经理:小张,我们后台数据量越来越大,服务器计算性能不佳,计算效率是个问题,你有什么好的建议吗?小张:Hadoop的核心组件MapReduce可以用来做大规模数据集的并行运算,我们已经有了Hadoop集群,不妨试试MapReduce。经理:嗯,没错,MapReduce可以并行拆分和处理TB级数据,运行在普通服务器组成的集群上也能保证快速高效地处理海量数据,你先了解一下MapReduce的原理和运行机制吧。小张:好的。任务5.1认识MapReduce【任务布置】MapReduce的运行依赖于JDK和Hadoop,因此必须将Hadoop的基础环境提前安装好,才能进行MapReduce的运行和操作。本任务要求在前面已经完成安装部署Hadoop平台的node1节点上完成,要求理解MapReduce的原理和体系架构;理解YARN运行机制;最终在node1上运行MapReduce自带的单词计数程序,查看运行结果。任务5.1认识MapReduce5.1.1MapReduce介绍MapReduce是一种分布式计算模型,由Google提出,起初主要用于搜索领域,解决海量数据的计算问题。MapReduce是Hadoop框架内的一种编程模型,用于访问存储在Hadoop文件系统(HDFS)中的大数据,它是一个核心组件,是Hadoop框架功能不可或缺的一部分。MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:(1)MapReduce是一个基于集群的高性能并行计算平台(ClusterInfrastructure)。(2)MapReduce是一个并行计算与运行软件框架(SoftwareFramework)。(3)MapReduce是一个并行程序设计模型与方法(ProgrammingModel&Methodology)。任务5.1认识MapReduceMapReduce原理(1)基本概念用一个比较形象的例子解释MapReduce:我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。MapReduce由两个阶段组成:Map(映射)和Reduce(归纳),用户只需要实现map()和reduce()两个函数,即可实现分布式计算。这两个函数的形参是key、value对,表示函数的输入信息。任务5.1认识MapReduceMapReduce原理(2)映射和归纳简单说来,一个映射函数就是对一些独立元素组成的概念上的列表的每一个元素进行指定的操作。事实上,每个元素都是被独立操作的,而原始列表没有被更改,因为这里创建了一个新的列表来保存新的答案。这就是说,Map操作是可以高度并行的,这对高性能要求的应用以及并行计算领域的需求非常有用。而归纳操作指的是对一个列表的元素进行适当的合并。虽然它不如映射函数那么并行,但是因为化简总是有一个简单的答案,大规模的运算相对独立,所以化简函数在高度并行环境下也很有用。任务5.1认识MapReduceMapReduce优势(1)易于理解MapReduce通过抽象模型和计算框架把需要做什么和具体做什么分开了,为开发者提供了一个抽象和高层的编程接口和框架,开发者仅需关心其他应用层的具体计算问题,大大降级了开发者使用时的心智负担。
(2)良好的扩展性基于MapReduce的计算性能可以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或者几千可以很容易地处理数百TB甚至PB级别的离线数据。
(3)分布可靠MapReduce通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性;每个节点会周期性地返回它所完成的工作和最新状态。如果一个节点保持沉默超过一个预设的时间间隔,主节点记录下这个节点状态为死亡,并把分配给这个节点的数据发到别的节点。
任务5.1认识MapReduce5.1.2
MapReduce体系架构MapReduce1.0
采用了Manager/Worker(M/W)架构。它主要由以下几个组件组成:Client、JobTracker、TaskTracker和Task。任务5.1认识MapReduceMapReduce1.0体系架构介绍(1)Client用户编写的MapReduce程序通过Client提交到JobTracker端
;同时,用户可通过Client提供的一些接口查看作业运行状态。(2)JobTrackerJobTracker主要负责资源监控和作业调度。(3)TaskTracker
TaskTracker会周期性地通过Heartbeat(心跳检测)将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。(4)TaskTask分为MapTask和ReduceTask两种,均由TaskTracker启动。任务5.1认识MapReduceMapReduce2.0体系架构Hadoop2.0新引入的资源管理系统,是直接从MapReducev1演化而来的;核心思想:将MapReducev1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和进程实现。任务5.1认识MapReduceMapReduce2.0体系架构介绍(1)Client与MapReducev1的Client类似,用户通过Client与YARN交互,提交MapReduce作业,查询作业运行状态,管理作业等。(2)ResourceManagerResourceManager是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器和应用程序管理器。(3)NodeManagerNodeManager是每个节点上的资源和任务管理器,一方面,它会定时地向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自ApplicationMaster的Container启动/停止等各种请求。任务5.1认识MapReduceMapReduce2.0体系架构介绍(4)ApplicationMaster
ApplicationMaster功能类似于1.0中的JobTracker,但不负责资源管理。功能包括:任务划分、任务调度、任务状态监控和容错。(5)ContainerContainer是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当ApplicationMaster向ResourceManager申请资源时,ResourceManager为ApplicationMaster返回的资源便是用Container表示的。(6)MapTask/ReduceTaskMapTask/ReduceTask周期性向ApplicationMaster汇报心跳。一旦Task挂掉,则ApplicationMaster将为之重新申请资源,并重新运行。任务5.1认识MapReduceMapReduce任务执行过程(1)Map任务执行过程MapTask执行过程如下图所示。由该图可知,MapTask先将对应的split迭代解析成一个个key/value对,依次调用用户自定义的map()函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition(分片),每个partition将被一个ReduceTask处理。任务5.1认识MapReduceMapReduce任务执行过程(2)Reduce任务执行过程ReduceTask执行过程如下图所示。该过程分为三个阶段:①从远程节点上读取MapTask中间结果(称为“Shuffle阶段”);②按照key对key/value对进行排序(称为“Sort阶段”);③依次读取<key,valuelist>,调用用户自定义的reduce()函数处理,并将最终结果存到HDFS上(称为“Reduce阶段”)。任务5.1认识MapReduce5.1.3MapReduce发展现状以前,在Google,MapReduce用在非常广泛的应用程序中,MapReduce实现以后,它被用来重新生成Google的整个索引,并取代老的adhoc程序去更新索引。Hadoop解决了有无问题。很快人们发现MapReduce复杂度很高,即使技术实力强大如Facebook都很难写出高效正确的MapReduce程序。此外除了解决批处理问题,人们需要Hadoop能解决其遇到的交互式查询任务。为此,Facebook开发了Hive,该项目快速流行起来,到现在还有很多用户。Facebook当时更是高达95%的用户使用Hive而不是裸写MapReduce程序。但是MapReduce的思想和技术原理还是值得我们学习。马里兰大学教授、2010年出版的《Data-IntensiveTextProcessingwithMapReduce》一书的作者JimmyLin在书中提出:MapReduce改变了我们组织大规模计算的方式,它代表了第一个有别于冯·诺依曼结构的计算模型,是在集群规模而非单个机器上组织大规模计算的新的抽象模型上的第一个重大突破,是到所见到的最为成功的基于大规模计算资源的计算模型。任务5.1认识MapReduce5.1.4YARN运行机制Hadoop主要组件有HDFS和YARN,HDFS是分布式文件系统,主要是进行文件的存储,而YARN是Hadoop集群资源管理系统,支持分布式计算模式。YARN上运行一个应用的步骤:(1)客户端联系资源管理器,要求运行一个applicationmaster进程。(2)资源管理其找到一个能够在容器启动applicationmaster的节点管理器。(3)applicationmaster运行后,根据应用本身向资源管理器请求更多容器。(4)资源管理器给applicationmaster分配需要的资源后,applicationmaster在对应资源节点管理器启动容器,节点管理器获取任务运行需要的resources后,在该容器运行任务。任务5.1认识MapReduceYARN调度理想情况下YARN应用发出资源请求会立刻给予满足,但现实是资源是有限的,通常需要等待才能得到所需资源,因此YARN提供多种调度器和可配置策略来供选择。(1)FIFO调度器FIFO调度器将应用放置在一个队列中,按照提交顺序运行应用,首先为队列中第一个应用请求分配资源,第一个应用请求被满足后再依次为队列下一个应用服务。(2)容器调度器(CapacityScheduler)
使用容量调度器时,一个独立的专门队列保证小作业一提交就启动,由于队列容量是为固定队列中作业所保留的,这种策略是以整个集群利用率为代价,相比于FIFO调度器相比,大作业执行时间更长。(3)公平调度器(FairScheduler)
使用公平调度器时,不需要预留一定资源,调度器会在所有运行的作业之间动态平衡资源。任务5.1认识MapReduce【工作流程】1.新建本地文件,编辑文件内容。2.上传文件到hdfs。3.运行自带jar包,查看运行结果。任务5.1认识MapReduce【操作步骤】1.
在本地/home/hdfs目录下新建文件hello。文件中输入如下内容2.
将hello文件上传至HDFS根目录下:3.运行jar包。在Hadoop的安装目录下有一个示例jar包,地址为:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar,里面有很多框架自带的例子。使用以下命令查看jar包中包含的例子:使用以下命令运行自带的wordcount程序:[root@master01/]#mkdir/home/hdfs[root@master01/]#cd/home/hdfs[root@master01/]#vimhelloHelloHDFSHelloMapReduce[root@master01/]#hdfsdfs-put/home/hdfs/hello/[root@master01/]#hadoopjarhadoop-mapreduce-examples-3.1.1.jar[root@master01/]#hadoopjarhadoop-mapreduce-examples-3.1.1.jarwordcount/hello/out任务5.1认识MapReduce【操作步骤】4.常见问题初次运行hadoopjar命令时可能会出现找不到或无法加载主类的错误。需要到yarn-site.xml配置文件修改配置,添加以下内容。<property><name>yarn.application.classpath</name><value>/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*</value></property>任务总结理解MapReduce的原理与体系架构。掌握YARN运行机制。北京汇智科教《大数据平台部署与运维》单元5MapReduce实现电商销售数据统计任务5.2MapReduce实现词频统计01理解MapReduce数据处理流程02掌握MapReduce相关Java
API学习目标03掌握MapReduce驱动类的编写任务5.2MapReduce实现词频统计【任务场景】经理:小张,我们后续也要使用Java编写MapReduce程序,你对MapReduce的JavaAPI了解吗?小张:Hadoop框架底层就是用Java实现的,所以Hadoop提供了许多用来调用MapReduce的JavaAPI,我们可以借助MapReduce的JavaAPI可以完成很多项自定义操作。经理:是的,使用MapReduce的JavaAPI时也要注意数据类型转换,要使用Hadoop特有的Writable数据类型,你先熟悉一下MapReduce的JavaAPI,然后自己实现一个词频统计程序,在服务器上运行一下试试吧。小张:好的。任务5.2MapReduce实现词频统计【任务布置】MapReduce的运行依赖与JDK和Hadoop,因此必须将Hadoop的基础环境提前安装好,才能进行MapReduce的运行和操作。本任务要求在前面已经完成安装部署Hadoop平台的node1节点上完成,要求理解MapReduce数据处理流程,掌握MapReduce相关JavaAPI及驱动类的创建;基于IDEA进行开发,最终在node1上运行打包后的程序,实现词频统计。任务5.2MapReduce实现词频统计5.2.1MapReduce数据处理流程MapReduce处理数据过程主要分成Map和Reduce两个阶段。数据处理流程如下图所示:任务5.2MapReduce实现词频统计Map任务处理(共五步)(1)读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。(2)写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。(3)对输出的key、value进行分区。(4)对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。(5)(可选)分组后的数据进行归约。任务5.2MapReduce实现词频统计Reduce任务处理(共三步)(1)对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。(2)对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。(3)把reduce的输出保存到文件中。任务5.2MapReduce实现词频统计5.2.2MapReduce相关Java
API及应用1.
MapReduce中数据类型介绍MapReduce中所有的数据类型都要实现Writable接口,以便于这些类型定义的数据可以被序列化进行网络传输和文件存储。Hadoop数据类型都放在org.apache.hadoop.io包下。Hadoop数据类型和Java数据类型对照表数据类型Hadoop数据类型Java数据类型布尔型BooleanWritableboolean整型ByteWritablebyteShortWritableshortIntWritableintLongWritablelong浮点型FloatWritablefloatDoubleWritabledouble字符串(文本)Textstring数组ArrayWritableArraymap集合MapWritablemap任务5.2MapReduce实现词频统计2.
MapReduce
Mapper类在MapReduce中,Mapper类的作用是将输入的键值对映射到一组中间键值对。它将输入记录转换为中间记录。这些中间记录与给定的输出键相关联,并传递给Reducer以获得最终输出。Mapper类的常用方法如下表所示:方法描述voidcleanup(Contextcontext)此方法仅在任务结束时调用一次。voidmap(KEYINkey,VALUEINvalue,Contextcontext) 对于输入Mapper中的每个键值,只能调用此方法一次。voidrun(Contextcontext)可以重写此方法以控制Mapper的执行。voidsetup(Contextcontext)此方法仅在任务开始时调用一次。任务5.2MapReduce实现词频统计3.
MapReduce
Reducer类在MapReduce中,Reducer类的作用是减少中间值的集合。它的实现可以通过JobContext.getConfiguration()方法访问作业的配置。Reducer类的常用方法如下表所示:方法描述voidcleanup(Contextcontext)此方法仅在任务结束时调用一次。voidreduce(KEYINkey,Iterable<VALUEIN>values,Contextcontext) 此方法只为每个键调用一次。voidrun(Contextcontext)这个方法可以用来控制Reducer的任务。voidsetup(Contextcontext)此方法仅在任务开始时调用一次。任务5.2MapReduce实现词频统计4.
MapReduce
Job类Job类用于配置作业和提交作业,它还控制执行和查询状态。Job类的常用方法如下表所示:方法描述voidsetJarByClass(Class<>class)核心接口,指定执行类所在的jar包本地位置。java通过class文件找到执行jar包,该jar包被上传到HDFS。voidsetMapOutputKeyClass(Class<>class)该方法用于设置map输出数据的key类型。voidsetMapOutputValueClass(Class<>class)该方法用于设置map输出数据的value类型。setOutputKeyClass(Class<>theClass)核心接口,指定MapReduce作业的输出key的类型setOutputValueClass(Class<>theClass)核心接口,指定MapReduce作业的输出value的类型voidsetMapperClass(Class<extendsMapper>class)核心接口,指定MapReduce作业的Mapper类,默认为空。voidsetNumReduceTasks(inttasks)该方法用于设置job的reduce任务数voidsetReducerClass(Class<extendsReducer>class)核心接口,指定MapReduce作业的Reducer类,默认为空。setPartitionerClass(Class<extendsPartitioner>class)指定MapReduce作业的Partitioner类。该方法用来分配map的输出结果到哪个reduce类,默认使用HashPartitioner,均匀分配map的每条键值对记录。任务5.2MapReduce实现词频统计5.2.3MapReduce驱动类驱动类主要用于关联Mapper和Reducer以及提交整个程序。驱动类的开发有着固定的格式。编写驱动类总分七大步骤:1.获取job对象。新建一个配置对象实例,用于获取job实例方法传递参数。2.设置Jar的路径(就是当前驱动类)。设置当前驱动类路径,参数即为当前驱动类的类名。Configurationconfiguration=newConfiguration();Jobjob=Job.getInstance(configuration);job.setJarByClass(WordCountDriver.class);任务5.2MapReduce实现词频统计3.关联mapper和reducer。设置mapper和reducer类,参数为自定义的mapper类类名及reducer类的类名。4.设置mapper输出的key和value类型。设置mapper输出的key和value类型时可根据业务灵活调整,类型需使用已实现Writable的数据类型。5.设置最终输出的key和value设置最终输出的key和value类型时可根据业务灵活调整,需使用已实现Writable的Hadoop数据类型。job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
任务5.2MapReduce实现词频统计6.设置输入输出路径(要读取的数据输入路径和输出数据的路径)设置路径有两种方式,一种是通过传入参数动态读取,另一种是在程序中写成固定路径,但是会降低扩展性,推荐使用第一种方式。第一种方式在运行时必须传入输入输出路径,示例:hadoopjarWordCountApp/hello/out需要注意这里的路径都是HDFS中的路径,也就是读取和输出的文件都是在HDFS中的。7.提交job将作业提交到群集并等待它完成,参数设置为true代表打印显示对应的进度,根据作业结果,终止当前运行的Java虚拟机,退出程序。FileInputFormat.setInputPaths(job,newPath(args[1]));FileOutputFormat.setOutputPath(job,newPath(args[2]));//或FileInputFormat.setInputPaths(job,newPath("/map/input"));FileOutputFormat.setOutputPath(job,newPath("/map/output"));
Booleanresult=job.waitForCompletion(true);System.exit(result?0:-1);
任务5.2MapReduce实现词频统计【工作流程】创建新的项目,编写Mapper类,继承父类并重写map方法,编写Reducer类,继承父类并重写reduce方法,编写驱动类,程序导出为jar包运行,查看结果。任务5.2MapReduce实现词频统计【操作步骤】创建新的项目,项目类型选择Maven,项目名称为MRWordCount,pom依赖为以下内容。<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency></dependencies>
任务5.2MapReduce实现词频统计1.编写Mapper类创建Java类MyMapper,继承父类Mapper,重写父类map方法,实现如下代码:importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;publicclassMyMapperextendsMapper<LongWritable,Text,Text,LongWritable>{ protectedvoidmap(LongWritablek1,Textv1,Contextcontext) throwsjava.io.IOException,InterruptedException{//对v1用空格进行分割,返回单词数组 Stringwords[]=v1.toString().split(""); Textk2=newText(); LongWritablev2=newLongWritable(1); for(Stringword:words) { k2.set(word); context.write(k2,v2);//将k2/v2写入磁盘 } };}
任务5.2MapReduce实现词频统计2.编写Reducer类创建Java类MyReducer,继承父类Reducer,重写父类reduce方法,实现如下代码:importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;
publicclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{ protectedvoidreduce(Textk2,Iterable<LongWritable>vs2,Contextcontext) throwsjava.io.IOException,InterruptedException{
Textk3=k2;//k3和k2一样,表示每个单词 longsum=0; for(LongWritablei:vs2) { sum+=i.get(); } LongWritablev3=newLongWritable(sum);
context.write(k3,v3);//最终结果
};}
任务5.2MapReduce实现词频统计3.创建驱动类WordCountApp创建驱动类WordCountApp,并编码如下代码:importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.conf.Configuration;publicclassWordCountApp{ publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{ Configurationconf=newConfiguration(); Jobjob=Job.getInstance(conf,"myjob"); job.setJarByClass(WordCountApp.class); PathfileIn=newPath(args[1]);
PathfileOut=newPath(args[2]);
job.setMapperClass(MyMapper.class);//设置使用的Mapper类
job.setMapOutputKeyClass(Text.class);//设置k2/v2类型
job.setMapOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,fileIn);//设置任务的输入文件路径
job.setReducerClass(MyReducer.class);//设置使用的Reducer类
//设置k3/v3的类型,如果k3/v3和k2/v2类型一致,此步骤可以省略
//job.setOutputKeyClass(Text.class); //job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job,fileOut);//设置最终结果输出路径
Booleanresult=job.waitForCompletion(true);
System.exit(result?0:-1); }}任务5.2MapReduce实现词频统计4.将程序导出为jar包运行,查看结果在IDEA中打开Maven面板,选择package进行打包,将打包后的jar包上传到服务器上。运行jar包运行完成后查看运行结果,首先查看/out1下是否有生成_SUCCESS文件查看结果文件[root@master01~]#hadoopjarwordcount.jar
WordCountApp/hello/out1[root@master01~]#hdfs
dfs-ls/out1Found2items-rw-r--r--2rootsupergroup02021-11-1111:01/out1/_SUCCESS-rw-r--r--2rootsupergroup252021-11-1111:01/out1/part-r-00000[root@master01~]#hdfsdfs-cat/out1/part-t-00000HDFS1Hello2MapReduce1
任务总结掌握MapReduce数据处理流程掌握MapReduce相关Java
API使用方法掌握MapReduce驱动类编写方法北京汇智科教《大数据平台部署与运维》单元5MapReduce实现电商销售数据统计任务5.3MapReduce完成电商销售数据统计01掌握MapReduce自定义分区的概念与用法02掌握MapReduce自定义数据类型的概念与用法学习目标任务5.3MapReduce完成电商销售数据统计【任务场景】经理:小张,接下来我们要使用MapReduce对电商销售数据进行统计,可能要用到自定义分区和自定义数据类型,你了解MapReduce中的自定义分区和自定义数据类型吗?小张:日常业务中Hadoop提供的数据类型有时不满足使用,我们需要根据业务创建合适的自定义数据类型,也可以通过自定义分区对数据进行分区。经理:是的,自定义数据类型根据场景不同需要实现不同的接口,自定义分区可以把数据分到不同的reducer中。我给你一份后台导出的数据,你用MapReduce统计一下每个买家收藏商品的数量,根据收藏日期做一下自定义分区。小张:好的。任务5.3MapReduce完成电商销售数据统计【任务布置】MapReduce的运行依赖与JDK和Hadoop,因此必须将Hadoop的基础环境提前安装好,才能进行MapReduce的运行和操作。本任务要求在前面已经完成安装部署Hadoop平台的node1节点上完成。要求掌握MapReduce自定义分区与自定义数据类型的创建;基于IDEA进行开发,每个卖家收藏商品数量的统计,将收藏统计结果分为两个分区,2020-04-14日之前及14日当天的数据为一个分区,2020-04-14日之后的数据为一个分区。任务5.3MapReduce完成电商销售数据统计5.3.1MapReduce完成电商销售数据统计的流程数据介绍现有某电商网站用户对商品的收藏数据,记录了用户收藏的商品id以及收藏日期,名为buyer_favorite1。buyer_favorite1包含:买家id,商品id,收藏日期这三个字段,数据以空格分割,样本数据及格式如下:1018110004812020-04-0416:54:312000110015972020-04-0715:07:522000110015602020-04-0715:08:272004210013682020-04-0808:20:302006710020612020-04-0816:45:332005610032892020-04-1210:50:552005610032902020-04-1211:57:352005610032922020-04-1212:05:292005410024202020-04-1415:24:12……任务5.3MapReduce完成电商销售数据统计5.3.2自定义分区1.
MapReduce
Partitioner类通过前面的学习我们知道Mapper最终处理的键值对<key,value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。分区的目的是把具有相同key的值集合在一起,确保key相同的值都会在同一个reducer里面。这样才能保证map的输出数据被均匀的分发到reducer。HadoopMapReduce默认的HadoopPartitioner是哈希
Partitioner(HashPartitioner),它会对key计算哈希值,并基于该哈希值对键值对数据进行分区。任务5.3MapReduce完成电商销售数据统计Partitioner的数量等于reducer的数量,Partitioner会根据reducer的数量来划分数据,reducer数量可以通过下面的方法进行设置:JobConf.setNumReduceTasks()因此,来自同一个分区的数据会被一个reducer任务处理。需要注意的是,只有作业具有多个reducer任务时,分区才会被创建。也就是说,如果作业只有1个reducer任务,分区阶段是不会发生的。低效的分区意味着,某些reducer将比其他reducer任务处理更多的数据。那么,整个作业的运行时间将取决于这些需要处理更多数据的reducer,也就是说,作业的运行时间会更长。为了克服低效分区的问题,我们可以自定义分区器(partitioner),这样我们就可以根据具体业务修改分区逻辑,把数据均分的分发到不同的reducer任务里。任务5.3MapReduce完成电商销售数据统计2.
Partitioner实现过程(1)先分析一下具体的业务逻辑,确定大概有多少个分区(2)首先书写一个类,它要继承
org.apache.hadoop.mapreduce.Partitioner这个抽象类(3)重写publicintgetPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字(4)在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);(5)设置Reducer的数量,job.setNumReduceTasks(2);任务5.3MapReduce完成电商销售数据统计3.总结分区Partitioner主要作用在于以下两点(1)根据业务需要,产生多个输出文件;(2)多个reduce任务并发运行,提高整体job的运行效率任务5.3MapReduce完成电商销售数据统计5.3.3自定义数据类型Hadoop使用了自己写的序列化格式
Writable,它格式紧凑,速度快,但是它很难用Java以外的语言进行拓展或使用,因为Writable是Hadoop的核心,大多数MapReduce程序都会为键和值使用它,Hadoop中的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被网络传输和文件存储。任务5.3MapReduce完成电商销售数据统计自定义数据类型实现过程(1)继承接口Writable,实现其方法write()和readFields(),以便该数据能被序列化后完成网络传输或文件输入/输出。(2)如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritalbeComparable接口,实现其方法write(),readFields(),CompareTo()。(3)数据类型,为了方便反射,必须要有一个无参的构造方法用来创建对象。(4)在自定义数据类型中,建议使用java的原生数据类型,最好不要使用Hadoop对原生类型进行封装的数据类型。比如intx;//IntWritable和Strings;//Text等等。任务5.3MapReduce完成电商销售数据统计【工作流程】创建新的项目;编写自定义数据类型类,继承Writable接口并重写write和readFields方法;编写Mapper类,继承父类并重写map方法;编写自定义分区类,继承父类并重写getPartition方法;编写Reducer类,继承父类并重写reduce方法;编写驱动类,程序导出为jar包运行,查看结果。任务5.3MapReduce完成电商销售数据统计【操作步骤】创建新的项目,项目类型选择Maven,项目名称为MRCollectionCount。1.首先编写自定义数据类型类:CollectionWritable,实现Writable接口并重写write和readFields方法。importorg.apache.hadoop.io.Writable;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;publicclassCollectionWritableimplementsWritable{privateintcount;//收藏次数privateStringcollectTime;//收藏时间publicintgetCount(){returncount;}publicvoidsetCount(intcount){this.count=count;}publicStringgetCollectTime(){returncollectTime;}publicvoidsetCollectTime(StringcollectTime){this.collectTime=collectTime;}publicCollectionWritable(intcount,StringcollectTime){super();this.collectTime=collectTime;this.count=count;}
publicCollectionWritable(){
}@Overridepublicvoidwrite(DataOutputdataOutput)throwsIOException{dataOutput.writeInt(this.count);dataOutput.writeUTF(this.collectTime);}@OverridepublicvoidreadFields(DataInputdataInput)throwsIOException{this.count=dataInput.readInt();this.collectTime=dataInput.readUTF();}@OverridepublicStringtoString(){return"CollectionWritable{"+"count="+count+",collectTime='"+collectTime+'\''+'}';}}
任务5.3MapReduce完成电商销售数据统计2.编写Mapper类,继承父类并重写map方法importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;
importjava.io.IOException;
publicclassMyMapperextendsMapper<LongWritable,Text,Text,CollectionWritable>{@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Mapper<LongWritable,Text,Text,CollectionWritable>.Contextcontext)throwsIOException,InterruptedException{//通过空格分割finalString[]splited=value.toString().split("");//第一列为用户IDfinalStringuserId=splited[0];finalTextk2=newText(userId);//收藏次数记为1finalintcount=1;//第三列为收藏时间finalStringcollectTime=splited[2];finalCollectionWritablev2=newCollectionWritable(count,collectTime);context.write(k2,v2);}}
任务5.3MapReduce完成电商销售数据统计3.编写自定义分区类,继承父类并重写getPartition方法importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
publicclassMyPartitionextendsHashPartitioner<Text,CollectionWritable>{@OverridepublicintgetPartition(Textk2,CollectionWritablev2,intnumReduceTasks){Stringdate=v2.getCollectTime();//通过比对收藏时间进行分区
if(pareTo("2020-04-14")>0){return1;}else{return0;}}}任务5.3MapReduce完成电商销售数据统计4.编写Reducer类,继承父类并重写reduce方法importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;
importjava.io.IOException;
publicclassMyReducerextendsReducer<Text,CollectionWritable,Text,LongWritable>{@Overrideprotectedvoidreduce(Textk2,Iterable<CollectionWritable>values,Reducer<Text,CollectionWritable,Text,LongWritable>.Contextcontext)throwsIOException,InterruptedException{Textk3=k2;longsum=0;//收藏次数进行累加for(CollectionWritablecollect:values){sum+=collect.getCount();}
LongWritablev3=newLongWritable(sum);
context.write(k3,v3);}}
任务5.3MapReduce完成电商销售数据统计5.编写驱动类importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassCollectionApp{publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{Configurationconf=newConfiguration();Jobjob=Job.getInstance(conf,"collectionJob");job.setJarByClass(CollectionApp.class);PathfileIn=newPath(args[0]);PathfileOut=newPath(args[1]);job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(CollectionWritable.class);FileInputFormat.setInputPaths(job,fileIn);//设置自定义分区类job.setPartitionerClass(MyPartition.class);//将reduce任务数量设置为2job.setNumReduceTasks(2);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);FileOutputFormat.setOutputPath(job,fileOut);Booleanresult=job.waitForCompletion(true);System.exit(result?0:-1);}}任务5.3MapReduce完成电商销售数据统计6.将程序导出为jar包运行,查看结果运行完成后查看运行结果,首先查看/co_out下是否有生成_SUCCESS文件,并且有两个分区文件。查看结果文件[root@master01~]#hadoopjarCollectionApp.jar
CollectionApp/collection.txt/co_out
[root@master01~]#hdfsdfs-ls/co_outFound3items-rw-r--r--2rootsupergroup02021-11-1914:45/co_out/_SUCCESS-rw-r--r--2rootsupergroup732021-11-1914:45/co_out/part-r-00000-rw-r--r--2rootsupergroup02021-11-1914:45/co_out/part-r-00001
[root@master01~]#hdfsdfs-cat/co_out/part-r-00000101811200012200421200543200551200563200671200761[root@master01~]#hdfsdfs-cat/co_out/part-r-00001200543200569200641200764
任务总结理解MapReduce自定义分区和自定义数据类型的概念掌握MapReduce自定义分区和自定义数据类型的写法能够使用自定义分区与自定义数据类型正确完成电商数据统计程序北京汇智科教《大数据平台部署与运维》单元5MapReduce实现电商销售数据统计任务5.4MapReduce任务监控01掌握MapReduce任务监控方式02理解MapReduce任务失败常见原因学习目标任务5.4MapReduce任务监控【任务场景】经理:小张,现在你已经了解MapReduce的原理,也能通过JavaAPI实现一些功能了,但是你知道MapReduce执行过程中如何监控吗?小张:可以在任务执行时命令行看到一些信息,也可以通过浏览器和Hadoop命令去进行监控。经理:嗯,不错,掌握这几种监控方式就可以对大部分的MapReduce场景进行有效监控了,你再去了解一下MapReduce任务执行失败常见原因。小张:好的。任务5.4MapReduce任务监控【任务布置】掌握MapReduce任务监控方式,理解MapReduce任务执行失败常见原因,再次运行电商数据分析程序,监控任务执行过程。任务5.4MapReduce任务监控5.4.1MapReduce任务监控方式当我们执行一个MapReduce任务后,程序被提交给了集群。对于开发测试集群运行程序的开发人员来说,刚刚接触MapReduce的初学者往往是在命令行前等着程序执行完成,遇到任务运行缓慢或报错的情况时很难做出有效响应。下面介绍MapReduce任务监控的几种形式,让大家能对MapReduce程序进行有效监控。任务5.4MapReduce任务监控1.执行时监控执行Hadoopjar命令后控制台会输出任务信息。执行时需重点关注标红的这几行信息。上面的输出信息可以查看当前任务的ID,以及map阶段、reduce阶段的任务进度,当输出Jobjob_idcompletedsuccessfully时表明任务已成功执行完毕。[root@master01~]#hadoopjar/opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jarwordcount/collection.txt/out2021-11-3003:55:46,111INFOclient.RMProxy:ConnectingtoResourceManageratmaster01/14:80322021-11-3003:55:46,374INFOmapreduce.JobResourceUploader:DisablingErasureCodingforpath:/tmp/hadoop-yarn/staging/root/.staging/job_1638262379053_00012021-11-3003:55:47,138INFOinput.FileInputFormat:Totalinputfilestoprocess:1……2021-11-3003:55:53,710INFOmapreduce.Job:map0%reduce0%2021-11-3003:55:58,774INFOmapreduce.Job:map100%reduce0%2021-11-3003:56:02,799INFOmapreduce.Job:map100%reduce100%2021-11-3003:56:02,807INFOmapreduce.Job:Jobjob_1638262379053_0001completedsuccessfully2021-11-3003:56:02,863INFOmapreduce.Job:Counters:53……任务5.4MapReduce任务监控2.浏览器监控执行Hadoop任务后在浏览器打开管理节点MapReduceWebUI界面
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024光通信设备研发与制造许可合同
- 2024年影视版权授权播放合同
- 2024年房地产赎回服务委托合同
- 2024年新一代信息技术产品采购招投标合同
- 2024年数据中心运营管理协议
- 2024年技术服务合同样本:软件开发服务标的为80万元
- 2024年新式技术转让保密合同
- 2024年新品地板订购协议
- 2024年拉丁美洲特许经营合同
- 2024年排水沟建设承包协议
- 采购合同增补协议范本2024年
- 3.15 秦汉时期的科技与文化 课件 2024-2025学年七年级历史上学期
- 11.2 树立正确的人生目标 课件- 2024-2025学年统编版道德与法治七年级上册
- 特种玻璃课件
- 工厂员工考勤制度范本
- 基于创新能力培养的初中物理跨学科实践教学策略
- Unit 2 This is my pencil. Lesson 10(教学设计)-2024-2025学年人教精通版英语三年级上册
- 2024至2030年中国岩土工程市场深度分析及发展趋势研究报告
- 新版高血压病人的护理培训课件
- 医院等级创建工作汇报
- 2024年江西省公务员录用考试《行测》题(网友回忆版)(题目及答案解析)
评论
0/150
提交评论