MapReduce海量数据并行处理 总结_第1页
MapReduce海量数据并行处理 总结_第2页
MapReduce海量数据并行处理 总结_第3页
MapReduce海量数据并行处理 总结_第4页
MapReduce海量数据并行处理 总结_第5页
已阅读5页,还剩77页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、mapreduce海量数据并行处理复习大纲ch. 1. 并行计算技术简介1.为什么需要并行计算?提高计算机性能有哪些基本技术手段提高字长,流水线微体系结构技术,提高集成度,提升主频迫切需要发展并行计算技术的主要原因1)单处理器性能提升达到极限2)爆炸性增长的大规模数据量2)超大的计算量/计算复杂度2. 并行计算技术的分类有哪些主要的并行计算分类方法?1) 按数据和指令处理结构:弗林(flynn)分类2)按并行类型3)按存储访问构架4)按系统类型5)按计算特征6)按并行程序设计模型/方法1)按数据和指令处理结构:弗林(flynn)分类sisd:单指令单数据流 传统的单处理器串行处理simd:单指

2、令多数据流 向量机,信号处理系统misd:多指令单数据流 很少使用mimd:多指令多数据流 最常用,top500高性能计算机 基本都属于mimd类型2) 按并行类型分类 位级并行(bit-level parallelism) 指令级并行(ilp:instruction-level parallelism) 线程级并行(thread-level parallelism) 数据级并行:一个大的数据块划分为小块,分别由不同的处理器/线程处理 任务级并行:一个大的计算任务划分为子任务分别由不同的处理器/线程来处理3) 按存储访问结构分类a.共享内存(shared memory) 所有处理器通过总线共享

3、内存 多核处理器,smp 也称为uma结构 (uniform memory access)b. 分布共享存储体系结构 各个处理器有本地存储器 同时再共享一个全局的存储器c. 分布式内存(distributed memory) 各个处理器使用本地独立的存储器 b和c也统称为numa结构 (non-uniform memory access)4)按系统类型分类 多核/众核并行计算系统mc(multicore/manycore) 或chip-level multiprocessing, cmp 对称多处理系统smp(symmetric multiprocessing) 多个相同类型处理器通过总线连接

4、并共享存储器 大规模并行处理mpp(massive parallel processing) 专用内联网连接一组处理器形成的一个计算系统 集群(cluster) 网络连接的一组商品计算机构成的计算系统 网格(grid) 用网络连接远距离分布的一组异构计算机构成的计算系统5)按并行程序设计模型/方法分类共享内存变量 (shared memory variables) 消息传递方式(message passing)mapreduce方式3.并行计算的主要技术问题并行计算有哪些方面的主要技术问题?多核/多处理器网络互连结构技术存储访问体系结构分布式数据与文件管理并行计算任务分解与算法设计并行程序设计

5、模型和方法数据同步访问和通信控制可靠性设计与容错技术并行计算软件框架平台系统性能评价和程序并行度评估如何评估程序的可并行度(amdahl定律)程序能得到多大并行加速依赖于该程序有多少可并行计算的比例。经典的程序并行加速评估公式amdahl定律: 其中,s是加速比,p是程序可并行比例,n是处理器数目根据amdahl定律:一个并行程序可加速程度是有限制的,并非可无限加速,并非处理器越多越好并行比例vs加速比50%=>最大2倍75%=>最大4倍90%=>最大10倍95%=>最大20倍4.mpi并行程序设计message passing interface,基于消息传递的高性能

6、并行计算编程接口5.什么是mapreduce概念mapreduce是面向大规模数据并行处理的:(1)基于集群的高性能并行计算平台(cluster infrastructure),(硬件层) 允许用市场上现成的普通pc或性能较高的刀架或机架式服务器,构成一个包含数千个节点的分布式并行计算集群(2)并行程序开发与运行框架(software framework) (逻辑层)系统自动提供了一个庞大但设计精良的并行计算软件构架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行子任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算中的很多复杂细节交由系统

7、负责处理,大大减少了软件开发人员的负担(3)并行程序设计模型与方法(programming model & methodology) (用户层) 借助于函数式lisp语言中的设计思想,提供了一种简便的并行程序设计方法,用map和reduce两个函数编程实现基本的并行计算任务,提供了完整的并行编程接口,完成大规模数据处理6.为什么mapreduce如此重要?1)高效的大规模数据处理方法2)第一个不同于冯诺依曼结构的、基于集群而非单机的计算方式的重大突破3)目前为止最为成功的基于大规模计算资源的并行计算抽象方法ch.2. mapreduce简介1. mapreduce的基本模型和处理思想1

8、) 对大数据分而治之;2) 构建抽象模型-map和reduce,用户仅需要描述做什么,不需要关心怎么做3) 提供统一的构架并完成以下的主要功能·任务调度·数据/代码互定位·出错处理·分布式数据文件管理·combiner和partitioner(设计目的和作用)2. combiner和partitioner设计目的和作用带宽优化(combiner的设计目的和作用),不会改变key-value的形式用数据分区解决数据相关性问题(partitioner的设计目的和作用)例如:有一个巨大的数组,其最终结果需要排序,每个map节点数据处理好后,为了避免在

9、每个reduce节点本地排序完成后还需要进行全局排序,我们可以使用一个分区策略如:(d%r),d为数据大小,r为reduce节点的个数,则可根据数据的大小将其划分到指定数据范围的reduce节点上,每个reduce将本地数据拍好序后即为最终结果ch.3. google /hadoop mapreduce基本构架1. google mapreduce的基本工作原理1)google mapreduce并行处理的基本过程1. 有一个待处理的大数据,被划分为大小相同的数据块(如64mb),及与此相应的用户作业程序2. 系统中有一个负责调度的主节点(master),以及数据map和reduce工作节点(

10、worker)3. 用户作业程序提交给主节点4. 主节点为作业程序寻找和配备可用的map节点,并将程序传送给map节点 5. 主节点也为作业程序寻找和配备可用的reduce节点,并将程序传送给reduce节点 6. 主节点启动每个map节点执行程序,每个map节点尽可能读取本地或本机架的数据进行计算 7. 每个map节点处理读取的数据块,并做一些数据整理工作(combining, sorting等)并将中间结果存放在本地;同时通知主节点计算任务完成并告知中间结果数据存储位置 8. 主节点等所有map节点计算完成后,开始启动reduce节点运行;reduce节点从主节点所掌握的中间结果数据位置信

11、息,远程读取这些数据9. reduce节点计算结果汇总输出到一个结果文件即获得整个处理结果2) 失效处理主节点失效主节点中会周期性地设置检查点(checkpoint),检查整个计算作业的执行情况,一旦某个任务失效,可以从最近有效的检查点开始重新执行,避免从头开始计算的时间浪费,主节点采用热备。工作节点失效工作节点失效是很普遍发生的,主节点会周期性地给工作节点发送检测命令,如果工作节点没有回应,这认为该工作节点失效,主节点将终止该工作节点的任务并把失效的任务重新调度到其它工作节点上重新执行。3)计算优化问题如果有一个计算量大、或者由于某个问题导致很慢结束的map节点,则会成为严重的“拖后腿者”。

12、解决方案把一个map计算任务让多个map节点同时做,取最快完成者的计算结果2. 分布式文件系统gfs的基本工作原理1) google gfs的基本设计原则廉价本地磁盘分布存储多数据自动备份解决可靠性为上层的mapreduce计算框架提供支撑2) google gfs的基本构架和工作原理gfs mastermaster上保存了gfs文件系统的三种元数据 :命名空间(name space),即整个分布式文件系统的目录结构 chunk与文件名的映射表chunk副本的位置信息,每一个chunk默认有3个副本前两种元数据可通过操作日志提供容错处理能力;第3个元数据直接保存在chunkserver上, m

13、aster 启动或chunk server注册时自动完成在chunk server上元数据的生成;因此,当master失效时,只要chunkserver数据保存完好,可迅速恢复master上的元数据。gfs chunkserver 即用来保存大量实际数据的数据服务器。gfs中每个数据块划分默认为64mb,这是因为处理的文件都比较大,所以设置成64mb比较合理每个数据块会分别在3个(缺省情况下)不同的地方复制副本;对每一个数据块,仅当3个副本都更新成功时,才认为数据保存成功。当某个副本失效时,master会自动将正确的副本数据进行复制以保证足够的副本数gfs上存储的数据块副本,在物理上以一个本地

14、的linux操作系统的文件形式存储,每一个数据块再划分为64kb的子块,每个子快有一个32位的校验和,读数据时会检查校验和以保证使用为有效的数据。数据访问工作过程1. 在程序运行前,数据已经存储在gfs文件系统中;程序实行时应用程序会告诉gfs server所要访问的文件名或者数据块索引是什么2. gfs server根据文件名会数据块索引在其文件目录空间中查找和定位该文件或数据块,并找数据块在具体哪些chunkserver上;将这些位置信息回送给应用程序3. 应用程序根据gfsserver返回的具体chunk数据块位置信息,直接访问相应的chunk server优点:并发访问,解决mater

15、拥堵。3.分布式结构化数据表bigtable1) bigtable设计动机和目标需要存储管理海量的结构化半结构化数据海量的服务请求商用数据库无法适用2)目标广泛的适用性:为一系列服务和应用而设计的数据存储系统,可满足对不同类型数据的存储和操作需求很强的可扩展性:根据需要可随时自动加入或撤销服务器节点高吞吐量数据访问:提供p级数据存储能力,每秒数百万次的访问请求高可用性和容错性:保证系统在各种情况下度能正常运转,服务不中断自动管理能力:自动加入和撤销服务器,自动负载平衡简单性:系统设计尽量简单以减少复杂性和出错率2) bigtable数据模型多维表通过行、列、时间戳一个行关键字(row key)

16、一个列关键字(column key)一个时间戳(time stamp)进行索引和查询定位的。行:列:时间戳:3) bigtable基本构架主服务器新子表分配子表监控:通过chubby完成。负债均衡:子表服务器负载均衡操作index64k block64k block64k blocksstable子表服务器bigtable中的数据都以子表形式保存在子表服务器上,客户端程序也直接和子表服务器通信。子表的基本存储结构sstable,一个sstable实际上对应于gfs中的一个64mb的数据块(chunk),sstable中的数据进一步划分为64kb的子块。一个子表服务器上的子表将进一步由很多个ss

17、table构成,每个sstable构成最终的在底层gfs中的存储单位。一个sstable还可以为不同的子表所共享,以避免同样数据的重复存储。子表寻址 子表地址以3级b+树形式进行索引;首先从chubby服务器中取得根子表,由根子表找到二级索引子表,最后获取最终的sstable的位置4. hadoop 分布式文件系统hdfs1)hdfs基本构架2) hdfs数据分布设计多副本数据块形式存储,按照块的方式随机选择存储节点,默认副本数目是33) hdfs可靠性与出错恢复datanode节点的检测心跳:namenode 不断检测datanode是否有效若失效,则寻找新的节点替代,将失效节点数据重新分布

18、集群负载均衡数据一致性: 校验和checksum主节点元数据失效multiple fsimage and editlogcheckpoint5. hadoop mapreduce的基本工作原理1) hadoop mapreduce基本构架与工作过程hadoop mapreduce基本工作过程1、运行作业2、获取作业id3、复制作业资源job.xml: 作业配置,例如mapper, combiner, reducer的类型,输入输出格式的类型等。job.jar: jar包,里面包含了执行此任务需要的各种类,比如 mapper,reducer等实现。 job.spli

19、t: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。4、提交作业调用jobtracker对象的submitjob()方法来提交作业.jobtracker会把将jobid放入taskscheduler变量中,然后以fifo的方式进行调度。当客户作业被调度时,jobtracker会创建一个对象jobinprogress,将有关此作也的任务和记录信息封装其中,以便跟踪任务的状态和进进程信息。5、作业初始化从hdfs中读取要执行的作业所对应的job.split,为初始化map任务的分配做好准备。创建map 任务和reduce任务. tasktracker和jobtrac

20、ker之间通过心跳机制来进行通信。 tasktracker首先将自身的状态发送到jobtracker,并根据自身条件选择是向jobtracker请求新的task。jobtracker接收到tasktracker的心跳后,如果发现tasktracker在请求新的task,那么任务调度器就会将任务和任务信息封装起来,返回给tasktracker。当tasktracker从jobtracker返回的心跳信息中获取新的任务信息时,它会将map 任务或者reduce 任务加入到对应的任务槽中。6、执行作业1.将job.split拷贝到本地;2. 将job.jar拷贝到本地;    3

21、. 将job的配置信息写入job.xml;    4. 创建本地目录,解压job.jar;    5. 调用lunchtaskforjob()方法发布任务7、结果输出对于执行的任务,所有tasktracker任务的执行进度信息都会汇总到jobtracker中,当jobtracker接收到最后一个任务的已完成通知后,便把作业状态设置为“成功” ,同时jobclient也会收到任务成功完成的通知,至此一个mapreduce任务就结束了。2) hadoop mapreduce主要组件文件输入格式inputformat定义了数据文件如何分割和读取,inputfi

22、le提供了以下一些功能:选择文件或者其它对象,用来作为输入定义inputsplits,将一个文件分开成为任务为recordreader提供一个工厂,用来读取这个文件有一个抽象的类fileinputformat,所有的输入格式类都从这个类继承这个类的功能以及特性。当启动一个hadoop任务的时候,一个输入文件所在的目录被输入到fileinputformat对象中。fileinputformat从这个目录中读取所有文件。然后fileinputformat将这些文件分割为一个或者多个inputsplits。通过在jobconf对象上设置jobconf.setinputformat设置文件输入的格式。

23、输入数据分块inputsplitsinputsplit定义了输入到单个map任务的输入数据一个mapreduce程序被统称为一个job,可能有上百个任务构成inputsplit将文件分为64mb的大小,配置文件hadoop-site.xml中的mapred.min.split.size参数控制这个大小mapred.tasktracker.map.taks.maximum用来控制某一个节点上所有map任务的最大数目数据记录读入recordreaderinputsplit定义了一项工作的大小,但是没有定义如何读取数据recordreader实际上定义了如何从数据上转化为一个(key,value)对

24、的详细方法,并将数据输出到mapper类中textinputformat提供了linerecordreader,读入一个文本行记录数据。mapper每一个mapper类的实例生成了一个java进程(在某一个inputsplit上执行)有两个额外的参数outputcollector以及reporter,前者用来收集中间结果,后者用来获得环境参数以及设置当前执行的状态。现在的版本用mapper.context提供给每一个mapper函数,用来提供上面两个对象的功能combiner合并相同key的键值对,减少partitioner时候的数据通信开销conf.setcombinerclass(redu

25、ce.class);是在本地执行的一个reducer,满足一定的条件才能够执行。partitioner & shuffle在map工作完成之后,每一个 map函数会将结果传到对应的reducer所在的节点,此时,用户可以提供一个partitioner类,用来决定一个给定的(key,value)对传输的具体位置。sort传输到每一个节点上的所有的reduce函数接收到得key,value对会被hadoop自动排序(即map生成的结果传送到某一个节点的时候,会被自动排序)reducer做用户定义的reduce操作接收到一个outputcollector的类作为输出最新的编程接口是reduc

26、er.context文件输出格式outputformat写入到hdfs的所有outputformat都继承自fileoutputformat每一个reducer都写一个文件到一个共同的输出目录,文件名是part-nnnnn,其中nnnnn是与每一个reducer相关的一个号(partition id)fileoutputformat.setoutputpath()jobconf.setoutputformat()recordwritertextoutputformat实现了缺省的linerecordwriter,以”key,value”形式输出一行结果。3) 容错处理与计算性能优化由hadoo

27、p系统自己解决主要方法是将失败的任务进行再次执行tasktracker会把状态信息汇报给jobtracker,最终由jobtracker决定重新执行哪一个任务为了加快执行的速度,hadoop也会自动重复执行同一个任务,以最先执行成功的为准(投机执行)mapred.map.tasks.speculative.executionmapred.reduce.tasks.speculative.execution6. hadoop 分布式文件系统hdfs编程filesystem基类filesystem是一个用来与文件系统交互的抽象类,可以通过实现filesystem的子类来处理具体的文件系统,比如hd

28、fs或者其它文件系统通过factory方法filesystem.get(configuration conf)获得所需的文件系统实例configuration conf = new configuration();filesystemhdfs= filesystem.get(conf);hadoop中,使用path类的对象来编码目录或者文件的路径,使用filestatus类来存放目录和文件的信息。创建文件create方法有很多种定义形式,但一般仅需使用简单的几种public fsdataoutputstreamcreate(pathf);public fsdataoutputstreamcre

29、ate(pathf,booleanoverwrite);public fsdataoutputstreamcreate(pathf,booleanoverwrite,intbuffersize);打开文件filesystem.open方法有2个,参数最多的一个定义如下:public abstract fsdatainputstreamopen(path f,intbuffersize)throws ioexceptionf: 文件名buffersize:文件缓存大小。默认值:configuration中io.file.buffer.size的值,如果configuration中未显式设置该值则

30、是4096。获取文件信息filesystem.getfilestatus方法格式如下:public abstract filestatusgetfilestatus(path f) throws ioexception;返回一个filestatus对象。filestatus保存文件的很多信息,包括:path:文件路径length:文件长度isdir:是否为目录block_replication: 数据块副本因子blocksize:文件长度(数据块数)modification_time: 最近一次修改时间access_time: 最近一次访问时间owner:文件所属用户group:文件所属组如果

31、想了解文件的这些信息,可以在获得文件的filestatus实例之后,调用相应的getxxx方法(比如,filestatus.getmodificationtime()获得最近修改时间)获取目录信息获取目录信息,不仅是目录本身,还有目录之下的文件和子目录信息:public filestatus liststatus(pathf) throws ioexception;如果f是目录,那么将目录之下的每个目录或文件信息保存在filestatus数组中返回。如果f是文件,和getfilestatus功能一致。另外,liststatus还有参数为path的版本的接口定义以及参数带路径过滤器pathfil

32、ter的接口定义,参数为path的liststatus就是对这个数组中的每个path都调用上面的参数为path的liststatus。参数中的pathfilter则是一个接口,实现接口的accept方法可以自定义文件过滤规则。文件读取调用open打开文件之后,使用了一个fsdatainputstream对象来负责数据的读取。通过fsdatainputstream进行文件读取时,提供的api就是fsdatainputstream.read方法:public intread(longposition, bytebuffer,intoffset, intlength) throws ioexcept

33、ion从文件的指定位置position开始,读取最多length字节的数据,保存到buffer中从offset个元素开始的空间中;返回值为实际读取的字节数。此函数不改变文件当前offset值。但使用更多的还有一种简化版本:public final intread(byteb)throws ioexception从文件当前位置读取最多长度为b.len的数据保存到b中,返回值为实际读取的字节数。文件写入从接口定义可以看出,调用create创建文件以后,使用了一个fsdataoutputstream对象来负责数据的写入。通过fsdataoutputstream进行文件写入时,最常用的api就是wri

34、te方法:public void write(byte b,int off,int len)throws ioexception函数的意义是:将b中从off开始的最多len个字节的数据写入文件当前位置。返回值为实际写入的字节数。关闭关闭为打开的逆过程,filesystem.close定义如下:public void close()throws ioexception不需要其它操作而关闭文件。释放所有持有的锁。删除public abstract booleandelete(path f,boolean recursive)throws ioexceptionf: 待删除文件名recursive:

35、如果recursive为true,并且f是目录,那么会递归删除f下所有文件。f是文件的话,recursive为true还是false无影响。另外,类似java中file的接口deleteonexit,如果某些文件需要删除,但是当前不能被删;或者说当时删除代价太大,想留到退出时再删除的话,filesystem中也提供了一个deleteonexit接口:public booleandeleteonexit(pathf) throws ioexception标记文件f,当文件系统关闭时才真正删除此文件,但是这个文件f必须存在。ch.5. mapreduce算法设计1. mapreduce可解决哪些算

36、法问题?mapreduce可广泛应用于搜索引擎(文档倒排索引,网页链接图分析与页面排序等)、web日志分析、文档分析处理、机器学习、机器翻译等各种大规模数据并行计算应用领域各类大规模数据并行处理算法。2. mapreduce排序算法3. mapreduce单词同现分析算法语料we are not whatwe want to bebut at leastwe are not whatwe used to beafter map(<we, are>, 1)(<are, not>, 1)(<not, what>, 1)(<we, want>, 1)(

37、<want, to>, 1)(<to, be>, 1)(<but, at>,1)(<at, least>,1)(<we, are>,1)(<are, not>,1)(<not, what>,1)(<we , used>,1)(<used, to>,1)(<to, be>,1)after shuffle and sort(<we, are>,1,1)(<are, not>,1,1)(<not, what>,1,1)(<we, want&

38、gt;,1)(<want, to>,1)(<to, be>,1,1)(<but, at>, 1)(<at, least>, 1)(<we, used>, 1)(<used, to>, 1)after reduce(<we, are>,2)(<are, not>,2)(<not, what>,2)(<we, want>,1)(<want, to>,1)(<to, be>,2)(<but, at>,1)(<at, least>,1)

39、(<we, used>,1)(<used, to>,1)4. 文档倒排索引算法改进:map输出的key除了文件名,还给出了该词所在行的偏移值:格式:filename#offsetmapper:text filename_lineoffset= new text(filename+”#”+key.tostring();stringtokenizeritr= new stringtokenizer(value.tostring();for(; itr.hasmoretokens(); ) word.set(itr.nexttoken();context.write(word

40、, filename_lineoffset);reducerprotected void reduce(text key, iterable<text> values, context context)throws ioexception, interruptedexceptioniterator<text> it = values.iterator();stringbuilderall = new stringbuilder();if(it.hasnext() all.append(it.next().tostring();for(; it.hasnext(); )

41、all.append(“;");all.append(it.next().tostring();context.write(key, new text(all.tostring(); /最终输出键值对示例:(“fish", “doc1#0; doc1#8;doc2#0;doc2#8 ")5. 专利文献数据分析patent description data set “apat63_99.txt” “patent”,”gyear”,”gdate”,”appyear”,”country”, ”postate”,”assignee”, ”asscode”,”claims”

42、,”nclass”,”cat”,”subcat”,”cmade”,”creceive”, ”ratiocit”,”general”,”original”,”fwdaplag”,”bckgtlag”,”selfctub”, ”selfctlb”,”secdupbd”,”secdlwbd”3070801,1963,1096,”be”,”,1,269,6,69,1,0,3070802,1963,1096,”us”,”tx”,1,2,6,63,0,3070803,1963,1096,”us”,”il”,1,2,6,63,9,0.3704,3070804,1963,1096,”us”,”oh”,1,2,

43、6,63,3,0.6667,3070805,1963,1096,”us”,”ca”,1,2,6,63,1,0,public void map(longwritable key, text value, context context)throws ioexception, interruptedexception / 输入key: 行偏移值;value: “citing专利号, cited专利号” 数据对string citation = value.tostring().split(“,”);context.write(new text(citation1), new text(citati

44、on0); / 输出key: cited 专利号;value: citing专利号public void reduce(text key, iterable<text> values, context context)throws ioexception, interruptedexception string csv = “”;for (text val:values) if (csv.length() > 0) csv += “,”; csv += val.tostring();context.write(key, new text(csv); / 输出key: cite

45、d专利号;value: “citing专利号1, cited专利号2,” 专利被引次数直方图统计直接用hadoop内置的keyvalue文本输入格式读取以key-value对形式保存的专利被引次数统计输出结果job.setinputformat(keyvaluetextinputformat.class);年份/国家专利数统计patent description data set “apat63_99.txt”“patent”,”gyear”,”gdate”,”appyear”,”country”, ”postate”,”assignee”, ”asscode”,”claims”,”nclas

46、s”,”cat”,”subcat”,”cmade”,”creceive”, ”ratiocit”,”general”,”original”,”fwdaplag”,”bckgtlag”,”selfctub”, ”selfctlb”,”secdupbd”,”secdlwbd”3070801,1963,1096,”be”,”,1,269,6,69,1,0,3070802,1963,1096,”us”,”tx”,1,2,6,63,0,3070803,1963,1096,”us”,”il”,1,2,6,63,9,0.3704,3070804,1963,1096,”us”,”oh”,1,2,6,63,3,

47、0.6667,3070805,1963,1096,”us”,”ca”,1,2,6,63,1,0,主要设计思想是:分析以上的专利描述数据集,根据要统计的列名(年份或国家等),取出对应列上的年份(col_idx=1)或国家(col_idx=4),然后由map发出(year,1)或(country,1),再由reduce累加。年份专利统计输出每年申请美国专利的国家数统计 假如我们需要从专利描述数据集中统计每年有多少个国家申请了美国专利,并得出如下的统计直方图,该如何实现map和reduce?每年申请美国专利的国家数统计solution 11. map中用<year, country>作为

48、key输出,emit(<year, country>,1) (<1963, be>, 1), (<1963, us>, 1), (<1963, us>, 1), 2. 实现一个定制的partitioner,保证同一年份的数据划分到同一个reduce节点3. reduce中对每一个(<year, country>, 1, 1,1,)输入,忽略后部的出现次数,仅考虑key部分:<year, country> 问题:如每碰到一个<year, country >,即emit(year, 1)有问题吗?答案:有问题。因为

49、可能会有从不同map节点发来的同样的<year, country>, 因此会出现对同一国家的重复计数解决办法:在reduce中仅计数同一年份下不同的国家个数问题: map结果(<year, country>, 1, 1,1,)数据通信量较大解决办法:实现一个combiner将1, 1,1,合并为1solution 21. map中用一个数据结构保存<year, country>,并检查一个新的<year, country>是否已经出现,若未出现则emit(year,country)并将<year, country>加入数据结构;否则跳

50、过。如此在每个map节点上可保证一个年份下一个国家仅出现一次2. reduce中对每一个(year, country,country,country)输入直接计数其中的country个数有问题吗?答案:有问题。因为可能会有从不同map节点发来的同样的<year, country>, 因此会出现对同一国家的重复计数解决办法:在reduce中仅计数同一年份下不同的国家个数ch.6. hbase与hive程序设计1. hbase基本工作原理可与mapreduce协同工作,为mapreduce提供数据输入输出,以完成数据的并行化处理与bigtable类似,一个行关键字(row key)、一

51、个列关键字(column key)、一个时间戳(time stamp)进行索引和查询定位的。hbase物理存储格式物理存储格式上按逻辑模型中的行进行分割,并按照列族存储。值为空的列不予存储,节省存储空间。a. hbase的基本构架由一个masterserver和由一组子表数据区服务器regionserver构成,分别存储逻辑大表中的部分数据.大表中的底层数据存于hdfs中b. hbase数据存储管理方法与bigtable类似,大表被分为很多个子表(region),每个子表存储在一个子表服务器regionserver上每个子表中的数据区region由很多个数据存储块store构成.而每个stor

52、e数据块又由存放在内存中的memstore和存放在文件中的storefile构成hbase子表数据存储与子表服务器c. hbase数据的访问1)当客户端需要进行数据更新时,先查到子表服务器,然后向子表提交数据更新请求。提交的数据并不直接存储到磁盘上的数据文件中,而是添加到一个基于内存的子表数据对象memstore中,当memstore中的数据达到一定大小时,系统将自动将数据写入到文件数据块storefile中。2) 每个文件数据块storefile最后都写入到底层基于hdfs的文件中3) 需要查询数据时,子表先查memstore。如果没有,则再查磁盘上的storefile。每个storefil

53、e都有类似b树的结构,允许进行快速的数据查询。storefile将定时压缩,多个压缩为一个4) 两个小的子表可以进行合并;子表大到超过某个指定值时,子表服务器就需要调用hregion.closeandsplit(),把它分割为两个新的子表。d. hbase主服务器hserver与bigtable类似,hbase使用主服务器hserver来管理所有子表服务器。主服务器维护所有子表服务器在任何时刻的状态。当一个新的子表服务器注册时,主服务器让新的子表服务器装载子表。若主服务器与子表服务器连接超时,那么子表服务器将自动停止,并重新启动;而主服务器则假定该子表服务器已死机,将其上的数据转移至其它子表服

54、务器,将其上的子表标注为空闲,并在重新启动后另行分配使用。e. hbase数据记录的查询定位描述所有子表和子表中数据块的元数据都存放在专门的元数据表中,并存储在特殊的子表中。子表元数据会不断增长,因此会使用多个子表来保存。而所有元数据子表的元数据都保存在根子表中。主服务器会扫描根子表,从而得到所有的元数据子表位置,再进一步扫描这些元数据子表即可获得所寻找子表的位置。hbase使用三层类似b+树的结构来保存region位置第一层是保存zookeeper里面的文件,它持有root region的位置。第二层root region是.meta.表的第一个region其中保存了.meta.表其它reg

55、ion的位置。通过root region,我们就可以访问.meta.表的数据。.meta.是第三层,它是一个特殊的表,保存了hbase中所有数据表的region 位置信息。2. hbase基本操作与编程方法示例a. hbase shell 操作(命令行操作)创建表格create 表名,字段1,字段2,与列举表格list插入表格记录put 表名,值1,值2,描述表信息(列出表格的基本信息,元数据)describe 表名扫描数据(列出表格的记录信息)scan 表名限制列进行扫描scan 表名,columns=>字段b. hbase中的disable和enabledisable和enable都

56、是hbase中比较常见的操作,很多对table的修改都需要表在disable的状态下才能进行.disable students将表students的状态更改为disable的时候,hbase会在zookeeper中的table结点下做记录.在zookeeper记录下该表的同时,还会将表的region全部下线,region为offline状态.enable的过程和disable相反,会把表的所有region上线,并删除zookeeper下的标志。如果在enable前,meta中有region的server信息,那么此时会在该server上将该region online;如果没有server的信息,那么此时还要随机选择一台机器作为该region的serverc. hbase的java编程创建表创建

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论