版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、用 Hadoop 进行分布式并行编程, 第 1 部分基本概念与安装部署文档选项打印本页将此页作为电子邮件发送级别: 初级曹 羽中 (caoyuz), 软件工程师, IBM中国开发中心2008 年 5 月 22 日Hadoop 是一个实现了 MapReduce 计算模型的开源分布式并行编程框架,借助于 Hadoop, 程序员可以轻松地编写分布式并行程序,将其运行于计算机集群上,完成海量数据的计算。本文将介绍 MapReduce 计算模型,分布式并行计算等基本概念,以及 Hadoop 的安装部署和基本运行方法。Hadoop 简介Hadoop 是一个开源的可运行于大规模集群上的分布式并行编
2、程框架,由于分布式存储对于分布式编程来说是必不可少的,这个框架中还包含了一个分布式文件系统 HDFS( Hadoop Distributed File System )。也许到目前为止,Hadoop 还不是那么广为人知,其最新的版本号也仅仅是 0.16,距离 1.0 似乎都还有很长的一段距离,但提及 Hadoop 一脉相承的另外两个开源项目 Nutch 和 Lucene ( 三者的创始人都是 Doug Cutting ),那绝对是大名鼎鼎。Lucene 是一个用 Java 开发的开源高性能全文检索工具包,它不是一个完整的应用程序,而是一套简单易用的 API 。在全世界范围内,已有无数的软件系统
3、,Web 网站基于 Lucene 实现了全文检索功能,后来 Doug Cutting 又开创了第一个开源的 Web 搜索引擎() Nutch, 它在 Lucene 的基础上增加了网络爬虫和一些和 Web 相关的功能,一些解析各类文档格式的插件等,此外,Nutch 中还包含了一个分布式文件系统用于存储数据。从 Nutch 0.8.0 版本之后,Doug Cutting 把 Nutch 中的分布式文件系统以及实现 MapReduce 算法的代码独立出来形成了一个新的开源项 Hadoop。Nutch 也演化为基于 Lucene 全文检索以及 Hadoop 分布式
4、计算平台的一个开源搜索引擎。基于 Hadoop,你可以轻松地编写可处理海量数据的分布式并行程序,并将其运行于由成百上千个结点组成的大规模计算机集群上。从目前的情况来看,Hadoop 注定会有一个辉煌的未来:"云计算"是目前灸手可热的技术名词,全球各大 IT 公司都在投资和推广这种新一代的计算模式,而 Hadoop 又被其中几家主要的公司用作其"云计算"环境中的重要基础软件,如:雅虎正在借助 Hadoop 开源平台的力量对抗 Google, 除了资助 Hadoop 开发团队外,还在开发基于 Hadoop 的开源项目 Pig, 这是一个专注于海量数据集分析的
5、分布式计算程序。Amazon 公司基于 Hadoop 推出了 Amazon S3 ( Amazon Simple Storage Service ),提供可靠,快速,可扩展的网络存储服务,以及一个商用的云计算平台 Amazon EC2 ( Amazon Elastic Compute Cloud )。在 IBM 公司的云计算项目-"蓝云计划"中,Hadoop 也是其中重要的基础软件。Google 正在跟IBM合作,共同推广基于 Hadoop 的云计算。回页首迎接编程方式的变革在摩尔定律的作用下,以前程序员根本不用考虑计算机的性能会跟不上软件的发展,因为约每隔 18 个月,C
6、PU 的主频就会增加一倍,性能也将提升一倍,软件根本不用做任何改变,就可以享受免费的性能提升。然而,由于晶体管电路已经逐渐接近其物理上的性能极限,摩尔定律在 2005 年左右开始失效了,人类再也不能期待单个 CPU 的速度每隔 18 个月就翻一倍,为我们提供越来越快的计算性能。Intel, AMD, IBM 等芯片厂商开始从多核这个角度来挖掘 CPU 的性能潜力,多核时代以及互联网时代的到来,将使软件编程方式发生重大变革,基于多核的多线程并发编程以及基于大规模计算机集群的分布式并行编程是将来软件性能提升的主要途径。许多人认为这种编程方式的重大变化将带来一次软件的并发危机,因为我们传统的软件方式
7、基本上是单指令单数据流的顺序执行,这种顺序执行十分符合人类的思考习惯,却与并发并行编程格格不入。基于集群的分布式并行编程能够让软件与数据同时运行在连成一个网络的许多台计算机上,这里的每一台计算机均可以是一台普通的 PC 机。这样的分布式并行环境的最大优点是可以很容易的通过增加计算机来扩充新的计算结点,并由此获得不可思议的海量计算能力, 同时又具有相当强的容错能力,一批计算结点失效也不会影响计算的正常进行以及结果的正确性。Google 就是这么做的,他们使用了叫做 MapReduce 的并行编程模型进行分布式并行编程,运行在叫做 GFS ( Google File System )的分布式文件系
8、统上,为全球亿万用户提供搜索服务。Hadoop 实现了 Google 的 MapReduce 编程模型,提供了简单易用的编程接口,也提供了它自己的分布式文件系统 HDFS,与 Google 不同的是,Hadoop 是开源的,任何人都可以使用这个框架来进行并行编程。如果说分布式并行编程的难度足以让普通程序员望而生畏的话,开源的 Hadoop 的出现极大的降低了它的门槛,读完本文,你会发现基于 Hadoop 编程非常简单,无须任何并行开发经验,你也可以轻松的开发出分布式的并行程序,并让其令人难以置信地同时运行在数百台机器上,然后在短时间内完成海量数据的计算。你可能会觉得你不可能会拥有数百台机器来运
9、行你的并行程序,而事实上,随着"云计算"的普及,任何人都可以轻松获得这样的海量计算能力。 例如现在 Amazon 公司的云计算平台 Amazon EC2 已经提供了这种按需计算的租用服务,有兴趣的读者可以去了解一下,这篇系列文章的第三部分将有所介绍。掌握一点分布式并行编程的知识对将来的程序员是必不可少的,Hadoop 是如此的简便好用,何不尝试一下呢?也许你已经急不可耐的想试一下基于 Hadoop 的编程是怎么回事了,但毕竟这种编程模型与传统的顺序程序大不相同,掌握一点基础知识才能更好地理解基于 Hadoop 的分布式并行程序是如何编写和运行的。因此本文会先介绍一下 Map
10、Reduce 的计算模型,Hadoop 中的分布式文件系统 HDFS, Hadoop 是如何实现并行计算的,然后才介绍如何安装和部署 Hadoop 框架,以及如何运行 Hadoop 程序。回页首MapReduce 计算模型MapReduce 是 Google 公司的核心计算模型,它将复杂的运行于大规模集群上的并行计算过程高度的抽象到了两个函数,Map 和 Reduce, 这是一个令人惊讶的简单却又威力巨大的模型。适合用 MapReduce 来处理的数据集(或任务)有一个基本要求: 待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。图 1. MapReduce
11、计算流程 图一说明了用 MapReduce 来处理大数据集的过程, 这个 MapReduce 的计算过程简而言之,就是将大数据集分解为成百上千的小数据集,每个(或若干个)数据集分别由集群中的一个结点(一般就是一台普通的计算机)进行处理并生成中间结果,然后这些中间结果又由大量的结点进行合并, 形成最终结果。计算模型的核心是 Map 和 Reduce 两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的 <key, value> 对转换成另一个或一批 <key, value> 对输出。表一 Map 和 Reduce 函数函数输入输出说明Map<
12、k1, v1>List(<k2,v2>)1. 将小数据集进一步解析成一批 <key,value> 对,输入 Map 函数中进行处理。2. 每一个输入的 <k1,v1> 会输出一批 <k2,v2>。 <k2,v2> 是计算的中间结果。Reduce<k2,List(v2)><k3,v3>输入的中间结果 <k2,List(v2)> 中的 List(v2) 表示是一批属于同一个 k2 的 value以一个计算文本文件中每个单词出现的次数的程序为例,<k1,v1> 可以是 <行在文件中
13、的偏移位置, 文件中的一行>,经 Map 函数映射之后,形成一批中间结果 <单词,出现次数>, 而 Reduce 函数则可以对中间结果进行处理,将相同单词的出现次数进行累加,得到每个单词的总的出现次数。基于 MapReduce 计算模型编写分布式并行程序非常简单,程序员的主要编码工作就是实现 Map 和 Reduce 函数,其它的并行编程中的种种复杂问题,如分布式存储,工作调度,负载平衡,容错处理,网络通信等,均由 MapReduce 框架(比如 Hadoop )负责处理,程序员完全不用操心。回页首四 集群上的并行计算MapReduce 计算模型非常适合在大量计算机组成的大规
14、模集群上并行运行。图一中的每一个 Map 任务和每一个 Reduce 任务均可以同时运行于一个单独的计算结点上,可想而知其运算效率是很高的,那么这样的并行计算是如何做到的呢?数据分布存储Hadoop 中的分布式文件系统 HDFS 由一个管理结点 ( NameNode )和N个数据结点 ( DataNode )组成,每个结点均是一台普通的计算机。在使用上同我们熟悉的单机上的文件系统非常类似,一样可以建目录,创建,复制,删除文件,查看文件内容等。但其底层实现上是把文件切割成 Block,然后这些 Block 分散地存储于不同的 DataNode 上,每个 Block 还可以复制数份存储于不同的 D
15、ataNode 上,达到容错容灾之目的。NameNode 则是整个 HDFS 的核心,它通过维护一些数据结构,记录了每一个文件被切割成了多少个 Block,这些 Block 可以从哪些 DataNode 中获得,各个 DataNode 的状态等重要信息。如果你想了解更多的关于 HDFS 的信息,可进一步阅读参考资料: The Hadoop Distributed File System:Architecture and Design分布式并行计算Hadoop 中有一个作为主控的 JobTracker,用于调度和管理其它的 TaskTracker, JobTracker 可以运行于集群
16、中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上,即 DataNode 既是数据存储结点,也是计算结点。 JobTracker 将 Map 任务和 Reduce 任务分发给空闲的 TaskTracker, 让这些任务并行运行,并负责监控任务的运行情况。如果某一个 TaskTracker 出故障了,JobTracker 会将其负责的任务转交给另一个空闲的 TaskTracker 重新运行。本地计算数据存储在哪一台计算机上,就由这台计算机进行这部分数据的计算,这样可以减少数据在网络上的传输,降低对网络带宽的需求。在 Hadoop 这样的基于集群的分布式并行系
17、统中,计算结点可以很方便地扩充,而因它所能够提供的计算能力近乎是无限的,但是由是数据需要在不同的计算机之间流动,故网络带宽变成了瓶颈,是非常宝贵的,“本地计算”是最有效的一种节约网络带宽的手段,业界把这形容为“移动计算比移动数据更经济”。图 2. 分布存储与并行计算 任务粒度把原始大数据集切割成小数据集时,通常让小数据集小于或等于 HDFS 中一个 Block 的大小(缺省是 64M),这样能够保证一个小数据集位于一台计算机上,便于本地计算。有 M 个小数据集待处理,就启动 M 个 Map 任务,注意这 M 个 Map 任务分布于 N 台计算机上并行运行,Reduce 任务的数量 R
18、 则可由用户指定。Partition把 Map 任务输出的中间结果按 key 的范围划分成 R 份( R 是预先定义的 Reduce 任务的个数),划分时通常使用 hash 函数如: hash(key) mod R,这样可以保证某一段范围内的 key,一定是由一个 Reduce 任务来处理,可以简化 Reduce 的过程。Combine在 partition 之前,还可以对中间结果先做 combine,即将中间结果中有相同 key的 <key, value> 对合并成一对。combine 的过程与 Reduce 的过程类似,很多情况下就可以直接使用 Reduce 函数,但 comb
19、ine 是作为 Map 任务的一部分,在执行完 Map 函数后紧接着执行的。Combine 能够减少中间结果中 <key, value> 对的数目,从而减少网络流量。Reduce 任务从 Map 任务结点取中间结果Map 任务的中间结果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盘。中间结果文件的位置会通知主控 JobTracker, JobTracker 再通知 Reduce 任务到哪一个 DataNode 上去取中间结果。注意所有的 Map 任务产生中间结果均按其 Key 用同一个 Hash 函数划分成了 R 份,R 个 Reduce 任务各自负责
20、一段 Key 区间。每个 Reduce 需要向许多个 Map 任务结点取得落在其负责的 Key 区间内的中间结果,然后执行 Reduce 函数,形成一个最终的结果文件。任务管道有 R 个 Reduce 任务,就会有 R 个最终结果,很多情况下这 R 个最终结果并不需要合并成一个最终结果。因为这 R 个最终结果又可以做为另一个计算任务的输入,开始另一个并行计算任务。回页首五 Hadoop 初体验Hadoop 支持 Linux 及 Windows 操作系统, 但其官方网站声明 Hadoop 的分布式操作在 Windows 上未做严格测试,建议只把 Windows 作为 Hadoop 的开发平台。在
21、 Windows 环境上的安装步骤如下( Linux 平台类似,且更简单一些):(1)在 Windows 下,需要先安装 Cgywin, 安装 Cgywin 时注意一定要选择安装 openssh (在 Net category )。安装完成之后,把 Cgywin 的安装目录如 c:cygwinbin 加到系统环境变量 PATH 中,这是因为运行 Hadoop 要执行一些 linux 环境下的脚本和命令。(2)安装 Java 1.5.x,并将 JAVA_HOME 环境变量设置为 Java 的安装根目录如 C:Program FilesJavajdk1.5.0_01。(3)到 Hadoop 官方网
22、站 下载Hadoop Core, 最新的稳定版本是 0.16.0. 将下载后的安装包解压到一个目录,本文假定解压到 c:hadoop-0.16.0。4)修改 conf/hadoop-env.sh 文件,在其中设置 JAVA_HOME 环境变量: export JAVA_HOME="C:Program FilesJavajdk1.5.0_01” (因为路径中 Program Files 中间有空格,一定要用双引号将路径引起来)至此,一切就绪,可以运行 Hadoop 了。以下的运行过程,需要启动 cygwin, 进入模拟 Linux
23、 环境。在下载的 Hadoop Core 包中,带有几个示例程序并且已经打包成了 hadoop-0.16.0-examples.jar。其中有一个 WordCount 程序,功能是统计一批文本文件中各个单词出现的次数,我们先来看看怎么运行这个程序。Hadoop 共有三种运行模式: 单机(非分布式)模式,伪分布式运行模式,分布式运行模式,其中前两种运行模式体现不了 Hadoop 分布式计算的优势,并没有什么实际意义,但对程序的测试及调试很有帮助,我们先从这两种模式入手,了解基于 Hadoop 的分布式并行程序是如何编写和运行的。单机(非分布式)模式这种模式在一台单机上运行,没有分布式文件系统,而
24、是直接读写本地操作系统的文件系统。代码清单1 $ cd /cygdrive/c/hadoop-0.16.0$ mkdir test-in $ cd test-in#在 test-in 目录下创建两个文本文件, WordCount 程序将统计其中各个单词出现次数$ echo "hello world bye world" >file1.txt $ echo "hello hadoop goodbye hadoop" >file2.txt$ cd .$ bin/hadoop jar hadoop-0.16.0-examples.jar wordc
25、ount test-in test-out#执行完毕,下面查看执行结果:$ cd test-out$ cat part-00000bye 1goodbye 1hadoop 2hello 2world 2注意事项:运行 bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out 时,务必注意第一个参数是 jar, 不是 -jar, 当你用 -jar 时,不会告诉你是参数错了,报告出来的错误信息是:Exception in thread "main" java.lang.NoClassDefFound
26、Error: org/apache/hadoop/util/ProgramDriver, 笔者当时以为是 classpath 的设置问题,浪费了不少时间。通过分析 bin/hadoop 脚本可知,-jar 并不是 bin/hadoop 脚本定义的参数,此脚本会把 -jar 作为 Java 的参数,Java 的-jar 参数表示执行一个 Jar 文件(这个 Jar 文件必须是一个可执行的 Jar,即在 MANIFEST 中定义了主类), 此时外部定义的 classpath 是不起作用的,因而会抛出 java.lang.NoClassDefFoundError 异常。而 jar 是 bin/had
27、oop 脚本定义的参数,会调用 Hadoop 自己的一个工具类 RunJar,这个工具类也能够执行一个 Jar 文件,并且外部定义的 classpath 有效。伪分布式运行模式这种模式也是在一台单机上运行,但用不同的 Java 进程模仿分布式运行中的各类结点 ( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),请注意分布式运行中的这几个结点的区别:从分布式存储的角度来说,集群中的结点由一个 NameNode 和若干个 DataNode 组成, 另有一个 Secondary NameNode 作为 NameNod
28、e 的备份。 从分布式应用的角度来说,集群中的结点由一个 JobTracker 和若干个 TaskTracker 组成,JobTracker 负责任务的调度,TaskTracker 负责并行执行任务。TaskTracker 必须运行在 DataNode 上,这样便于数据的本地计算。JobTracker 和 NameNode 则无须在同一台机器上。(1) 按代码清单2修改 conf/hadoop-site.xml。注意 conf/hadoop-default.xml 中是 Hadoop 缺省的参数,你可以通过读此文件了解 Hadoop 中有哪些参数可供配置,但不要修改此文件。可通过修改 conf
29、/hadoop-site.xml 改变缺省参数值,此文件中设置的参数值会覆盖 conf/hadoop-default.xml 的同名参数。代码清单 2 <configuration> <property> <name></name> <value>localhost:9000</value> </property> <property> <name>mapred.job.tracker</name> <value>localhost:
30、9001</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property></configuration>参数 指定 NameNode 的 IP 地址和端口号。缺省值是 file:/, 表示使用本地文件系统, 用于单机非分布式模式。此处我们指定使用运行于本机 localhost 上的 NameNode。参数 mapred.job.tracker
31、指定 JobTracker 的 IP 地址和端口号。缺省值是 local, 表示在本地同一 Java 进程内执行 JobTracker 和 TaskTracker, 用于单机非分布式模式。此处我们指定使用运行于本机 localhost 上的 JobTracker ( 用一个单独的 Java 进程做 JobTracker )。参数 dfs.replication 指定 HDFS 中每个 Block 被复制的次数,起数据冗余备份的作用。 在典型的生产系统中,这个数常常设置为3。(2)配置 SSH,如代码清单3所示:代码清单 3 $ ssh-keygen -t dsa -P '' -
32、f /.ssh/id_dsa $ cat /.ssh/id_dsa.pub >> /.ssh/authorized_keys配置完后,执行一下 ssh localhost, 确认你的机器可以用 SSH 连接,并且连接时不需要手工输入密码。(3)格式化一个新的分布式文件系统, 如代码清单4所示:代码清单 4 $ cd /cygdrive/c/hadoop-0.16.0$ bin/hadoop namenode format (4) 启动 hadoop 进程, 如代码清单5所示。控制台上的输出信息应该显示启动了 namenode, datanode, secondary namenod
33、e, jobtracker, tasktracker。启动完成之后,通过 ps ef 应该可以看到启动了5个新的 java 进程。代码清单 5 $ bin/start-all.sh $ ps ef(5) 运行 wordcount 应用, 如代码清单6所示:代码清单 6 $ bin/hadoop dfs -put ./test-in input #将本地文件系统上的 ./test-in 目录拷到 HDFS 的根目录上,目录名改为 input#执行 bin/hadoop dfs help 可以学习各种 HDFS 命令的使用。$ bin/hadoop jar hadoop-0.16.0-exampl
34、es.jar wordcount input output#查看执行结果:#将文件从 HDFS 拷到本地文件系统中再查看:$ bin/hadoop dfs -get output output $ cat output/*#也可以直接查看$ bin/hadoop dfs -cat output/*$ bin/stop-all.sh #停止 hadoop 进程故障诊断(1) 执行 $ bin/start-all.sh 启动 Hadoop 进程后,会启动5个 java 进程, 同时会在 /tmp 目录下创建五个 pid 文件记录这些进程 ID 号。通过这五个文件,可以得知 namenode, da
35、tanode, secondary namenode, jobtracker, tasktracker 分别对应于哪一个 Java 进程。当你觉得 Hadoop 工作不正常时,可以首先查看这5个 java 进程是否在正常运行。(2) 使用 web 接口。访问 http:/localhost:50030 可以查看 JobTracker 的运行状态。访问 http:/localhost:50060 可以查看 TaskTracker 的运行状态。访问 http:/localhost:50070 可以查看 NameNode 以及整个分布式文件系统的状态,浏览分布式文件系统中的文件以及 log 等。(3
36、) 查看 $HADOOP_HOME/logs 目录下的 log 文件,namenode, datanode, secondary namenode, jobtracker, tasktracker 各有一个对应的 log 文件,每一次运行的计算任务也有对应用 log 文件。分析这些 log 文件有助于找到故障原因。回页首结束语现在,你已经了解了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且有了一个可以运行的 Hadoop 环境,运行了一个基于 Hadoop 的并行程序。在下一篇文章中,你将了解到如何针对一个具体的计算任务,基于 Hadoop 编写自
37、己的分布式并行程序并将其部署运行等内容。声明:本文仅代表作者个人之观点,不代表 IBM 公司之观点。用 Hadoop 进行分布式并行编程, 第 2 部分程序实例与分析文档选项打印本页将此页作为电子邮件发送样例代码级别: 初级曹 羽中 (caoyuz), 软件工程师, IBM中国开发中心2008 年 5 月 22 日Hadoop 是一个实现了 MapReduce 计算模型的开源分布式并行编程框架,借助于 Hadoop, 程序员可以轻松地编写分布式并行程序,将其运行于计算机集群上,完成海量数据的计算。在本文中,详细介绍了如何针对一个具体的并行计算任务,基于 Hadoop 编写程序,如何使
38、用 IBM MapReduce Tools 在 Eclipse 环境中编译并运行 Hadoop 程序。前言在上一篇文章:“用 Hadoop 进行分布式并行编程 第一部分 基本概念与安装部署”中,介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且详细介绍了如何安装 Hadoop,如何运行基于 Hadoop 的并行程序。在本文中,将针对一个具体的计算任务,介绍如何基于 Hadoop 编写并行程序,如何使用 IBM 开发的 Hadoop Eclipse plugin 在 Eclipse 环境中编译并运行程序。回页首分析 WordCount 程序我们先来
39、看看 Hadoop 自带的示例程序 WordCount,这个程序用于统计一批文本文件中单词出现的频率,完整的代码可在下载的 Hadoop 安装包中得到(在 src/examples 目录中)。1.实现Map类见代码清单1。这个类实现 Mapper 接口中的 map 方法,输入参数中的 value 是文本文件中的一行,利用 StringTokenizer 将这个字符串拆成单词,然后将输出结果 <单词,1> 写入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 负责收集 Mapper
40、和 Reducer 的输出数据,实现 map 函数和 reduce 函数时,只需要简单地将其输出的 <key,value> 对往 OutputCollector 中一丢即可,剩余的事框架自会帮你处理好。代码中 LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为 long, int, String 的替代品。Reporter 则可用于报告整个应用的运行进度,本例中未使用。代码清单1 public static class MapCla
41、ss extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws
42、IOException String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.实现 Reduce 类见代码清单 2。这个类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key, values 是由 Map 任务输出的中间结果,values 是一个 Iterator, 遍历这个 Iterator, 就可以得
43、到属于同一个 key 的所有 value. 此处,key 是一个单词,value 是词频。只需要将所有的 value 相加,就可以得到这个单词的总的出现次数。代码清单 2 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable&
44、gt; output, Reporter reporter) throws IOException int sum = 0; while (values.hasNext() sum += values.next().get(); output.collect(key, new IntWritable(sum); 3.运行 Job在 Hadoop 中一次计算任务称之为一个 job, 可以通过一个 JobConf 对象设置如何运行这个 job。此处定义了输出的 key 的类型是 Text, value 的类型是 IntWritable, 指定使用代码清单1中实现的 MapClass 作为 Mapp
45、er 类,使用代码清单2中实现的 Reduce 作为 Reducer 类和 Combiner 类, 任务的输入路径和输出路径由命令行参数指定,这样 job 运行时会处理输入路径下的所有文件,并将计算结果写到输出路径下。然后将 JobConf 对象作为参数,调用 JobClient 的 runJob, 开始执行这个计算任务。至于 main 方法中使用的 ToolRunner 是一个运行 MapReduce 任务的辅助工具类,依样画葫芦用之即可。代码清单 3 public int run(String args) throws Exception JobConf conf = new JobCon
46、f(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); con
47、f.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0; public static void main(String args) throws Exception if(args.length != 2) System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); int res = ToolRunner.run(new Configuration(), new Wo
48、rdCount(), args); System.exit(res); 以上就是 WordCount 程序的全部细节,简单到让人吃惊,您都不敢相信就这么几行代码就可以分布式运行于大规模集群上,并行处理海量数据集。4. 通过 JobConf 定制计算任务通过上文所述的 JobConf 对象,程序员可以设定各种参数,定制如何完成一个计算任务。这些参数很多情况下就是一个 java 接口,通过注入这些接口的特定实现,可以定义一个计算任务( job )的全部细节。了解这些参数及其缺省设置,您才能在编写自己的并行计算程序时做到轻车熟路,游刃有余,明白哪些类是需要自己实现的,哪些类用 Hadoop 的缺省实
49、现即可。表一是对 JobConf 对象中可以设置的一些重要参数的总结和说明,表中第一列中的参数在 JobConf 中均会有相应的 get/set 方法,对程序员来说,只有在表中第三列中的缺省值无法满足您的需求时,才需要调用这些 set 方法,设定合适的参数值,实现自己的计算目的。针对表格中第一列中的接口,除了第三列的缺省实现之外,Hadoop 通常还会有一些其它的实现,我在表格第四列中列出了部分,您可以查阅 Hadoop 的 API 文档或源代码获得更详细的信息,在很多的情况下,您都不用实现自己的 Mapper 和 Reducer, 直接使用 Hadoop 自带的一些实现即可。表一 JobCo
50、nf 常用可定制参数参数作用缺省值其它实现InputFormat将输入的数据集切割成小数据集 InputSplits, 每一个 InputSplit 将由一个 Mapper 负责处理。此外 InputFormat 中还提供一个 RecordReader 的实现, 将一个 InputSplit 解析成 <key,value> 对提供给 map 函数。TextInputFormat(针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 <key,value> 对,key 是行在文件中的位置,v
51、alue 是文件中的一行)SequenceFileInputFormatOutputFormat提供一个 RecordWriter 的实现,负责输出最终结果TextOutputFormat(用 LineRecordWriter 将最终结果写成纯文件文件,每个 <key,value> 对一行,key 和 value 之间用 tab 分隔)SequenceFileOutputFormatOutputKeyClass输出的最终结果中 key 的类型LongWritableOutputValueClass输出的最终结果中 value 的类型TextMapperClassMapper 类,实
52、现 map 函数,完成输入的 <key,value> 到中间结果的映射IdentityMapper(将输入的 <key,value> 原封不动的输出为中间结果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass实现 combine 函数,将中间结果中的重复 key 做合并null(不对中间结果中的重复 key 做合并)ReducerClassReducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果IdentityReducer(将中间结果直接输出为最终结果)AccumulatingRedu
53、cer, LongSumReducerInputPath设定 job 的输入目录, job 运行时会处理输入目录下的所有文件nullOutputPath设定 job 的输出目录,job 的最终结果会写入输出目录下nullMapOutputKeyClass设定 map 函数输出的中间结果中 key 的类型如果用户没有设定的话,使用 OutputKeyClassMapOutputValueClass设定 map 函数输出的中间结果中 value 的类型如果用户没有设定的话,使用 OutputValuesClassOutputKeyComparator对结果中的 key 进行排序时的使用的比较器Wr
54、itableComparablePartitionerClass对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理。HashPartitioner(使用 Hash 函数做 partition)KeyFieldBasedPartitioner PipesPartitioner回页首改进的 WordCount 程序现在你对 Hadoop 并行程序的细节已经有了比较深入的了解,我们来把 WordCount 程序改进一下,目标: (1)原 WordCount 程序仅按空格切分单词,导致各类标点符号与单词混杂在一起,改进后的程序应该能够正确的
55、切出单词,并且单词不要区分大小写。(2)在最终结果中,按单词出现频率的降序进行排序。1.修改 Mapper 类,实现目标(1)实现很简单,见代码清单4中的注释。代码清单 4 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="w" /正则表达式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.toString().toLowerCase(); /全部转为小写字母 li
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年水利项目施工合作合同范本版B版
- 游戏历史课程设计思路
- 2024年度摄影师与摄影器材租赁公司合作合同3篇
- 2024年盆景租赁服务与植物科普教育合同2篇
- 护理人体解剖课程设计
- 2024年水稻国际市场采购代理合同3篇
- 2024年污染物在线监测管理与维护服务合同
- 2024年松树种植基地松树果实收购合同3篇
- 电气课程设计电路
- 2024年度休闲垂钓中心鱼池场地租赁经营合同3篇
- 2024年云南省昆明滇中新区公开招聘20人历年高频500题难、易错点模拟试题附带答案详解
- TCECA-G 0299-2024 会展活动碳中和实施指南
- 《中国心力衰竭诊断和治疗指南2024》解读
- 2025年全年日历含农历(1月-12月)
- 2024-2030年中国塑料光纤(POF)行业市场发展趋势与前景展望战略分析报告
- 顶管施工危险源辨识及风险评价表
- 国家开放大学《建筑工程项目管理》形成性考核1-4参考答案
- 2024年统编版新教材语文小学一年级上册第八单元检测题附答案
- 多学科联合诊疗(MDT)管理方案
- 2024国家开放大学电大专科《市场营销学》期末试题及答案
- DL∕T 1340-2014 火力发电厂分散控制系统故障应急处理导则
评论
0/150
提交评论