统一分布式计算框架Bigflow介绍_第1页
统一分布式计算框架Bigflow介绍_第2页
统一分布式计算框架Bigflow介绍_第3页
统一分布式计算框架Bigflow介绍_第4页
统一分布式计算框架Bigflow介绍_第5页
已阅读5页,还剩32页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、统一分布式计算框架Bigflow介绍Agenda背景基本抽象/接口统一离线/实时计算优化应用实践背景2011Baidu Bigflow立项201020032008引入Apache Hadoop2009分布式网页库,存储 网页规模超过1000亿大规模机器学习平台实时计算系统支持毫秒级 时延,准实时计算系统严 格不丢不重,时效性可达 到20秒2012新一代分布式计算系 统上线,单集群规模 达到100002013全球首个ARM构架服务器规模化应用2014引入Apache Spark分布式搜索系统2015Baidu Bigflow对内 发布背景-多引擎并存BatchMapReduce AbaciIte

2、rativeSparkRealTimeDStreamMini-BatchTaskManagerSparkStreamingMLELF多个计算平台开发慢 | 使用繁 | 维护难一套逻辑,不断重写:一套逻辑,同时维护:背景-跨引擎成本高MapReduceSpark新的计算引擎(100 x faster than.)离线计算实时计算恢复故障数据、提升准确性提升时效性背景-升级困难Baidu MapReduce进化:用户无感知:流式Shuffle服务Mapper/Reducer进程复用 Native Streaming用户感知,推广难度大:DAG拓扑支持多输入多输出与Broadcast支持并发动态调节

3、背景-目标易写、易维护、易测试计算逻辑与引擎解耦批量/流式计算统一高效背景Any problem in computer science can be solved by anther layer of indirection. .except for the problem of too many layers of indirection.Agenda背景基本抽象/接口统一离线/实时计算优化应用实践抽象Nestable Datasetp = base.Pipeline.create(ENGINE)words = p.read(input.TextFile(/a/b/c) .flat_map

4、(lambda line: line.split(t)per_word_cnt = words .group_by(lambda word: word) .apply_values(count) .flatten()p.write(word_cnt, output.TextFile(/a/b/e) p.run()Pipeline: 入口 / 分布式作业的抽象MRPipeline/SparkPipeline.Dataset: 分布式数据集抽象有序/无序 by Sort单元素/任意数量元素 by Aggregations有穷/无穷 from Source/Window有/无Schema扁平/嵌套

5、by GroupingsTransformations: 变换全部惰式计算SideInput抽象Nestable DatasetDataSet嵌套分组DataSetColor, DataSet grouped = dataset.apply(GroupBy, getColor)DataSetColor, DataSetShape, DataSet nested = grouped.apply_values(GroupBy, getShape)接口p = base.Pipeline.create(ENGINE)visitors = p.read(input.TextFile(/a/b/c) .f

6、lat_map(lambda line: line.split(t)def cnt_distinct(visitors): param visitors: DataSet return visitors .distinct() .count()num = visitors.apply(cnt_distinct)p.write(num, output.TextFile(/a/b/d) p.run()total_visitors = .def uv_counting(total_visitors): param page_visitors: DataSetPair return total_vis

7、itors .group_by(lambda (visitors, page): page) .apply_values(cnt_distinct).flatten()pv_result = total_visitors.apply(uv_counting)p.write(num, output.TextFile(/a/b/d) p.run()变换TransformationUDFInput = outputmap(p, f)f: (V) = TV = Tfilter(p, f)f: (V) = boolV = Vflat_map(p, f)f: (V) = IterableTV = T /

8、V = Tgroup_by_key(p)-(K, V) = K: Vgroup_by(p, f)f: (V) = KV = K: Vflatten(p)-K: V = (K, V)reduce(p, f)f: (V, V)=VV = Vaggregate(p, v, f1, f2)v: T or () = T f1: (T, V) = Tf2: (T, T) = TV = Tjoin(p1, p2)-(K, V1),(K, V2) = (K,(V1,V2)cogroup(p1, p2)-(K, V1), (K, V2) = K: (V1, V2)cartesian(p1, p2)-V1, V2

9、 = (V1, V2)union(p1, p2)-V, V = Vsum(p)-V = Vcount(p)-V = Longtake(p, num)-V = Vsort(p, f)f: f(V) = KV = VAgenda背景基本抽象/接口统一离线/实时计算优化应用实践抽象Nestable DatasetDataSetOdevity, DataSet grouped = dataset.apply(GroupBy, modByTwo)无穷DataSet分组抽象Nestable DatasetDataSetWindow, DataSet windowed = dataset.apply(Win

10、dowInto, FIXED_WINDOW, getTime)无穷DataSet划分窗口total_visitors = .def uv_counting(total_visitors): return total_visitors .group_by(lambda (visitors, page): page) .apply_values(cnt_distinct).flatten()pv_result = total_visitors.apply(uv_counting)p.write(num, output.TextFile(/a/b/d) p.run()离线/实时def uv_coun

11、ting(total_visitors):return total_visitors .group_by(lambda (visitors, page): page) .apply_values(cnt_distinct).flatten()raw_words = p.read(input.Kafka(xxx) .flat_map(lambda line: line.split()num = words .window_into(time_extractor, FIXED_WINDOW) .apply_values(uv_counting) .flatten() p.write(num, .)

12、p.run()Agenda背景基本抽象/接口统一离线/实时计算优化应用实践优化-基本框架PlannerBigflowSpark PlannerDCEPlannerTMPlannerRuntimeCoreAPIC+JavaSpark RuntimeRuntimeTMRuntimeLocal RuntimePytho1nLogica2lPlan3D4CE优化-基本框架 ProcessNodeFlatMap ProcessNodeCount SinkNodeWrite Output ProcessNodeCount SinkNodeWrite OutputScope(Global)Scope(Key

13、) KeyReaderReadKeyScope(Load) LoadNodeRead inputScope(Global)Scope(Load)Scope(Key) ProcessNode SinkNodeCountWrite Output LoadNodeProcessNode KeyReaderProcessNode SinkNodeRead inputFlatMapRead KeyCountWrite OutputNode: “计算” (DAG)Scope: “分布式” (Tree)优化-示例(逻辑计划)dataset .map(fn) .group_by(_) .flatten().P

14、rocessNodeMap(IG)Scope KeyReaderReadKey .dataset .map(fn).ProcessNodeMap(IG) .dataset .group_by(_) .apply_values(map, fn) .flatten() .Scope KeyReaderReadKey ProcessNodeMap(IG) .优化-示例(物理计划) KeyReaderReadPageScope(page) .Scope(visitor)KeyReaderReadVisitor ProcessNodeTake(Partial) ProcessNodeTake Proce

15、ssNodeCount(Partial)ProcessNodeSum .dataset .group_by(lambda (visitors, page) : page) .apply_values(cnt_distinct) .flatten()dataset .group_by(lambda (visitors, page): page) .apply_values(lambda visitors: visitors .group_by(visitor) .apply_values(take, 1).flatten() .count().flatten()优化-示例(物理计划) KeyRe

16、aderReadPage KeyReaderReadVisitor . .Scope(visitor) ProcessNodeTake(Partial)ProcessNodeTakeProcessNodeCount(Partial)ProcessNodeSumMapperReducerScope(page) KeyReaderReadVisitor . .Scope(page) Scope(visitor) ProcessNodeTake(Partial)ProcessNodeTakeProcessNodeCount(Partial)ProcessNodeSumMapperReducer Ke

17、yReaderScope(page)ReadPageShuffleScope前移任意ShufflePartitionBy: page优化-示例(物理计划)KeyReaderReadVisitor .Scope(page) .ProcessNodeSumMapperReducer ProcessNodeScope(visitor)Take(Partial)ProcessNodeTakeProcessNodeCount(Partial) KeyReaderReadPageScope(page)Scope(visitor)ShuffleScope前移PartitionBy: page SortBy:

18、 page, visitor KeyReaderReadVisitor . .Scope(page) Scope(visitor) ProcessNodeTake(Partial) ProcessNodeTake ProcessNodeCount(Partial) ProcessNodeSumMapperReducer KeyReaderScope(page)ReadPagePartitionBy: page优化-示例(物理计划)KeyReaderReadVisitor .Scope(page) .ProcessNodeTake(Partial)ProcessNodeTakeProcessNo

19、deCount(Partial)ProcessNodeSumMapperReducerScope(visitor) KeyReaderReadPageScope(page)Scope(visitor)Partial算子 前移PartitionBy: page SortBy: page, visitorKeyReaderReadVisitor .Scope(page) .ProcessNodeSumMapperReducer ProcessNodeScope(visitor)Take(Partial)ProcessNodeTakeProcessNodeCount(Partial) KeyRead

20、erReadPageScope(page)Scope(visitor)PartitionBy: page SortBy: page, visitor优化-示例(物理计划)KeyReaderReadVisitor .MergeShuffle .ProcessNodeTake(Partial)ProcessNodeTakeProcessNodeCount(Partial)ProcessNodeSumMapperReducerMergeShuffle KeyReaderReadPageLocalShuffleLocalShuffleShuffle类型 判定PartitionBy: page Sort

21、By: page, visitorKeyReaderReadVisitor .Scope(page) .ProcessNodeTake(Partial)ProcessNodeTakeProcessNodeCount(Partial)ProcessNodeSumMapperReducerScope(visitor) KeyReaderReadPageScope(page)Scope(visitor)PartitionBy: page SortBy: page, visitor优化-示例(物理计划) .MergeShuffle .Take(Partial)ProcessNodeTakeProces

22、sNodeCount(Partial)ProcessNodeSumMapperReducerMergeShuffle KeyReaderReadPageKeyReaderReadVisitor ProcessNodeLocalShuffle(page+visitor) .MergeShuffleProcessNodeTakeProcessNodeCount(Partial)ProcessNodeSumReducerMergeShuffleLocalShuffle合并KeyReaderReadVisitor . ProcessNodeTake(Partial)Mapper KeyReaderRe

23、adPageLocalShuffleLocalShufflePartitionBy: page SortBy: page, visitorPartitionBy: userSortBy: user, page优化-示例(物理计划) .MergeShuffle . ProcessNodeTake ProcessNodeCount(Partial)ProcessNodeSumMapper KeyReaderReadPageKeyReaderReadVisitor ProcessNodeLocalShuffle(page+visitor)Take(Partial)PartitionBy: pageS

24、ortBy: page, visitorReducerMergeShuffledataset .group_by(lambda (visitors, page) : page) .apply_values(cnt_distinct) .flatten()dataset .group_by(lambda (visitors, page): page) .apply_values(lambda visitors: visitors .group_by(visitor) .apply_values(take, 1).flatten() .count().flatten()优化-示例(结合引擎)linksranksranksranksranksranksranksranksranks同数据Shuffle合并linksShuffle示例: PageRank一次写多次读Agenda背

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论