MapReduce技术原理与实践_第1页
MapReduce技术原理与实践_第2页
MapReduce技术原理与实践_第3页
MapReduce技术原理与实践_第4页
MapReduce技术原理与实践_第5页
已阅读5页,还剩10页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、MapReduce技术原理与实践技术创新,变革未来2提纲MapReduce 原理架构12MapReduce编程实战3MapReduce是一种编程模型, 用于大规模数据集(大于 1TB)的并行运算。概念“Map(映射)”和“Reduce(归约)”,和他 们的主要思想,都是从函数 式编程语言里借来的,还有 从矢量编程语言里借来的特 性。他极大地方便了编程人 员在不会分布式并行编程的 情况下,将自己的程序运行 在分布式系统上MapReduce 简介Google MapReduce 架构设计师 Jeffrey Dean4原理:利用一个输 入Key/Value pair集合 来产生一个输出的 Key/V

2、alue pair集合Map函数:接受一 个输入的Key/Valuepair值,然后产生一 个中间Key/Value pair 值的集合。Reduce函数:接受 一个中间Key值和相 关的一个Value值的集合,合并这些Value值。MapReduce框架5MapReduce组成JobtrackerMaster使用者发起作业指派任务到Tasktrackers作业分配、错误处理TasktrackersWorkers运行Map、Reduce任务管理存储、汇报 运算结果6MapReduce过程Mappershuffle 和sortReducer7流程INPUT阶段:根据输入文件和MAP数目系统产生相

3、应的文件块MAP阶段:每个MAP根据用户定义把输入的KEY-VALUE对转化成新的 KEY-VALUE对SHUFFLE&SORT阶段:系统对MAP输出进行发送并排序到相应的REDUCE上REDUCE阶段:把所有相同KEY的记录合并成一个KEY-VALUE对OUTPUT阶段:把输出结果写到DFS应用程序指定MAP模块REDUCE模块输入文件各个阶段都是分布式运算框架完成集群的计划、调度功能框架完成容错的功能MapReduce 整体流程8提纲MapReduce 原理架构12MapReduce编程实战9案例:根据日志计算用户页面停留时间和访问次数页面停留时间:通过被访问网页的时间戳来计算页面停留时间

4、的。访问次数:网站的所有访问者发起的具体会话次数。MapReduce 编程实战:案例及分析原始数据格式IP 时间稿件ID 稿件库ID需要按照IP统计同一个IP下的数据按照 时间戳排序在同一个IP下,通过排 好序的序列计算稿件的 间隔时间并统计访问次 数10numPartitions;/ NewKey.java 实现比较方法public int compareTo(NewKey tp)int cmp = pareTo(tp.first); if (cmp != 0)return cmp;return pareTo(tp.second);/ NewPartitioner 分区函数类。根据first

5、确定Partition。public int getPartition(NewKey key, Text value, int numPartitions)return (key.first.hashCode() & Integer.MAX_VALUE) %/ NewGroupComparator 分组函数类。只要first相同就属于同一个组。public int compare(WritableComparable w1, WritableComparable w2)NewKey m1 = (NewKey) w1; NewKey m2 = (NewKey) w2;return pareTo(

6、m2.first);MapReduce编程实战:Secondary SortSecondary Sort 实际上就是一种对Value进行二次排序,然后按key的特定部分进 行聚合的方法,这里用到了一个组合Key的概念,就是把Key与要排序的Value组 合在一起,生成一个新的Key值在本例中,需要把(IP,timestamp)组合在一起,形成新的Key11/ Mapper 变量初始化protected void setup(Context context) throws IOException, InterruptedExceptionip = new LongWritable(); time

7、stamp = new LongWritable(); text = new Text();myKey = new NewKey();public void map(Object key, Text value, Context context) throws IOException, InterruptedExceptionString line = value.toString();String lineArray = line.split(IN_SEPARATOR); if (lineArray.length = 4)SimpleDateFormat sdf = new SimpleDa

8、teFormat(DATE_FORMAT); long ts = 0;ts = sdf.parse(lineArray1).getTime(); ip.set(ipToLong(lineArray0); timestamp.set(ts);myKey.set(ip, timestamp);StringBuffer sb = new StringBuffer(String.valueOf(ts) + OUT_SEPARATOR); sb.append(lineArray2 + DOC_SEPARATOR + lineArray3); text.set(sb.toString();context.

9、write(myKey, text);MapReduce编程实战:Mapper12public void reduce(NewKey key, Iterable values, Context context) throws IOException, InterruptedExceptionfor (Text val : values) if (start = 0) time = Long.valueOf(val.toString().split(t)0); start = time;doc = val.toString().split(t)1; continue;long curTime =

10、 Long.valueOf(val.toString().split(t)0); String curDoc = val.toString().split(t)1;if (curTime - time = SESSEION_TIMEOUT) timeOnUrl = (curTime - time) / 1000;rsVal = String.valueOf(time) + OUT_SEPARATOR + doc + OUT_SEPARATOR + String.valueOf(visit)+ OUT_SEPARATOR + String.valueOf(timeOnUrl);result.se

11、t(rsVal); context.write(key.first, result);else rsVal = String.valueOf(time) + OUT_SEPARATOR + doc + OUT_SEPARATOR + String.valueOf(visit)+ OUT_SEPARATOR + String.valueOf(1); result.set(rsVal);context.write(key.first, result); visit+;time = curTime; doc = curDoc;rsVal = String.valueOf(time) + OUT_SE

12、PARATOR + doc + OUT_SEPARATOR + String.valueOf(visit) + OUT_SEPARATOR + 1;result.set(rsVal); context.write(key.first, result);MapReduce编程实战:Reducer内部逻辑13public static void main(String args) throws ExceptionConfiguration conf = new Configuration();String otherArgs = new GenericOptionsParser(conf, arg

13、s).getRemainingArgs(); Job job = new Job(conf, XH_LOG_ANALYZE_PRE_PROCESS); job.setJarByClass(LogProcessMapReduce.class); job.setMapperClass(LogProcessMapper.class);/ 设 置 Mapper job.setReducerClass(LogProcessReducer.class);/ 设 置 Reducer job.setPartitionerClass(NewPartitioner.class);/ 设 置 自 定 义 Parti

14、tioner job.setGroupingComparatorClass(NewGroupComparator.class);/ 设置自定义GroupComparatorjob.setMapOutputKeyClass(NewKey.class);/ 设置自定义Map OutputKey 类型 job.setMapOutputValueClass(Text.class);/ 设置Map OutputValue 类型 job.setOutputKeyClass(LongWritable.class);/ 设置Reduce OutputKey 类型 job.setOutputValueClass(Text.class);/ 设置Reduce OutputValue 类型 FileInputFormat.addInputPath(job, new

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论