大数据导论-思维、技术与应用-第11章-分布式图计算框架SPARK-GRAPHX课件_第1页
大数据导论-思维、技术与应用-第11章-分布式图计算框架SPARK-GRAPHX课件_第2页
大数据导论-思维、技术与应用-第11章-分布式图计算框架SPARK-GRAPHX课件_第3页
大数据导论-思维、技术与应用-第11章-分布式图计算框架SPARK-GRAPHX课件_第4页
大数据导论-思维、技术与应用-第11章-分布式图计算框架SPARK-GRAPHX课件_第5页
已阅读5页,还剩91页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据导论第十一章大数据导论第十一章1CONTENTS目录PART01分布式图计算PART02SparkGraphx简介PART03Graphx实现PART04Graphx实例PART05SparkGraphx的优势PART06作业CONTENTS目录PART01分布式图计算PART2PART01分布式图计算PART01分布式图计算3分布式图计算数据并行与图并行计算1.数据并行系统,像MapReduce和Spark等计算框架,主要用于对数据集进行各种运算,在数据内部之间关联度不高的计算场景下能够进行很高效的处理。2.图并行计算对存在较高关联度的数据处理非常合适分布式图计算数据并行与图并行计算4分布式图计算图存储模式1.边分割:每个顶点都存储一次,但有的边会被打断分到两台机器上。2.点分割:每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上分布式图计算图存储模式5分布式图计算图计算模式1.图计算框架基本上都遵循BSP(BulkSynchronousParallell)计算模式。2.在BSP中,一次计算过程由一系列全局超步组成3.超步分为三个阶段:本地执行阶段全局通信阶段栅栏同步阶段分布式图计算图计算模式6分布式图计算Pregel模型:像顶点一样思考Pregel框架以顶点为中心,对边进行切割,将图数据分成若干个分区,每一个分区包含一组顶点以及由这组顶点为源顶点构成的边,并不断在顶点上进行算法迭代和数据同步分布式图计算Pregel模型:像顶点一样思考7分布式图计算Pregel计算过程读取输入数据,初始化图数据,在图数据上运行一系列的超步运算直至整个计算结束,输出结果。当一个节点结束计算之后,该节点停止运行,如果有新任务分配,则又重新开始运行,然后再次停止。当所有节点全部停止运行,并且没有新任务分配的时候,整个算法停止分布式图计算Pregel计算过程8分布式图计算GAS模型:邻居更新模型GAS模型是以节点为中心的图计算编程模型,某个顶点可能被部署到多台机器上,其中一台机器上的为主顶点(Master),其余机器上的为镜像顶点(Mirror),与主顶点的数据保持同步,将边唯一部署在某一台机器上分布式图计算GAS模型:邻居更新模型9分布式图计算GAS模型计算阶段划分收集阶段:工作顶点的边,可以是出边或入边,也可以同时包含入边和出边,从邻接顶点和自身收集数据,并对收集的数据使用用户定义的函数进行运算。这一阶段对顶点和边都是只读的。执行阶段:镜像顶点将收集阶段的计算结果发送给主顶点,主顶点对从各个镜像顶点收集的数据进行聚集运算,并利用聚集结果和上一步的顶点数据,按照用户定义的更新函数进行计算,更新主顶点的数据,并同步给镜像顶点。在执行阶段中,工作顶点可修改,边不可修改。分发阶段:工作顶点更新完成之后,更新边上的数据,通知对其有依赖的邻接边更新状态。在分发阶段,工作顶点只读,边上数据可写。分布式图计算GAS模型计算阶段划分10PART02SparkGraphX简介SparkGraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求PART02SparkGraphX简介SparkG11SparkGraphX简介GraphX的应用背景在社交网络中人与人之间存在有很多关系链,例如微博、微信、QQ、Twitter、Facebook、Linkedin等,这些都是大数据产生的地方,都需要图计算。因为图的结构复杂、数据量大,只有分布式的图处理才能胜任。由于SparkGraphX底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。SparkGraphX简介GraphX的应用背景12SparkGraphX简介GraphX的框架GraphX的核心抽象是ResilientDistributedPropertyGraph,一种点和边都带属性的有向多重图。它扩展了SparkRDD的抽象,具有Table和Graph两种视图,而只需要一份物理存储SparkGraphX简介GraphX的框架13SparkGraphX简介GraphX的框架SparkGraphX简介GraphX的框架14SparkGraphX简介GraphX的设计要点1.对GraphX视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成2.两种视图底层共用的物理数据,由RDD[VertexPartition]和RDD[EdgePartition]这两个RDD组成3.图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略SparkGraphX简介GraphX的设计要点15SparkGraphX简介GraphX的设计要点SparkGraphX简介GraphX的设计要点16PART03GraphX的实现GraphX公开一组基本的功能操作以及PregelAPI的一个优化。另外,GraphX包含了一个日益增长的图算法和图builders的集合,用以简化图分析任务。PART03GraphX的实现GraphX公开一组基本17GraphX的实现GraphX的存储模式Graphx使用点分割方式存储图,用三个RDD存储图数据信息:VertexTable(id,data)id为Vertexid,data为Edgedata;EdgeTable(pid,src,dst,data)pid为Partionid,src为源顶点id,dst为目的顶点id;RoutingTable(id,pid)id为Vertexid,pid为PartionidGraphX的实现GraphX的存储模式18GraphX的实现GraphX的计算模式GraphX的Graph类提供了丰富的图运算符GraphX的实现GraphX的计算模式GraphX的Gr19GraphX的实现1.图的缓存

varg=...varprevG:Graph[VD,ED]=nullwhile(...){prevG=gg=g.(………………)g.cache()prevG.unpersistVertices(blocking=false)

prevG.edges.unpersist(blocking=false)}GraphX的默认接口只提供unpersistVertices方法。如果要释放边,调用g.edges.unpersist()方法才行,根据GraphX中Graph的不变性,对g做操作并赋回给g之后,g已不是原来的g了,而且会在下一轮迭代使用,所以必须cacheGraphX的实现1.图的缓存varg=...Grap20GraphX的实现2.邻边聚合mrTriplets是GraphX中最核心的一个接口,它的计算过程为:Map:应用于每一个Triplet上,生成一个或者多个消息,消息以Triplet关联的两个顶点中的任意一个或两个为目标顶点;Reduce:应用于每一个Vertex上,将发送给每一个顶点的消息合并起来。MrTriplets:最后返回的是一个VertexRDD[A],包含每一个顶点聚合之后的消息(类型为A),没有接收到消息的顶点不会包含在返回的VertexRDD中。GraphX的实现2.邻边聚合21GraphX的实现2.邻边聚合defmapReduceTriplets[A](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],reduce:(A,A)=>A):VertexRDD[A]GraphX的实现2.邻边聚合defmapReduceTr22GraphX的实现3.进化的Pregel模式这种基于mrTrilets方法的Pregel模式,与标准Pregel的最大区别是,它的第2段参数体接收的是3个函数参数,而不接收messageList。它不会在单个顶点上进行消息遍历,而是将顶点的多个Ghost副本收到的消息聚合后,发送给Master副本,再使用vprog函数来更新点值。消息的接收和发送都被自动并行化处理,无需担心超级节点的问题GraphX的实现3.进化的Pregel模式23GraphX的实现3.进化的Pregel模式defpregel[A](initialMsg:A,maxIterations:Int,activeDirection:EdgeDirection)(vprog:(VertexID,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexID,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]GraphX的实现3.进化的Pregel模式defpreg24GraphX的实现//更新顶点

vprog(vid:Long,vert:Vertex,msg:Double):Vertex={v.score=msg+(1-ALPHA)*v.weight}//发送消息

sendMsg(edgeTriplet:EdgeTriplet[…]):Iterator[(Long,Double)](destId,ALPHA*edgeTriplet.srcAttr.score*edgeTriplet.attr.weight)}//合并消息

mergeMsg(v1:Double,v2:Double):Double={v1+v2}GraphX的实现//更新顶点25GraphX的实现4.图算法工具包GraphX提供了一套图算法工具包,方便用户对图进行分析。目前最新版本已支持PageRank、数三角形、最大连通图和最短路径等6种经典的图算法。这些算法的代码实现,目的和重点在于通用性。如果要获得最佳性能,可以参考其实现进行修改和扩展满足业务需求。另外,研读这些代码,也是理解GraphX编程最佳实践的好方法。GraphX的实现4.图算法工具包26PART04GraphX实例PART04GraphX实例27GraphX实例例子介绍有6个人,每个人有名字和年龄,这些人根据社会关系形成8条边,每条边有其属性。在以下例子演示中将构建顶点、边和图,打印图的属性、转换操作、结构操作、连接操作、聚合操作,并结合实际要求进行演示。GraphX实例例子介绍28GraphX实例——代码和操作解释

开始开始的第一步是引入Spark和GraphX到项目中:importorg.apache.log4j.{Level,Logger}importorg.apache.spark.{SparkContext,SparkConf}importorg.apache.spark.graphx._importorg.apache.spark.rdd.RDDGraphX实例——代码和操作解释开始importorg29GraphX实例——代码和操作解释

属性图属性图是一个有向多重图,属性图通过vertex(VD)和edge(ED)类型参数化。逻辑上的属性图对应于一对类型化的集合(RDD),这个集合编码了每一个顶点和边的属性。因此,图类包含访问图中顶点和边的成员。属性图类的定义如下:classGraph[VD,ED]{valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]}GraphX实例——代码和操作解释属性图classGra30

属性图在GraphX实例中,假设我们想构造一个包括用户社会关系的属性图。顶点属性可能包含用户名和年龄。我们可以用描述用户关系之间的密切关系的长度来标注边。所得的图形将具有类型签名:GraphX实例——代码和操作解释属性图GraphX实例——代码和操作解释31

属性图从RDD集合生成属性图//设置运行环境

valconf=newSparkConf().setAppName("SimpleGraphX").setMaster("local")valsc=newSparkContext(conf)//设置顶点和边,注意顶点和边都是用元组定义的Array//顶点的数据类型是VD:(String,Int)valvertexArray=Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50)))//边的数据类型ED:IntvaledgeArray=Array(Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(3L,6L,3),Edge(4L,1L,1),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3)

)//构造vertexRDD和edgeRDDvalvertexRDD:RDD[(Long,(String,Int))]=sc.parallelize(vertexArray)valedgeRDD:RDD[Edge[Int]]=sc.parallelize(edgeArray)//构造图Graph[VD,ED]valgraph:Graph[(String,Int),Int]=Graph(vertexRDD,edgeRDD)GraphX实例——代码和操作解释属性图//设置运行环境GraphX实例——代码和操作解释32

属性图分别用graph.vertices和graph.edges成员将一个图解构为相应的顶点和边。println("属性演示")println("找出图中年龄大于30的顶点:")graph.vertices.filter{case(id,(name,age))=>age>30}.collect.foreach{case(id,(name,age))=>println(s"$nameis$age")

}//边操作:找出图中属性大于5的边

println("找出图中属性大于5的边:")graph.edges.filter(e=>e.attr>5).collect.foreach(e=>println(s"${e.srcId}to${e.dstId}att${e.attr}"))printlnGraphX实例——代码和操作解释属性图println("属性演示")GraphX实例——代33

属性图除了属性图的顶点和边视图,GraphX也包含了一个三元组视图,三元视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD,ED]],它包含EdgeTriplet类的实例。//triplets操作,((srcId,srcAttr),(dstId,dstAttr),attr)println("列出边属性>5的tripltes:")for(triplet<-graph.triplets.filter(t=>t.attr>5).collect){println(s"${triplet.srcAttr._1}likes${triplet.dstAttr._1}")}printlnGraphX实例——代码和操作解释属性图//triplets操作,((srcId,sr34

属性操作属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。如RDD的map操作一样,属性图包含下面的操作:classGraph[VD,ED]{defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]}GraphX实例——代码和操作解释属性操作classGraph[VD,ED]{Grap35

结构性操作当前的GraphX仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表。classGraph[VD,ED]{defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean,vpred:(VertexId,VD)=>Boolean):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]}GraphX实例——代码和操作解释结构性操作classGraph[VD,ED]{Gra36

连接操作在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用join操作完成。下面列出的是主要的join操作。classGraph[VD,ED]{defjoinVertices[U](table:RDD[(VertexId,U)])(map:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](table:RDD[(VertexId,U)])(map:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]}GraphX实例——代码和操作解释连接操作classGraph[VD,ED]{Grap37

相邻聚合图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体)多次聚合相邻顶点的属性。

//***************************聚合操作

println("聚合操作")println("找出年纪最大的追求者:")valoldestFollower:VertexRDD[(String,Int)]=userGraph.mapReduceTriplets[(String,Int)](//将源顶点的属性发送给目标顶点,map过程

edge=>Iterator((edge.dstId,(edge.srcA,edge.srcAttr.age))),//得到最大追求者,reduce过程

(a,b)=>if(a._2>b._2)aelseb)userGraph.vertices.leftJoin(oldestFollower){(id,user,optOldestFollower)=>optOldestFollowermatch{caseNone=>s"${}doesnothaveanyfollowers."caseSome((name,age))=>s"${name}istheoldestfollowerof${}."}}.collect.foreach{case(id,str)=>println(str)}printlnGraphX实例——代码和操作解释相邻聚合//**********************38

实用操作下面的代码找出顶点5到其他各顶点最短的边:

//***************************实用操作

println("聚合操作")println("找出5到各顶点的最短:")valsourceId:VertexId=5L//定义源点

valinitialGraph=graph.mapVertices((id,_)=>if(id==sourceId)0.0elseDouble.PositiveInfinity)valsssp=initialGraph.pregel(Double.PositiveInfinity)((id,dist,newDist)=>math.min(dist,newDist),triplet=>{//计算权重

if(triplet.srcAttr+triplet.attr<triplet.dstAttr){Iterator((triplet.dstId,triplet.srcAttr+triplet.attr))}else{Iterator.empty}},(a,b)=>math.min(a,b)//最短距离

)println(sssp.vertices.collect.mkString("\n"))GraphX实例——代码和操作解释实用操作//*********************39

实用操作下面的代码找出顶点5到其他各顶点最短的边:

//***************************实用操作

println("聚合操作")println("找出5到各顶点的最短:")valsourceId:VertexId=5L//定义源点

valinitialGraph=graph.mapVertices((id,_)=>if(id==sourceId)0.0elseDouble.PositiveInfinity)valsssp=initialGraph.pregel(Double.PositiveInfinity)((id,dist,newDist)=>math.min(dist,newDist),triplet=>{//计算权重

if(triplet.srcAttr+triplet.attr<triplet.dstAttr){Iterator((triplet.dstId,triplet.srcAttr+triplet.attr))}else{Iterator.empty}},(a,b)=>math.min(a,b)//最短距离

)println(sssp.vertices.collect.mkString("\n"))GraphX实例——代码和操作解释实用操作//*********************40PART05SparkGraphX的优势

PART05SparkGraphX的优势41SparkGraphX的优势1.SparkGraphX能够把表格和图进行互相转换2.能够用更少的框架解决更多的问题解释:SparkGraphX基于Spark,可以和MLlib、SparkSQL等进行协作3.处理效率更高解释:由于基于spark,中间结果不必每次都写入磁盘,结构化数据不必重构,统一了表格和图视图,可以轻松做流水线操作SparkGraphX的优势1.SparkGraphX能42PART06作业

PART06作业43作业作业:1.什么是数据并行计算框架,什么是图并行计算框架?各自的主要应用场合是什么?2.图存储的模式有哪两种?各自的优缺点是什么?为什么点分割模式现在更流行?3.BSP(BulkSynchronousParallell)计算模式的三个阶段是什么?4.请描述Pregel计算模型?它的主要优缺点是什么?5.图11.17是一个有向图,每一个节点有一个数字,顶点的算法是当节点收到了一个比自己的数字更小的数字,该节点将数字修改成这个较小值,请使用Pregel计算模型画出每一个超步时候各节点的状态值。作业作业:1.什么是数据并行计算框架,什么是图并行计算框架?44作业作业:6.什么是GAS模型?其三个处理阶段是什么?7.什么是SparkGraphX?其主要应用背景是什么?8.什么是GraphX的核心抽象?它有哪两种视图?其主要优势是什么?9.GraphX的主要设计要点有哪三条?10.请画出图11.18图的GraphX的属性图,VertexTable和EdgeTable作业作业:6.什么是GAS模型?其三个处理阶段是什么?45作业作业:作业作业:46作业作业:11.GraphX的属性操作主要有哪些?12.GraphX的结构性操作主要有哪些?13.GraphX的连接操作主要有哪些?14.GraphX的相邻聚合操作的功能是什么?15.GraphX的主要优势有哪三点?16.请编写完成求单词计数的任务。SparkStreaming将监控某目录中的文件,获取在间隔时间段内变化的数据,然后通过SparkStreaming计算出改时间段内单词统计数。作业作业:11.GraphX的属性操作主要有哪些?47谢谢FORYOURLISTENING谢谢FORYOURLISTENING48

大数据导论第十一章大数据导论第十一章49CONTENTS目录PART01分布式图计算PART02SparkGraphx简介PART03Graphx实现PART04Graphx实例PART05SparkGraphx的优势PART06作业CONTENTS目录PART01分布式图计算PART50PART01分布式图计算PART01分布式图计算51分布式图计算数据并行与图并行计算1.数据并行系统,像MapReduce和Spark等计算框架,主要用于对数据集进行各种运算,在数据内部之间关联度不高的计算场景下能够进行很高效的处理。2.图并行计算对存在较高关联度的数据处理非常合适分布式图计算数据并行与图并行计算52分布式图计算图存储模式1.边分割:每个顶点都存储一次,但有的边会被打断分到两台机器上。2.点分割:每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上分布式图计算图存储模式53分布式图计算图计算模式1.图计算框架基本上都遵循BSP(BulkSynchronousParallell)计算模式。2.在BSP中,一次计算过程由一系列全局超步组成3.超步分为三个阶段:本地执行阶段全局通信阶段栅栏同步阶段分布式图计算图计算模式54分布式图计算Pregel模型:像顶点一样思考Pregel框架以顶点为中心,对边进行切割,将图数据分成若干个分区,每一个分区包含一组顶点以及由这组顶点为源顶点构成的边,并不断在顶点上进行算法迭代和数据同步分布式图计算Pregel模型:像顶点一样思考55分布式图计算Pregel计算过程读取输入数据,初始化图数据,在图数据上运行一系列的超步运算直至整个计算结束,输出结果。当一个节点结束计算之后,该节点停止运行,如果有新任务分配,则又重新开始运行,然后再次停止。当所有节点全部停止运行,并且没有新任务分配的时候,整个算法停止分布式图计算Pregel计算过程56分布式图计算GAS模型:邻居更新模型GAS模型是以节点为中心的图计算编程模型,某个顶点可能被部署到多台机器上,其中一台机器上的为主顶点(Master),其余机器上的为镜像顶点(Mirror),与主顶点的数据保持同步,将边唯一部署在某一台机器上分布式图计算GAS模型:邻居更新模型57分布式图计算GAS模型计算阶段划分收集阶段:工作顶点的边,可以是出边或入边,也可以同时包含入边和出边,从邻接顶点和自身收集数据,并对收集的数据使用用户定义的函数进行运算。这一阶段对顶点和边都是只读的。执行阶段:镜像顶点将收集阶段的计算结果发送给主顶点,主顶点对从各个镜像顶点收集的数据进行聚集运算,并利用聚集结果和上一步的顶点数据,按照用户定义的更新函数进行计算,更新主顶点的数据,并同步给镜像顶点。在执行阶段中,工作顶点可修改,边不可修改。分发阶段:工作顶点更新完成之后,更新边上的数据,通知对其有依赖的邻接边更新状态。在分发阶段,工作顶点只读,边上数据可写。分布式图计算GAS模型计算阶段划分58PART02SparkGraphX简介SparkGraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求PART02SparkGraphX简介SparkG59SparkGraphX简介GraphX的应用背景在社交网络中人与人之间存在有很多关系链,例如微博、微信、QQ、Twitter、Facebook、Linkedin等,这些都是大数据产生的地方,都需要图计算。因为图的结构复杂、数据量大,只有分布式的图处理才能胜任。由于SparkGraphX底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。SparkGraphX简介GraphX的应用背景60SparkGraphX简介GraphX的框架GraphX的核心抽象是ResilientDistributedPropertyGraph,一种点和边都带属性的有向多重图。它扩展了SparkRDD的抽象,具有Table和Graph两种视图,而只需要一份物理存储SparkGraphX简介GraphX的框架61SparkGraphX简介GraphX的框架SparkGraphX简介GraphX的框架62SparkGraphX简介GraphX的设计要点1.对GraphX视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成2.两种视图底层共用的物理数据,由RDD[VertexPartition]和RDD[EdgePartition]这两个RDD组成3.图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略SparkGraphX简介GraphX的设计要点63SparkGraphX简介GraphX的设计要点SparkGraphX简介GraphX的设计要点64PART03GraphX的实现GraphX公开一组基本的功能操作以及PregelAPI的一个优化。另外,GraphX包含了一个日益增长的图算法和图builders的集合,用以简化图分析任务。PART03GraphX的实现GraphX公开一组基本65GraphX的实现GraphX的存储模式Graphx使用点分割方式存储图,用三个RDD存储图数据信息:VertexTable(id,data)id为Vertexid,data为Edgedata;EdgeTable(pid,src,dst,data)pid为Partionid,src为源顶点id,dst为目的顶点id;RoutingTable(id,pid)id为Vertexid,pid为PartionidGraphX的实现GraphX的存储模式66GraphX的实现GraphX的计算模式GraphX的Graph类提供了丰富的图运算符GraphX的实现GraphX的计算模式GraphX的Gr67GraphX的实现1.图的缓存

varg=...varprevG:Graph[VD,ED]=nullwhile(...){prevG=gg=g.(………………)g.cache()prevG.unpersistVertices(blocking=false)

prevG.edges.unpersist(blocking=false)}GraphX的默认接口只提供unpersistVertices方法。如果要释放边,调用g.edges.unpersist()方法才行,根据GraphX中Graph的不变性,对g做操作并赋回给g之后,g已不是原来的g了,而且会在下一轮迭代使用,所以必须cacheGraphX的实现1.图的缓存varg=...Grap68GraphX的实现2.邻边聚合mrTriplets是GraphX中最核心的一个接口,它的计算过程为:Map:应用于每一个Triplet上,生成一个或者多个消息,消息以Triplet关联的两个顶点中的任意一个或两个为目标顶点;Reduce:应用于每一个Vertex上,将发送给每一个顶点的消息合并起来。MrTriplets:最后返回的是一个VertexRDD[A],包含每一个顶点聚合之后的消息(类型为A),没有接收到消息的顶点不会包含在返回的VertexRDD中。GraphX的实现2.邻边聚合69GraphX的实现2.邻边聚合defmapReduceTriplets[A](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],reduce:(A,A)=>A):VertexRDD[A]GraphX的实现2.邻边聚合defmapReduceTr70GraphX的实现3.进化的Pregel模式这种基于mrTrilets方法的Pregel模式,与标准Pregel的最大区别是,它的第2段参数体接收的是3个函数参数,而不接收messageList。它不会在单个顶点上进行消息遍历,而是将顶点的多个Ghost副本收到的消息聚合后,发送给Master副本,再使用vprog函数来更新点值。消息的接收和发送都被自动并行化处理,无需担心超级节点的问题GraphX的实现3.进化的Pregel模式71GraphX的实现3.进化的Pregel模式defpregel[A](initialMsg:A,maxIterations:Int,activeDirection:EdgeDirection)(vprog:(VertexID,VD,A)=>VD,sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexID,A)],mergeMsg:(A,A)=>A):Graph[VD,ED]GraphX的实现3.进化的Pregel模式defpreg72GraphX的实现//更新顶点

vprog(vid:Long,vert:Vertex,msg:Double):Vertex={v.score=msg+(1-ALPHA)*v.weight}//发送消息

sendMsg(edgeTriplet:EdgeTriplet[…]):Iterator[(Long,Double)](destId,ALPHA*edgeTriplet.srcAttr.score*edgeTriplet.attr.weight)}//合并消息

mergeMsg(v1:Double,v2:Double):Double={v1+v2}GraphX的实现//更新顶点73GraphX的实现4.图算法工具包GraphX提供了一套图算法工具包,方便用户对图进行分析。目前最新版本已支持PageRank、数三角形、最大连通图和最短路径等6种经典的图算法。这些算法的代码实现,目的和重点在于通用性。如果要获得最佳性能,可以参考其实现进行修改和扩展满足业务需求。另外,研读这些代码,也是理解GraphX编程最佳实践的好方法。GraphX的实现4.图算法工具包74PART04GraphX实例PART04GraphX实例75GraphX实例例子介绍有6个人,每个人有名字和年龄,这些人根据社会关系形成8条边,每条边有其属性。在以下例子演示中将构建顶点、边和图,打印图的属性、转换操作、结构操作、连接操作、聚合操作,并结合实际要求进行演示。GraphX实例例子介绍76GraphX实例——代码和操作解释

开始开始的第一步是引入Spark和GraphX到项目中:importorg.apache.log4j.{Level,Logger}importorg.apache.spark.{SparkContext,SparkConf}importorg.apache.spark.graphx._importorg.apache.spark.rdd.RDDGraphX实例——代码和操作解释开始importorg77GraphX实例——代码和操作解释

属性图属性图是一个有向多重图,属性图通过vertex(VD)和edge(ED)类型参数化。逻辑上的属性图对应于一对类型化的集合(RDD),这个集合编码了每一个顶点和边的属性。因此,图类包含访问图中顶点和边的成员。属性图类的定义如下:classGraph[VD,ED]{valvertices:VertexRDD[VD]valedges:EdgeRDD[ED]}GraphX实例——代码和操作解释属性图classGra78

属性图在GraphX实例中,假设我们想构造一个包括用户社会关系的属性图。顶点属性可能包含用户名和年龄。我们可以用描述用户关系之间的密切关系的长度来标注边。所得的图形将具有类型签名:GraphX实例——代码和操作解释属性图GraphX实例——代码和操作解释79

属性图从RDD集合生成属性图//设置运行环境

valconf=newSparkConf().setAppName("SimpleGraphX").setMaster("local")valsc=newSparkContext(conf)//设置顶点和边,注意顶点和边都是用元组定义的Array//顶点的数据类型是VD:(String,Int)valvertexArray=Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50)))//边的数据类型ED:IntvaledgeArray=Array(Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),Edge(3L,6L,3),Edge(4L,1L,1),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3)

)//构造vertexRDD和edgeRDDvalvertexRDD:RDD[(Long,(String,Int))]=sc.parallelize(vertexArray)valedgeRDD:RDD[Edge[Int]]=sc.parallelize(edgeArray)//构造图Graph[VD,ED]valgraph:Graph[(String,Int),Int]=Graph(vertexRDD,edgeRDD)GraphX实例——代码和操作解释属性图//设置运行环境GraphX实例——代码和操作解释80

属性图分别用graph.vertices和graph.edges成员将一个图解构为相应的顶点和边。println("属性演示")println("找出图中年龄大于30的顶点:")graph.vertices.filter{case(id,(name,age))=>age>30}.collect.foreach{case(id,(name,age))=>println(s"$nameis$age")

}//边操作:找出图中属性大于5的边

println("找出图中属性大于5的边:")graph.edges.filter(e=>e.attr>5).collect.foreach(e=>println(s"${e.srcId}to${e.dstId}att${e.attr}"))printlnGraphX实例——代码和操作解释属性图println("属性演示")GraphX实例——代81

属性图除了属性图的顶点和边视图,GraphX也包含了一个三元组视图,三元视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD,ED]],它包含EdgeTriplet类的实例。//triplets操作,((srcId,srcAttr),(dstId,dstAttr),attr)println("列出边属性>5的tripltes:")for(triplet<-graph.triplets.filter(t=>t.attr>5).collect){println(s"${triplet.srcAttr._1}likes${triplet.dstAttr._1}")}printlnGraphX实例——代码和操作解释属性图//triplets操作,((srcId,sr82

属性操作属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。如RDD的map操作一样,属性图包含下面的操作:classGraph[VD,ED]{defmapVertices[VD2](map:(VertexId,VD)=>VD2):Graph[VD2,ED]defmapEdges[ED2](map:Edge[ED]=>ED2):Graph[VD,ED2]defmapTriplets[ED2](map:EdgeTriplet[VD,ED]=>ED2):Graph[VD,ED2]}GraphX实例——代码和操作解释属性操作classGraph[VD,ED]{Grap83

结构性操作当前的GraphX仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表。classGraph[VD,ED]{defreverse:Graph[VD,ED]defsubgraph(epred:EdgeTriplet[VD,ED]=>Boolean,vpred:(VertexId,VD)=>Boolean):Graph[VD,ED]defmask[VD2,ED2](other:Graph[VD2,ED2]):Graph[VD,ED]defgroupEdges(merge:(ED,ED)=>ED):Graph[VD,ED]}GraphX实例——代码和操作解释结构性操作classGraph[VD,ED]{Gra84

连接操作在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用join操作完成。下面列出的是主要的join操作。classGraph[VD,ED]{defjoinVertices[U](table:RDD[(VertexId,U)])(map:(VertexId,VD,U)=>VD):Graph[VD,ED]defouterJoinVertices[U,VD2](table:RDD[(VertexId,U)])(map:(VertexId,VD,Option[U])=>VD2):Graph[VD2,ED]}GraphX实例——代码和操作解释连接操作classGraph[VD,ED]{Grap85

相邻聚合图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体)多次聚合相邻顶点的属性。

//***************************聚合操作

println("聚合操作")println("找出年纪最大的追求者:")valoldestFollower:VertexRDD[(String,Int)]=userGraph.mapReduceTriplets[(String,Int)](//将源顶点的属性发送给目标顶点,map过程

edge=>Iterator((edge.dstId,(edge.srcA,edge.srcAttr.age))),//得到最大追求者,reduce过程

(a,b)=>if(a._2>b._2)aelseb)userGraph.vertices.leftJoin(oldestFollower){(id,user,optOldestFollower)=>optOldestFollowermatch{caseNone=>s"${}doesnothaveanyfollowers."caseSome((name,age))=>s"${name}istheoldestfollowerof${}."}}.collect.foreach{case(id,str)=>println(str)}printlnGraphX实例——代码和操作解释相邻聚合//**********************86

实用操作下面的代码找出顶点5到其他各顶点最短的边:

//***************************实用操作

println("聚合操作")println("找出5到各顶点的最短:")valsourceId:VertexId=5L//定义源点

valinitialGraph=graph.mapVertices((id,

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论