hadoop源码分析mapreduce部分_第1页
hadoop源码分析mapreduce部分_第2页
hadoop源码分析mapreduce部分_第3页
hadoop源码分析mapreduce部分_第4页
hadoop源码分析mapreduce部分_第5页
已阅读5页,还剩18页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、2009-02-21Hadoop源代码分析(MapReduce概论)大家都熟悉文件系统,在对HDFS进行分析前,我们并没有花很多的时间去介绍HDFS的背景,毕竟大家对文件系统的还是有一定的理解的,而且也有很好的文档。在分析Hadoop的MapReduce部分前,我们还是先了解系统是如何工作的,然后再进入我们的分析部分。下面的图来自,是我看到的讲MapReduce最好的图。 以Hadoop带的wordcount为例子(下面是启动行):hadoop jar hadoop-examples.jar wordcount /usr/input /usr/output用户提交一个任务以后,该任务

2、由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控,并运行在独立于TaskTracker的Java虚拟机中。我们的输入和输出都是HDFS上的目录(如上图所示)。输入由InputFormat接口描述,它的实现如ASCII文件,JDBC数据库等,分别处理对于的数据源,并提供了数据的一些特征。通过InputFormat实现,可以获取InputSplit接口的实现,这个实现用于对数据进行划分(图中的splite1到splite5,就是划分以后的结果),同时从InputForma

3、t也可以获取RecordReader接口的实现,并从输入中生成<k,v>对。有了<k,v>,就可以开始做map操作了。map操作通过context.collect(最终通过OutputCollector. collect)将结果写到context中。当Mapper的输出被收集后,它们会被Partitioner类以指定的方式区分地写出到输出文件里。我们可以为Mapper提供Combiner,在Mapper输出它的<k,v>时,键值对不会被马上写到输出里,他们会被收集在list里(一个key值一个list),当写入一定数量的键值对时,这部分缓冲会被Combine

4、r中进行合并,然后再输出到Partitioner中(图中M1的黄颜色部分对应着Combiner和Partitioner)。Map的动作做完以后,进入Reduce阶段。这个阶段分3个步骤:混洗(Shuffle),排序(sort)和reduce。混洗阶段,Hadoop的MapReduce框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上(多个Mapper产生的同一个key的中间结果分布在不同的机器上,这一步结束后,他们传输都到了处理这个key的Reducer的机器上)。这个步骤中的文件传输使用了HTTP协议。排序和混洗是一块进行的,这个阶段将来自不同Mapper具有相同key

5、值的<key,value>对合并到一起。Reduce阶段,上面通过Shuffle和sort后得到的<key, (list of values)>会送到Reducer. reduce方法中处理,输出的结果通过OutputFormat,输出到DFS中。2009-02-25Hadoop源代码分析(包org.apache.hadoop.mapreduce)有了前一节的分析,我们来看一下具体的接口,它们都处于包中。 上面的图中,类可以分为4种。右上角的是从Writeable继承的,和Counter(还有CounterGroup和Counters,也在这个包中,并没有出现

6、在上面的图里)和ID相关的类,它们保持MapReduce过程中需要的一些计数器和标识;中间大部分是和Context相关的*Context类,它为Mapper和Reducer提供了相关的上下文;关于Map和Reduce,对应的类是Mapper,Reducer和描述他们的Job(在Hadoop 中一次计算任务称之为一个job,下面的分析中,中文为“作业”,相应的task我们称为“任务”);图中其他类是配合Mapper和Reduce工作的一些辅助类。如果你熟悉HTTPServlet, 那就能很轻松地理解Hadoop采用的结构,把整个Hadoop看作是容器,那么Mapper和Reduce就是容器里的组

7、件,*Context保存了组件的一些配置信息,同时也是和容器通信的机制。和ID相关的类我们就不再讨论了。我们先看JobContext,它位于*Context继承树的最上方,为Job提供一些只读的信息,如Job的ID,名称等。下面的信息是MapReduce过程中一些较关键的定制信息:(来自):参数作用缺省值其它实现InputFormat 将输入的数据集切割成小数据集 InputSplits, 每一个 InputSplit 将由一个 Mapper 负责处理。此外 InputFormat 中还提供一个 RecordReader 的实现, 将一个 InputSplit 解析成 <key,valu

8、e> 对提供给 map 函数。TextInputFormat(针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 <key,value> 对,key 是行在文件中的位置,value 是文件中的一行) SequenceFileInputFormat OutputFormat 提供一个 RecordWriter 的实现,负责输出最终结果TextOutputFormat(用 LineRecordWriter 将最终结果写成纯文件文件,每个 <key,value> 对一行,key 和 va

9、lue 之间用 tab 分隔) SequenceFileOutputFormatOutputKeyClass 输出的最终结果中 key 的类型LongWritable  OutputValueClass 输出的最终结果中 value 的类型Text  MapperClass Mapper 类,实现 map 函数,完成输入的 <key,value> 到中间结果的映射IdentityMapper(将输入的 <key,value> 原封不动的输出为中间结果) LongSumReducer,LogRegexMapper,InverseMapper Combi

10、nerClass 实现 combine 函数,将中间结果中的重复 key 做合并null(不对中间结果中的重复 key 做合并)  ReducerClass Reducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果IdentityReducer(将中间结果直接输出为最终结果) AccumulatingReducer, LongSumReducer InputPath 设定 job 的输入目录, job 运行时会处理输入目录下的所有文件null  OutputPath 设定 job 的输出目录,job 的最终结果会写入输出目录下null  Map

11、OutputKeyClass 设定 map 函数输出的中间结果中 key 的类型如果用户没有设定的话,使用 OutputKeyClass  MapOutputValueClass 设定 map 函数输出的中间结果中 value 的类型如果用户没有设定的话,使用 OutputValuesClass  OutputKeyComparator 对结果中的 key 进行排序时的使用的比较器WritableComparable  PartitionerClass 对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理

12、。HashPartitioner(使用 Hash 函数做 partition) KeyFieldBasedPartitioner PipesPartitioner Job继承自JobContext,提供了一系列的set方法,用于设置Job的一些属性(Job更新属性,JobContext读属性),同时,Job还提供了一些对Job进行控制的方法,如下:l           mapProgress:map的进度(01.0);l      &

13、#160;    reduceProgress:reduce的进度(01.0);l           isComplete:作业是否已经完成;l           isSuccessful:作业是否成功;l           killJob:结束一个在运行中的作业

14、;l           getTaskCompletionEvents:得到任务完成的应答(成功/失败);l           killTask:结束某一个任务;2009-02-25Hadoop源代码分析(包mapreduce.lib.input)接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应

15、的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下: 类图的右上角是InputFormat,它描述了一个MapReduce Job的输入,通过InputFormat,Hadoop可以:l           检查MapReduce输入数据的正确性;l           将输入数据切分为逻辑块InputSplit,这些块会分配给Mapper;l

16、0;          提供一个RecordReader实现,Mapper用该实现从InputSplit中读取输入的<K,V>对。在org.apache.hadoop.mapreduce.lib.input中,Hadoop为所有基于文件的InputFormat提供了一个虚基类FileInputFormat。下面几个参数可以用于配置FileInputFormat:l           mapre

17、d.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat;l           mapred.min.split.size:最小的划分大小;l           mapred.max.split.size:最大的划分大小;l         

18、0; mapred.input.dir:输入路径,用逗号做分割。类中比较重要的方法有: protected List<FileStatus> listStatus(Configuration job)递归获取输入数据目录中的所有文件(包括文件信息),输入的job是系统运行的配置Configuration,包含了上面我们提到的参数。 public List<InputSplit> getSplits(JobContext context)将输入划分为InputSplit,包含两个循环,第一个循环处理所有的文件,对于每一个文件,根据输入的划分最大/最小值,循环得到文件上的划

19、分。注意,划分不会跨越文件。FileInputFormat没有实现InputFormat的createRecordReader方法。FileInputFormat有两个子类,SequenceFileInputFormat是Hadoop定义的一种二进制形式存放的键/值文件(参考/core/docs/current/api/org/apache/hadoop/io/SequenceFile.html),它有自己定义的文件布局。由于它有特殊的扩展名,所以SequenceFileInputFormat重载了listStatus,同时,它实现了createR

20、ecordReader,返回一个SequenceFileRecordReader对象。TextInputFormat处理的是文本文件,createRecordReader返回的是LineRecordReader的实例。这两个类都没有重载FileInputFormat的getSplits方法,那么,在他们对于的RecordReader中,必须考虑FileInputFormat对输入的划分方式。FileInputFormat的getSplits,返回的是FileSplit。这是一个很简单的类,包含的属性(文件名,起始偏移量,划分的长度和可能的目标机器)已经足以说明这个类的功能。RecordRead

21、er用于在划分中读取<Key,Value>对。RecordReader有五个虚方法,分别是:l           initialize:初始化,输入参数包括该Reader工作的数据划分InputSplit和Job的上下文context;l           nextKey:得到输入的下一个Key,如果数据划分已经没有新的记录,返回空;l    

22、;       nextValue:得到Key对应的Value,必须在调用nextKey后调用;l           getProgress:得到现在的进度;l           close,来自java.io的Closeable接口,用于清理RecordReader。我们以LineRecordReader为例,来分析RecordRe

23、ader的构成。前面我们已经分析过FileInputFormat对文件的划分了,划分完的Split包括了文件名,起始偏移量,划分的长度。由于文件是文本文件,LineRecordReader的初始化方法initialize会创建一个基于行的读取对象LineReader(定义在中,我们就不分析啦),然后跳过输入的最开始的部分(只在Split的起始偏移量不为0的情况下进行,这时最开始的部分可能是上一个Split的最后一行的一部分)。nextKey的处理很简单,它使用当前的偏移量作为Key,nextValue当然就是偏移量开始的那一行了(如果行很长,可能出现截断)。进度getProgress和clos

24、e都很简单。2009-02-25Hadoop源代码分析(包mapreduce.lib.map)Hadoop的MapReduce框架中,Map动作通过Mapper类来抽象。一般来说,我们会实现自己特殊的Mapper,并注册到系统中,执行时,我们的Mapper会被MapReduce框架调用。Mapper类很简单,包括一个内部类和四个方法,静态结构图如下: 内部类Context继承自MapContext,并没有引入任何新的方法。Mapper的四个方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的资源,setup在完成Mappe

25、r构造,即将开始执行map动作前调用,cleanup则在所有的map动作完成后被调用。方法map用于对一次输入的key/value对进行map动作。run方法执行了上面描述的过程,它调用setup,让后迭代所有的key/value对,进行map,最后调用.apache.hadoop.mapreduce.lib.map中实现了Mapper的三个子类,分别是InverseMapper(将输入<key, value> map为输出<value, key>),MultithreadedMapper(多线程执行map方法)和TokenCounterMappe

26、r(对输入的value分解为token并计数)。其中最复杂的是MultithreadedMapper,我们就以它为例,来分析Mapper的实现。MultithreadedMapper会启动多个线程执行另一个Mapper的map方法,它会启动mapred.map.multithreadedrunner.threads(配置项)个线程执行Mapper:mapred.map.multithreadedrunner.class(配置项)。MultithreadedMapper重写了基类Mapper的run方法,启动N个线程(对应的类为MapRunner)执行mapred.map.multithread

27、edrunner.class(我们称为目标Mapper)的run方法(就是说,目标Mapper的setup和cleanup会被执行多次)。目标Mapper共享同一份InputSplit,这就意味着,对InputSplit的数据读必须线程安全。为此,MultithreadedMapper引入了内部类SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分别继承自RecordReader,RecordWriter和StatusReporter,它们通过互斥访问MultithreadedMapper的Mapper.Context,实现了

28、对同一份InputSplit的线程安全访问,为Mapper提供所需的Context。这些类的实现方法都很简单。2009-02-26Hadoop源代码分析(mapreduce.lib.partition/reduce/output)Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。 Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位

29、置和使用时的上下文不太一样而已。Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那,哪个key到哪个Reducer的分配过程,是由Partitioner规定的,它只有一个方法,输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。Reducer是所有用户定制Reducer类的基

30、类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助

31、接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。OutputFormat和RecordWriter分别对应着InputFormat和Reco

32、rdReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括press:是否压缩;pression.codec:压缩方法;mapred.output.dir:输出路径;map

33、red.work.output.dir:输出工作路径。FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat,我们就不再详细分析啦。2009-0

34、3-06Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用。但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP-1230)。下面我们来分析org.apache.hadoop.mapred,首先还是从mapred的MapReduce框架开始分析,下面的类图(灰色部分为标记为Deprecated的类/接口): 我们把包map

35、reduce的类图附在下面,对比一下,我们就会发现,org.apache.hadoop.mapred中的MapReduce API相对来说很简单,主要是少了和Context相关的类,那么,好多在mapreduce中通过context来完成的工作,就需要通过参数来传递,如Map中的输出,老版本是: output.collect(key, result); / outputs type is: OutputCollector新版本是: context.write(key, result); / outputs type is: Context它们分别使用OutputCollector和Mapper

36、.Context来输出map的结果,显然,原有OutputCollector的新API中就不再需要。总体来说,老版本的API比较简单,MapReduce过程中关键的对象都有,但可扩展性不是很强。同时,老版中提供的辅助类也很多,我们前面分析的FileOutputFormat,也有对应的实现,我们就不再讨论了。 2009-03-10Hadoop源代码分析(*IDs类和*Context类)我们开始来分析Hadoop MapReduce的内部的运行机制。用户向Hadoop提交Job(作业),作业在JobTracker对象的控制下执行。Job被分解成为Task(任务),分发到集群中,在Task

37、Tracker的控制下运行。Task包括MapTask和ReduceTask,是MapReduce的Map操作和Reduce操作执行的地方。这中任务分布的方法比较类似于HDFS中NameNode和DataNode的分工,NameNode对应的是JobTracker,DataNode对应的是TaskTracker。JobTracker,TaskTracker和MapReduce的客户端通过RPC通信,具体可以参考HDFS部分的分析。我们先来分析一些辅助类,首先是和ID有关的类,ID的继承树如下: 这张图可以看出现在Hadoop的org.apache.hadoop.mapred向org.

38、apache.hadoop.mapreduce迁移带来的一些问题,其中灰色是标注为Deprecated的。ID携带一个整型,实现了WritableComparable接口,这表明它可以比较,而且可以被Hadoop的io机制串行化/解串行化(必须实现compareTo/readFields/write方法)。JobID是系统分配给作业的唯一标识符,它的toString结果是job_<jobtrackerID>_<jobNumber>。例子:job_200707121733_0003表明这是jobtracker 200707121733(利用jobtracker的开始时间作

39、为ID)的第3号作业。作业分成任务执行,任务号TaskID包含了它所属的作业ID,同时也有任务ID,同时还保持了这是否是一个Map任务(成员变量isMap)。任务号的字符串表示为task_<jobtrackerID>_<jobNumber>_m|r_<taskNumber>,如task_200707121733_0003_m_000005表示作业200707121733_0003的000005号任务,改任务是一个Map任务。一个任务有可能有多个执行(错误恢复/消除Stragglers等),所以必须区分任务的多个执行,这是通过类TaskAttemptID来完成

40、,它在任务号的基础上添加了尝试号。一个任务尝试号的例子是attempt_200707121733_0003_m_000005_0,它是任务task_200707121733_0003_m_000005的第0号尝试。JVMId用于管理任务执行过程中的Java虚拟机,我们后面再讨论。为了使Job和Task工作,Hadoop提供了一系列的上下文,这些上下文保存了Job和Task工作的信息。 处于继承树的最上方是org.apache.hadoop.mapreduce.JobContext,前面我们已经介绍过了,它提供了Job的一些只读属性,两个成员变量,一个保存了JobID,另一个类型为Jo

41、bConf,JobContext中除了JobID外,其它的信息都保持在JobConf中。它定义了如下配置项:l           mapreduce.inputformat.class:InputFormat的实现l           mapreduce.map.class:Mapper的实现l        &#

42、160;  bine.class: Reducer的实现l           mapreduce.reduce.class:Reducer的实现l           mapreduce.outputformat.class: OutputFormat的实现l           map

43、reduce.partitioner.class: Partitioner的实现同时,它提供方法,使得通过类名,利用Java反射提供的Class.forName方法,获得类对应的C.apache.hadoop.mapred的JobContext对象比org.apache.hadoop.mapreduce.JobContext多了成员变量progress,用于获取进度信息,它类型为JobConf成员job指向mapreduce.JobContext对应的成员,没有添加任何新功能。JobConf继承自Configuration,保持了MapReduce执行需要的一些配置信息,它管理

44、着46个配置参数,包括上面mapreduce配置项对应的老版本形式,如mapreduce.map.class 对应mapred.mapper.class。这些配置项我们在使用到它们的时候再介绍。org.apache.hadoop.mapreduce.JobContext的子类Job前面也已经介绍了,后面在讨论系统的动态行为时,再回来看它。TaskAttemptContext用于任务的执行,它引入了标识任务执行的TaskAttemptID和任务状态status,并提供新的访问接口。org.apache.hadoop.mapred的TaskAttemptContext继承自mapreduce的对应

45、版本,只是增加了记录进度的progress。TaskInputOutputContext和它的子类都在包中,前面已经分析过了,我们就不再罗嗦。2009-03-10Hadoop源代码分析(类TaskStatus)我们先分析Task,这是一个规模比较大的类,类图如下。Task是一个虚类,它有两个子类,MapTask和ReduceTask,分别是Map任务和Reduce任务的抽象。 在分析Task相关类之前,我们来分析和ID,JobID,TaskID相关的类。我们从TaskStatus开始来分析Task相关的类,TaskStatus,一看类名就知道它保持了Task的状态。从前面介绍MapR

46、educe的过程中,我们了解到,MapReduce的过程可以处于下面6个阶段,它们定义在枚举:TaskStatus.Phase中,包括如下状态:l           STARTING:开始l           MAP:Map阶段l           SHUFFLE:混洗阶段l 

47、60;         SORT:排序阶段l           REDUCE:Reduce阶段l           CLEANUP:清理阶段除了阶段,TaskStatus还维护任务的状态,很明显,如果不考虑异常,一次任务应该包括准备,运行和清理三个主要阶段,其实TaskStatus的正常流程和这个非常类似,同时,考

48、虑到任务可能异常结束或被JobTracker杀死,系统还引入配合这两种异常情况的状态,其状态如下: 图中引入了复合状态,只是表明这些状态中包含的状态(如绿色的COMMIT_PENDING和SUCCESSED)可以转移到外面的状态(FAILED)。(注:这张图是通过人肉逆向工程画出来的,在以后的分析过程中,这张图会根据我们对系统的深入了解而修改)接下来我们来看TaskStatus的其它成员,它的完整类图如下,基本上是一些信息,没有复杂的操作。 它包含的主要状态信息有:taskid(对应的任务号),progress(处理情况),runState(运行状态,注意和任务阶段做区分)

49、,diagnosticInfo(诊断信息),stateString(运行状态),taskTracker(对应的taskTracker),startTime(开始时间),finishTime(结束时间),outputSize(输出大小),phase(任务阶段,注意和运行状态做区分),counters(相关的计数器),includeCounters(是否包含成员变量counters)nextRecordRange(处理的记录范围)。TaskStatus有两个子类,分别是MapTaskStatus(没有添加任何新的成员变量)和ReduceTaskStatus。ReduceTaskStatus是Red

50、uce任务的状态,它包含了新信息shuffleFinishTime(shuffle结束时间)和sortFinishTime(sort结束时间)。同时,获取Map结果出错时,对应的Map的TaskAttemptID会保存在failedFetchTasks中,等待上报。最后我们看一下辅助类Counters/Counters.Counter/Counters.Group,它们保存了MapReduce过程中的一些统计计数器,Counters.Counter记录了一个计数器的<名字,显示名,值>,Counters.Group将相关的Counters.Counter聚合成组,并引入组名,组显示

51、名。2009-05-24Hadoop源代码分析(IFile)Mapper的输出,在发送到Reducer前是存放在本地文件系统的,IFile提供了对Mapper输出的管理。我们已经知道,Mapper的输出是<Key,Value>对,IFile以记录<key-len, value-len, key, value>的形式存放了这些数据。为了保存键值对的边界,很自然IFile需要保存key-len和value-len。和IFile相关的类图如下: 其中,文件流形式的输入和输出是由IFIleInputStream和IFIleOutputStream抽象。以记录形式的读/

52、写操作由IFile.Reader/IFile.Writer提供,IFile.InMemoryReader用于读取存在于内存中的IFile文件格式数据。我们以输出为例,来分析这部分的实现。首先是下图的和序列化反序列化相关的Serialization/Deserializer,这部分的code是在包org.apache.hadoop.io.serializer。序列化由Serializer抽象,通过Serializer的实现,用户可以利用serialize方法把对象序列化到通过open方法打开的输出流里。Deserializer提供的是相反的过程,对应的方法是deserialize。hadoop.

53、io.serializer中还实现了配合工作的Serialization和对应的工厂SerializationFactory。两个具体的实现是WritableSerialization和JavaSerialization,分别对应了Writeble的序列化反序列化和Java本身带的序列化反序列化。 有了Serializer/Deserializer,我们来分析IFile.Writer。Writer的构造函数是:    public Writer(Configuration conf, FSDataOutputStream out,   

54、;      Class<K> keyClass, Class<V> valueClass,        CompressionCodec codec, Counters.Counter writesCounter)conf,配置参数,out是Writer的输出,keyClass 和valueClass 是输出的Kay,Value的class属性,codec是对输出进行压缩的方法,参数writesCounter用于对输出字节数进行统计的Counter

55、s.Counter。通过这些参数,我们可以构造我们使用的支持压缩功能的输出流(类成员out,类成员rawOut保存了构造函数传入的out),相关的计数器,还有就是Kay,Value的Serializer方法。Writer最主要的方法是append方法(居然不是write方法,呵呵),有两种形式:public void append(K key, V value) throws IOException public void append(DataInputBuffer key, DataInputBuffer value)append(K key, V value)的主要过程是检查参数,然后将

56、key和value序列化到DataOutputBuffer中,并获取序列化后的长度,最后把长度(2个)和DataOutputBuffer中的结果写到输出,并复位DataOutputBuffer和计数。append(DataInputBuffer key, DataInputBuffer value)处理过程也比较类似,就不再分析了。close方法中需要注意的是,我们需要标记文件尾,或者是流结束。目前是通过写2个值为EOF_MARKER的长度来做标记。IFileOutputStream是用于配合Writer的输出流,它会在IFiles的最后添加校验数据。当Writer调用IFileOutputS

57、tream的write操作时,IFileOutputStream计算并保持校验和,流被close的时候,校验结果会写到对应文件的文件尾。实际上存放在磁盘上的文件是一系列的<key-len, value-len, key, value>记录和校验结果。 Reader的相关过程,我们就不再分析了。2009-05-24Hadoop源代码分析(Task的内部类和辅助类)从前面的图中,我们可以发现Task有很多内部类,并拥有大量类成员变量,这些类配合Task完成相关的工作,如下图。 MapOutputFile管理着Mapper的输出文件,它提供了一系列get方法,用于获取

58、Mapper需要的各种文件,这些文件都存放在一个目录下面。我们假设传入MapOutputFile的JobID为job_200707121733_0003,TaskID为task_200707121733_0003_m_000005。MapOutputFile的根为mapred.local.dir/taskTracker/jobcache/jobid/taskid/output在下面的讨论中,我们把上面的路径记为MapOutputFileRoot以上面JogID和TaskID为例,我们有:mapred.local.dir/taskTracker/jobcache/job_200707121733

59、_0003/task_200707121733_0003_m_000005/output需要注意的是,mapred.local.dir可以包含一系列的路径,那么,Hadoop会在这些根路径下找一个满足要求的目录,建立所需的文件。MapOutputFile的方法有两种,结尾带ForWrite和不带ForWrite,带ForWrite用于创建文件,它需要一个文件大小作为参数,用于检查磁盘空间。不带ForWrite用于获取以建立的文件。getOutputFile:文件名为MapOutputFileRoot/file.out; getSpillFile:文件名为MapOutputFileRoot/sp

60、illspillNumber.outgetSpillIndexFile:文件名为MapOutputFileRoot/spillspillNumber.out.index以上四个方法用于Task子类MapTask中;getInputFile:文件名为MapOutputFileRoot/map_mapId.out用于ReduceTask中。我们到使用到他们的地方再介绍相应的应用场景。 介绍完临时文件管理以后,我们来看Task.CombineOutputCollector,它继承自org.apache.hadoop.mapred.OutputCollector,很简单,只是一个Output

61、Collector到IFile.Writer的Adapter,活都让IFile.Writer干了。 ValuesIterator用于从RawKeyValueIterator(Key,Value都是DataInputBuffer,ValuesIterator要求该输入已经排序)中获取符合RawComparator<KEY> comparator的值的迭代器。它在Task中有一个简单子类,CombineValuesIterator。 Task.TaskReporter用于向JobTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告St

62、atusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,利用Hadoop的RPC机制,向JobTracker报告Task执行情况。 FileSystemStatisticUpdater用于记录对文件系统的对/写操作字节数,是个简单的工具类。2009-05-25Hadoop源代码分析(类Task)有了前面的基础,我们可以来分析类Task了。Task是一个虚基类,它有两个子类:MapTask,ReduceTask,分别对应着Map和Reduce。先从成员变量开始:首先是和作业任务相关的信息,包括

63、jobFile,作业的配置文件;taskId,任务ID,从中可以获取作业ID;partition,Job内ID;taskStatus,任务状态。jobCleanup,jobSetup和taskCleanup是三个标志位。接下来是一组和错误回复的变量。我们知道,如果在Task执行过程中出错,很有可能是因为输入有问题,一个常用的策略是在下一次回复性执行过程中,忽略这部分输入,skipRanges,skipping和writeSkipRecs就是用来控制这个行为的。currentRecStartIndex和currentRecIndexIterator配合,可以得到当前的任务输入。conf保存了当前

64、任务的配置(JobConf形式),MapOutputFile上一部分已经介绍了,用于管理临时文件,跟它配合的是lDirAlloc,类型为LocalDirAllocator,是本地文件分配器。jobContext和taskContext保持了Job和Task的上下文。committer定制了和Task生命周期相关的一些特殊处理(也可以看出是上下文)。最后一部分应该是输出outputFormat。和统计/状态监视的成员变量分散在类的各处,如spilledRecordsCounter,taskProgress,counters等,我们就不再介绍了。下面我们开始来分析Task的成员函数,首先是虚方法,Task包含了下面3个虚方法:    public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)        throws IOException, ClassNotFoundException, InterruptedException;执行Task;    public abstract TaskRunner createRunner(TaskT

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论