版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
《大数据技术实战案例教程》实验指导书实验9FlinkStandalone集群部署和基本编程编写者:徐鲁辉实验9FlinkStandalone集群部署和基本编程9.1实验目的1.理解Flink运行架构。2.理解Flink编程模型,掌握常用的DataStreamAPI和DataSetAPI。3.理解Flink部署要点包括运行环境、运行模式,以及配置文件flink-conf.yaml、masters和workers等。4.熟练掌握在Linux环境下部署FlinkStandalone集群,掌握FlinkWebUI、FlinkShell常用命令如“flinkrun”等的使用方法,掌握使用Python、Java或Scala语言进行DataSetAPI编程、DataStreamAPI编程,实现海量数据的批处理和流计算。9.2实验环境本实验所需的软件环境包括全分布模式Hadoop集群、Scala、Flink安装包、Maven安装包。9.3实验内容1.规划FlinkStandalone集群。2.部署FlinkStandalone集群。3.启动FlinkStandalone集群。4.验证FlinkStandalone集群。5.使用DataSetAPI采用Scala语言编写Flink批处理程序,实现对内容为英文字符的HDFS文件的数据读取,统计单词词频,并将处理结果输出到HDFS文件中。。6.使用DataStreamAPI采用Scala语言编写Flink流处理程序,采用Socket数据源,由Socket服务器端不断向客户端Flink流处理程序发送数据流(内容为英文字符),使统计单词词频,要求使用滚动窗口实现且窗口大小为5s(即每隔5秒对数据流进行一次切分),并将处理结果输出到终端上。7.关闭FlinkStandalone集群。9.4实验原理9.4.1初识FlinkApacheFlink是一个开源的、分布式的、高性能的和高可用的大数据计算引擎,具有十分强大的功能,它主要采用Java语言编写,实现了GoogleDataflow流计算模型,同时支持流计算和批处理。Flink起源于柏林工业大学、柏林洪堡大学和哈索·普拉特纳研究所联合开展的一个研究性项目StratoSphere,早期专注于批计算,后来转向了流计算。2014年4月StratoSphere代码被捐献给Apache,成为孵化项目,在孵化期间,为了避免与另一个项目重名,StratoSphere被重新命名为Flink,同年12月,Flink成为Apache顶级项目,开始在开源大数据行业内崭露头角。当前,大数据技术正处于快速发展之中,不断有新技术涌现。在Spark流行之前,Hadoop俨然已成为大数据技术的事实标准,在企业中得到了广泛应用,但其本身还存在诸多缺陷,最主要的缺陷是MapReduce计算模型延迟过高,无法胜任实时、快速计算的需求,因而只适用离线批处理的应用场景。Spark在设计上充分吸收和借鉴了MapReduce的精随并加以改进,同时,采用了先进的DAG执行引擎,以支持循环数据流与内存计算,因此,在性能上比MapReduce有了大幅度的提升,迅速获得了学界和业界的广泛关注。作为大数据计算平台的后起之秀,Spark在2014年打破了Hadoop保持的基准排序纪录,此后逐渐发展成为大数据领域最热门的大数据计算平台之一。但是,Spark的短板在于无法满足毫秒级别的企业实时数据分析需求,Spark流计算组件SparkStreaming的核心思路是将流数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用SparkCore进行快速处理。然而SparkStreaming在实现高吞吐和容错性的同时,却牺牲了低延迟和实时处理能力,最快只能满足秒级的实时计算需求,无法满足毫秒级的实时计算需求。由于SparkStreaming组件无法满足一些需要更快响应时间的企业应用需求,因此Spark社区又推出了StructuredStreaming,StructuredStreaming是一种基于SparkSQL引擎构建的、可扩展且容错的流处理引擎。StructuredStreaming包括微批处理和持续处理两种处理模型,采用微批处理时,最快响应时间需要100ms,无法支持毫秒级别响应,采用持续处理模型时,可以支持毫秒级别响应,但是,只能做到“至少一次”一致性,无法做到“精确一次”一致性。因此,市场需要一款能够实现毫秒级别响应并且支持“精确一次”一致性的、高吞吐的、高性能的流计算框架,而Flink是当前唯一能够满足上述要求的产品,它正在成为大数据计算领域中流处理的标准。9.4.2Flink技术栈Flink发展愈加成熟,目前已拥有丰富的核心组件栈。如图9-1所示,Flink技术栈分为4层:物理部署层、Runtime核心层、API、库。本地模式(本地模式(Local)单个JVM物理部署层集群模式(Cluster)Standalone、YARN等云模式(Cloud)GCE、EC2等Runtime分布式数据流DataStreamAPI流处理DataSetAPI批处理CEP事件处理TableAPI&SQL关系型FlinkML机器学习Gelly图计算TableAPI&SQL关系型Runtime核心层API库图9-1Flink核心组件栈(1)物理部署层Flink的底层是物理部署层,Flink可以采用本地模式(Local)运行,也可以采用集群模式(Cluster)运行,如Standalone集群模式、YARN集群模式等,或者也可以采用云模式(Cloud)运行,如GCE(谷歌云服务)和EC2(亚马逊云服务)。(2)Runtime核心层物理部署层的上层,Flink核心部分,该层主要负责对上层不同接口提供基础服务。(3)API该层提供了两套核心API:DataStreamAPI(流处理)和DataSetAPI(批处理)。(4)库在核心API基础上抽象出不同的应用类型的组件库,如CEP(基于流处理的事件处理库)、TableAPI&SQL(既可以基于流处理,也可以基于批处理的关系数据库)、FlinkML(基于批处理的机器学习库)、Gelly(基于批处理的图计算库)等。这里需要说明的是,Flink虽然也构建了一个大数据生态系统,功能涵盖流处理、批处理、SQL、图计算和机器学习等,但是,它的强项仍然是流计算,其图计算组件Gelly和机器学习组件FlinkML并不十分成熟。9.4.3Flink运行架构Flink遵从主从架构(Master/Slave)设计原则,其Master为JobManager,Slave为TaskManager。Flink运行架构如图9-2所示。JobManagerJobManagerTaskManagerTaskSlotFlinkClientTaskSlotTaskSlotTaskManager
TaskSlotTaskSlotTaskSlotDataFlow图9-2Flink运行架构Flink系统的工作原理为:在执行Flink程序时,FlinkClient将作业提交给JobManager,JobManager负责协调资源分配和作业执行,它首先要做的是分配所需资源,资源分配完成后,任务将提交给相应的TaskManager,在接收任务时,TaskManager启动一个线程以开始执行。执行到位时,TaskManager会继续向JobManager报告状态更改,可以有各种状态,例如开始执行、正在进行或已完成。作业执行完成后,结果将发送回客户端(JobClient)。9.4.4Flink编程模型Flink提供四种级别的抽象来开发流/批处理应用程序,如图9-3所示。StatefulStreamProcessingStatefulStreamProcessing低级APIDataStreamAPI/DataSetAPI核心APITableAPI声明式DSLSQL高层次语言图9-3FlinkAPI抽象级别(1)低级API。最低级的抽象接口是有状态数据流处理接口。这个接口通过过程函数(ProcessFunction)嵌入DataStreamAPI。该接口允许用户自由地处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户可以通过注册事件时间并处理回调函数的方法来实现复杂的计算。(2)核心API。实际上,大多数应用并不需要上述低级API,而是针对核心API如DataStreamAPI(有界/无界数据流)、DataSetAPI(有界数据集)的编程。核心API为数据处理提供了大量的通用模块,包括用户定义的各种各样的变换(Transformations)、连接(Joins)、聚合(Aggregations)、窗口(Windows)、状态(State)等。DataStreamAPI集成了底层处理函数,使得对一些特定的操作提供更低层次的抽象。DataSetAPI为有界数据集提供了一些补充的编程原语,例如循环、迭代等。(3)声明式DSL。TableAPI是一种以数据表为核心的声明式DSL,能够动态地修改表(当处理数据流时)。TableAPI是一种扩展的关系模型:Table有一个附加模式(类似于关系型数据库中的表结构),且API提供了类似操作如select、project、join、groupby、aggregate等。TableAPI程序定义的是应该执行什么样的逻辑操作,而不是直接准确地指定程序代码运行的具体步骤。尽管TableAPI可以通过自定义函数(UDF)进行扩展,但它在表达能力上仍然不如核心API,不过它用起来更加简洁(代码量更少)。此外,TableAPI程序在执行之前会经过内置优化器进行优化。用户可以在表和DataStream/DataSet之间无缝切换,以允许程序将TableAPI和DataStreamAPI/DataSetAPI混合使用。(4)高层次语言。Flink提供的最高级接口是SQL。这个层次的抽象接口在语言和表达能力上与TableAPI非常相似,唯一区别是通过SQL语言实现程序。SQL抽象接口和TableAPI交互密切,同时SQL查询可以直接在TableAPI定义的表上执行。接下来重点讲述核心API:DataStreamAPI和DataSetAPI。1.DataStreamAPIFlink定义了DataStreamAPI,可以让用户灵活且高效地编写Flink流式应用和批处理应用。DataStreamAPI主要分为三个部分:数据源模块(DataSource)、数据转换模块(Transformation)和数据输出模块(DataSink)。其中DataSource模块定义了数据接入功能,主要是将各种外部数据接入Flink系统中,并将接入数据转换成对应的DataStream数据集。Transformation模块定义了对DataStream数据集的各种转换操作,比如map、filter、windows等操作。DataSink模块负责把结果数据输出到外部存储介质中,比如文件或Kafka等。1)数据源模块(DataSource)DataSource模块定义了DataStreamAPI中的数据输入操作,FlinkDataStream的数据源分为两类:内置数据源和第三方数据源。(1)内置数据源①文件数据源基于文件创建DataStream有两种方式:readTextFile和readFile。readTextFile(path):逐行读取指定文件来创建DataStream,使用系统默认字符编码读取。readFile(fileInputFormat,path):根据指定的文件输入格式读取一次文件来创建DataStream。②Socket数据源支持从Socket端口中接入数据,在StreamExecutionEnvironment中调用socketTextStream()方法。socketTextStream():从Socket端口中接入数据,一般需要提供两个参数,即IP地址和端口号。③集合数据源Flink可以直接将Java或Scala程序中的集合类(Collection)转换为DataStream数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中。目前Flink支持将Java.util.Collection和Java.util.Iterator转换为DataStream数据集。需要注意的是,集合中数据结构的类型必须一致,否则可能会出现数据转换异常。例如:fromCollection(Seq):从Java.util.Collection创建DataStream,所有元素必须属于同一类型。fromCollection(Iterator):从迭代器创建DataStream。该类指定迭代器返回的元素的数据类型。fromElements(elements:_*):根据给定的对象序列创建DataStream。所有对象必须属于同一类型。fromParallelCollection(SplittableIterator):并行地从迭代器创建DataStream。该类指定迭代器返回的元素的数据类型。generateSequence(from,to):并行生成给定时间间隔内的数字序列。(2)第三方数据源在实际应用中,数据源的种类非常多,也比较复杂,内置的数据源很难满足需求,Flink提供了丰富的第三方数据连接器,可以访问外部数据源。①数据源连接器Flink通过实现SourceFunction定义了非常丰富的第三方数据连接器,基本覆盖了大部分的高性能存储介质以及中间件等,其中一部分连接器仅支持读取数据,如TwiterStreamingAPI、Netty等:另外一部分连接器仅支持数据输出(DataSink),不支持数据输入(DataSource),如ApacheCassandra、Elasticsearch、HadoopFileSystem等。还有一部分连接器既支持数据输入,又支持数据输出,如ApacheKafka、AmazonKinesis、RabbitMQ等连接器。②自定义数据源连接器Flink中已经实现了大多数主流的数据源连接器,但Flink整体架构非常开放,用户可以自定义连接器,以满足不同的数据源的接入需求。可以通过实现SourceFunction定义单个线程接入的数据接入器,也可以通过实现ParallelSourceFunction接口或继承RichParaleISourceFuntion类定义并发数据源接入器。DataSources定义完成后,可以通过使用StreamExecutionEnvironment的addSource()方法添加数据源,这样就可以将外部系统中的数掂转换成DataStream[T]数据集合,其中T类型是SourceFunction的返回值类型,然后就可以完成各种流式数据的转换操作了。2)DataStream数据转换模块(Transformation)数据处理的核心是对数据进行各种Transformation(转换操作),Flink流处理数据转换其实是将一个或多个DataStream转换为新的DataStream。数据转换模块可以将多个Transformation组合成一个复杂的拓扑结构。FlinkDataStream常用的Transformation算子表9-1所示。表9-1DataStream常用Transformation算子Transformation算子输入输出类型说明map(func)DataStream→DataStream将DataStream中的每个元素传递到函数func中,并将结果返回为一个新的DataStreamflatMap(func)DataStream→DataStream与map()类似,但每个输入元素可以映射到0到多个输出结果filter(func)DataStream→DataStream筛选出满足函数func的元素,并返回一个新的DataStreamkeyByDataStream→KeyedStream根据指定的Key将输入DataStream转换为KeyedStreamreduce(func)KeyedStream→DataStream将输入的KeyedStream通过传入的用户自定义函数func滚动地进行数据聚合处理聚合KeyedStream→DataStream根据指定的字段进行聚合操作3)数据输出模块(DataSink)Flink中将DataStream数据输出到外部存储系统的过程定义为DataSink操作。FlinkDataStream数据输出类型有两种:基本数据输出、第三方数据输出。基本数据输出包括writeAsText()/TextOutputFormat、writeAsCsv()/CsvOutputFormatSocket、print()/printToErr()、writeToSocket()等;第三方数据输出包括ApacheKafka、ApacheCassandra、ElasticSearch、HadoopFileSystem、RabbitMQ、NIFI等,通过addSink()方法完成。受篇幅所限,最完整权威的DataStreamAPI编程指南请读者参阅官网/flink/flink-docs-release-1.11/dev/datastream_api.html,请注意个人采用的Flink版本。2.DataSetAPI跟DataStream类似,Flink也定义了DataSetAPI,可以让用户灵活且高效地编写Flink批处理应用。DataSetAPI也可分为三个部分:DataSource模块、Transformation模块以及DataSink模块。其中DataSource模块定义了数据接入功能,主要是将各种外部数据接入Flink系统中,并将接入数据转换成对应的DataSet数据集。在Transformation模块定义了对DataSet数据集的各种转换操作,比如map、filter等操作。最后,将结果数据通过DataSink模块写到外部存储介质中,比如将数据输出到文件或HBase数据库等。受篇幅所限,最完整权威的DataSetAPI编程指南请读者参阅官网/flink/flink-docs-release-1.11/dev/batch/,请注意个人采用的Flink版本。值得注意的是,DataSetAPI会逐渐被弃用,官网建议用TableAPI替代DataSetAPI。9.4.5Flink应用程序编写步骤一个完整的Flink应用程序包括三部分:数据源(Source)、数据转换(Transformation)和数据输出(Sink),如图9-4所示。SourceSourceTransformationSink图9-4Flink应用程序结构1.Flink批处理应用程序编写步骤DataSet用于Flink批处理,用户可以使用DataSetAPI处理批量数据。编写Flink批处理应用程序一般包括4个步骤:(1)建立执行环境。(2)创建数据源。(3)对数据集指定转换操作。(4)输出结果。例如,下述Scala代码段就是一个用于单词词频统计的Flink批处理应用程序,数据源为HDFS文件。//第1步,建立执行环境valbenv=ExecutionEnvironment.getExecutionEnvironment//第2步,创建数据源vallines=bevn.readTextFile("hdfs://master:9000/InputData/file*.txt")//第3步,对数据集指定转换操作valwordCount=lines.flatMap(_.spllit("")).map((_,1)).groupBy(0).sum(1)//第4步,输出结果wordCount.print()上述代码段中,Flink使用readTextFile()方法直接读取文件数据源,它会逐行读取数据并将其转换成DataSet类型数据集。lines对象为org.apache.flink.api.scala.DataSet类型。该程序依赖FlinkScalaAPI,因此,需要首先在项目目录下新建文件pom.xml,用来声明Flink独立应用程序的信息以及与Flink的依赖关系,其次需要通过Maven进行编译打包。2.Flink流处理应用程序编写步骤DataStream用于Flink流计算,用户可以使用DataStreamAPI处理无界流数据。编写Flink流处理应用程序一般包括4个步骤:(1)建立执行环境。(2)创建数据源。(3)对数据集指定转换操作。(4)指定计算结果输出位置。(5)指定名称并触发流计算。例如,下述Scala代码段就是一个用于单词词频统计的Flink流处理应用程序,数据源为Socket流。//第1步,建立执行环境valsenv=StreamExecutionEnvironment.getExecutionEnvironment//第2步,创建数据源valsource=sevn.socketTextStream("localhost",9999)//第3步,对数据集指定转换操作valdataStream=source.flatMap(_.spllit("")).map((_,1)).KeyBy(0).timeWindow(Time.seconds(2),Time.seconds(2)).sum(1)//第4步,指定计算结果输出位置dataStream.print()//第5步,指定名称并触发流计算env.execute("FlinkStreamingWordCount")上述代码段中,Flink通过调用socketTextStream()方法从Socket端口中接入数据,在调用该方法时,一般需要提供两个参数,即IP地址和端口,本例中分别为主机名“localhost”、端口号9999。source对象为org.apache.flink.streaming.api.scala.DataStream类型。需要注意的是,一定不能忘记第5步显式调用execute()方法,否则前面第3步编写的转换操作并不会被真正执行。另外,与上文批处理应用程序相同,需要首先在项目目录下新建文件pom.xml,用来声明Flink独立应用程序的信息以及与Flink的依赖关系,其次需要通过Maven进行编译打包。由于篇幅所限,关于FlinkDataStreamAPI编程指南,读者请参考官网/flink/flink-docs-release-1.11/dev/datastream_api.html。9.4.6部署Flink要点1.Flink运行环境部署与运行Flink所需要的系统环境,主要包括操作系统、Java环境和SSH三部分。1)操作系统Flink支持Windows和类UNIX(例如Linux、MacOS)操作系统。编者采用的操作系统为Linux发行版CentOS7。2)Java环境Flink主要采用Java语言编写,因此Flink运行需要Java环境的支持。每个Flink版本对Java版本要求不同,例如Flink1.11.6要求Java8及以上。编者采用的Java为OracleJDK1.8。3)SSHFlink集群若想运行,其运行平台Linux必须安装SSH,且sshd服务必须运行,只有这样,才能使Flink集群中的主节点与集群中所有节点建立通信。本书选用的CentOS7自带有SSH。另外,若Flink采用HDFS存储数据,或采用YARN管理集群资源,则还需要安装Hadoop。2.Flink运行模式目前,Flink支持三种运行模式:本地模式、集群模式、云模式。其中,集群模式常用于企业的实际生产环境,根据集群资源管理器的类型主要有FlinkStandalone模式、FlinkonYARN模式、FlinkonMesos模式、FlinkonKubernetes模式等。Flink运行模式多样化,此处介绍常用的三种方式。(1)本地模式本地模式是最简单的Flink运行方式,开箱即用,只需提前安装好Java即可使用,会启动单个JVM,主要用于测试、调试代码。(2)FlinkStandalone模式Flink可以通过部署与YARN架构类似的框架来实现自己的集群模式,该集群模式的架构设计与HDFS和YARN大同小异,都是由一个主节点和多个从节点组成。在FlinkStandalone模式中,JobManager节点为主节点,TaskManager节点为从节点。(3)FlinkonYARN模式简而言之,FlinkonYARN模式是将Flink应用程序运行在YARN集群之上。不过FlinkonYARN的Job运行模式大致分为两类。在YARN中,初始化一个Flink集群,开辟指定的资源,之后提交的FlinkJob都在此Flinkyarn-session中,无论提交多少个Job都会共用初始化时在YARN中申请的资源。在这种模式下,除非手动停止Flink集群,否则Flink集群会常驻在YARN集群中。在YARN中,每次提交Job都会创建一个新的Flink集群,任务之间相互独立并且方便管理,任务执行完成之后创建的集群也会消失。3.Flink配置文件Flink配置文件数量不多,都存放在目录${FLINK_HOME}/conf下,具体的配置文件如图9-5所示。图9-5Flink配置文件列表Flink配置文件中最重要的是flink-conf.yaml、masters和workers。其中,配置文件masters用于指定Flink集群的主节点JobManager,workers用于指定Flink集群的从节点TaskManager,配置比较简单;配置文件flink-conf.yaml是Flink的核心配置文件,用于指定Flink运行时的各种参数,参数众多,主要包括基本配置、通用配置、安全配置等,flink-conf.yaml部分配置参数如表9-2所示,完整参数及其说明读者可以查阅/flink/flink-docs-release-1.11/ops/config.html。表9-2flink-conf.yaml配置参数(部分)参数名说明默认值或示例env.java.home配置Java安装路径/usr/java/jdk1.8.0_191jobmanager.rpc.addressJobManager的IP地址或主机名localhostjobmanager.memory.flink.sizeJobManager的Flink总内存(TotalFlinkMemory)1024mtaskmanager.memory.flink.sizeTaskManager的Flink总内存(TotalFlinkMemory)1024mtaskmanager.numberOfTaskSlots每个TaskManager提供的TaskSlot数量1parallelism.default应用程序并行度1io.tmp.dirsFlink临时数据保存目录/tmphigh-availability高可用模式,值必须为“zookeeper”zookeeperhigh-availability.storageDirJobManager的元数据持久化保存的位置hdfs:///flink/ha/high-availability.zookeeper.quorum配置独立的ZooKeeper集群master:2181,slave1:2181,slave2:2181rest.portWeb运行端口号8081historyserver.web.port基于Web的HistoryServer的端口号80829.4.7Flink接口1.FlinkWebUIFlinkWebUI主要面向管理员,从该页面上,管理员可以看到正在执行的和已完成的所有Flink应用程序信息,该页面只支持读,不支持写操作,主要包括概览(Overview)、作业(Jobs)、从节点(TaskManagers)、主节点(JobManager)、提交新作业(SubmitNewJob)五个页面,FlinkWebUI地址默认为http://JobManagerIP:8081。1)概览页面(Overview)概览页面显示槽(TaskSlot)数量、从节点TaskManager数量、正在运行作业数量、正在运行作业信息列表等,效果如图9-6所示。图9-6FlinkWebUI之概览页面2)作业(Jobs)页面作业(Jobs)页面分为正在运行作业、已完成作业两个页面,如图9-7所示,显示了运行完毕的Flink示例Java程序WordCount.jar的状态信息。图9-7FlinkWebUI之作业(Jobs)的已完成作业页面3)从节点(TaskManagers)页面从节点(TaskManagers)的概览页面如图9-8所示。从图9-8中可以看出,该Flink集群共计有2个TaskManager,每个TaskManager上均有两个TaskSlot,JVM堆内存(JVMHeap)大小为384MB,Flink管理内存(FlinkManagedMemory)为410MB。图9-8FlinkWebUI之TaskManagers的概览页面TaskManagers中节点slave1的日志(Logs)页面如图9-9所示。从图9-9中可以看出TaskManager各内存区域的分配大小,配置文件flink-conf.yam将TaskManager的Flink总内存(TotalFlinkMemory)参数“taskmanager.memory.flink.size”设置为1024MB,Flink总内存(TotalFlinkMemory)=JVM堆内存(JVMHeapMemory)+JVM堆外内存(JVMOff-HeapMemory),JVM进程总内存(TotalProcessMemory)=Flink总内存(TotalFlinkMemory)+JVM元空间(JVMMetaspace)+JVM运行时开销(JVMOverhead)。图9-9FlinkWebUI之TaskManagers中slave1的日志(Logs)页面——TaskManager各内存区域分配FlinkTaskManager内存分区图如图9-10所示,相比JobManager的内存分区而言,TaskManager的内存划分比较复杂。图9-10FlinkTaskManager内存分区图4)主节点(JobManager)页面主节点(JobManager)分为配置(Configuration)、日志(Logs)、标准输出(Stdout)和日志列表(Loglist)四个页面,其中配置(Configuration)标签页如图9-11所示,从图9-11可以看出上文flink-conf.yaml的配置结果。图9-11FlinkWebUI之JobManager的配置(Configuration)页面主节点(JobManager)的日志(Logs)页面显示主节点日志信息,如图9-12所示。从图9-12中可以看出JobManager各内存区域的分配大小,9.8.2节中配置文件flink-conf.yam将JobManager的Flink总内存(TotalFlinkMemory)参数“jobmanager.memory.flink.size”设置为2048MB,Flink总内存(TotalFlinkMemory)=JVM堆内存(JVMHeapMemory)+JVM堆外内存(JVMOff-HeapMemory),JVM进程总内存(TotalProcessMemory)=Flink总内存(TotalFlinkMemory)+JVM元空间(JVMMetaspace)+JVM运行时开销(JVMOverhead)。图9-12FlinkWebUI之JobManager的日志(Logs)页面——JobManager各内存区域分配FlinkJobManager内存分区图如图9-13所示,相比TaskManager的内存分区而言,JobManager的内存划分显得相当简单,有JVM进程总内存(TotalProcessMemory)、Flink总内存(TotalFlinkMemory)、堆内存(JVMHeapMemory)、堆外内存(JVMOff-HeapMemory)、JVM元空间(JVMMetaspace)、JVM运行时开销(JVMOverhead),不再区分框架区和用户区,也没有托管内存、网络缓存等其他区域。图9-13FlinkJobManager内存分区图2.FlinkShell通过FlinkShell,用户能够向Flink集群提交Flink应用程序、查看正在运行的Flink应用程序情况;还可以通过ScalaShell接口使用Scala语言进行交互式编程。FlinkShell命令位于$FLINK_HOME/bin目录下,如图9-14所示。图9-14FlinkShell命令几个常用的FlinkShell功能说明如表9-3所示。表9-3几个常用的FlinkShell功能说明FlinkShell命令功能start-cluster.sh启动Flink集群stop-cluster.sh关闭Flink集群start-scala-shell.sh启动ScalaShell,使用Scala语言进行交互式的Flink编程flink向Flink集群提交Flink应用程序,查看正在运行的Flink应用程序情况等表9-3中,“start-scala-shell.sh”脚本可以启动ScalaShell,用户可以使用Scala语言进行Flink交互式编程,即用户输入一条语句,ScalaShell会立即执行语句并返回结果,这就是“ScalaREPL”可以即时查看中间结果并对程序进行修改,提升程序开发效率,ScalaShell主要用于开发调试Flink程序。目前,Flink提供了三种ScalaShell模式:Local、RemoteCluster、YARNCluster,例如以Local模式启动ScalaShell的命令如下所示。[xuluhui@masterflink-1.11.6]$./bin/start-scala-shell.shlocalFlink也提供有PythonShell,即使用Python语言进行Flink交互式编程,读者可以首先通过命令“$python-mpipinstallapache-flink”安装PyFlink,然后通过命令“$pyflink-shell.shlocal”启动PythonShell。表9-3中,“flink”命令用于运行、查看、停止Flink应用程序等,读者可以通过“./bin/flink--help”命令查看其完整帮助文档,其语法格式如下所示:flink<ACTION>[OPTIONS][ARGUMENTS]“flink”命令的使用方法简述如下。(1)flinkrun[OPTIONS]<jar-file><arguments>:编译和运行Flink程序。(2)flinkinfo[OPTIONS]<jar-file><arguments>:显示Flink程序的优化执行计划。(3)flinklist[OPTIONS]:显示正在运行或调度的Flink程序情况。(4)flinkstop[OPTIONS]<JobID>:正常停止Flink程序,同时还会创建一个保存点以再次开始。更优雅地停止正在运行的Flink流作业的方式,仅适用于source实现了StoppableFunction接口的作业。(5)flinkcancel[OPTIONS]<JobID>:取消Flink程序,非优雅地停止作业,相应作业的状态将从“正在运行”转换为“已取消”,任何计算都将停止。(6)flinksavepoint[OPTIONS]<JobID>[<targetdirectory>]:为给定作业创建或释放保存点。例如,下述命令用于运行Flink样例程序WordCount.jar。[xuluhui@masterflink-1.11.6]$./bin/flinkrun./examples/batch/WordCount.jar3.FlinkAPIFlinkAPI面向Java、Scala、Python等编程语言,开发者可以通过这些接口编写Flink应用程序。具体的API接口请参考官方文档,以Flink1.11.6版本为例,各网址如表9-4所示。表9-4SparkAPI官方参考网址APIDocs网址FlinkJavaAPI(Javadocs)/projects/flink/flink-docs-release-1.11/api/javaFlinkScalaAPI(Scaladocs)/projects/flink/flink-docs-release-1.11/api/scala/FlinkPythonAPI(Pythondocs)/flink/flink-docs-release-1.11/api/python/9.5实验步骤9.5.1规划FlinkStandalone集群1.FlinkStandalone集群架构规划本实验选用FlinkStandalone集群模式,关于第三方集群资源管理器如YARN、Mesos、Kubernetes下的Flink集群部署读者可参考其它资料自行实践,本书不做叙述。受所用硬件资源限制,本实验的Flink集群欲使用三台安装有Linux操作系统的虚拟机器,机器名分别为master、slave1、slave2。编者做出如下规划:master机器充当主节点,部署主服务JobManager进程;slave1和slave2机器充当从节点,部署从服务TaskManager进程;受节点数量限制,编者同时将master机器作为向集群提交Flink应用程序的客户端使用。另外,本书的Flink集群直接使用HDFS作为分布式底层存储系统,所以也需要搭建好Hadoop集群,并启动HDFS相关进程。具体部署规划如表9-5所示。表9-5FlinkStandalone集群模式部署规划表主机名IP地址运行服务软硬件配置master30NameNode(HDFS主进程)SecondaryNameNode(与NameNode协同工作)JobManager(Flink主进程)内存:4GCPU:1个2核硬盘:40G操作系统:CentOS7.6.1810Java:OracleJDK8u191Hadoop:Hadoop2.9.2Flink:Flink1.11.6Maven:Maven3.6.3slave131DataNode(HDFS从进程)TaskManager(Flink从进程)内存:1GCPU:1个1核硬盘:20G操作系统:CentOS7.6.1810Java:OracleJDK8u191Hadoop:Hadoop2.9.2Flink:Flink1.11.6slave232DataNode(HDFS从进程)TaskManager(Flink从进程)内存:1GCPU:1个1核硬盘:20G操作系统:CentOS7.6.1810Java:OracleJDK8u191Hadoop:Hadoop2.9.2Flink:Flink1.11.62.软件选择本实验部署Flink集群所使用各种软件的名称、版本、发布日期及下载地址如表9-6所示。表9-6本实验部署Flink集群使用的软件名称、版本、发布日期及下载地址软件名称软件版本发布日期下载地址VMwareWorkstationProVMwareWorkstation14.5.7ProforWindows2017年6月22日/products/workstation-pro.htmlCentOSCentOS7.6.18102018年11月26日/download/JavaOracleJDK8u1912018年10月16日/technetwork/java/javase/downloads/index.htmlHadoopHadoop2.9.22018年11月19日/releases.htmlFlinkApacheFlink1.11.6forScala2.122021年12月16日/downloads.htmlMavenMaven3.6.32019年11月25日/download.cgi注意,本节采用的是Flink版本是1.11.6,三个节点的机器名分别为master、slave1、slave2,IP地址依次为30、31、32,后续内容均在表9-5规划基础上完成,读者务必与之对照确认自己的Flink版本、机器名等信息。9.5.2部署FlinkStandalone集群本节采用的Flink版本是1.11.6,因此本节的讲解都是针对这个版本进行的。尽管如此,由于Flink各个版本在部署和运行方式上的变化不大,因此本节的大部分内容都适用于Flink其它版本。1.初始软硬件环境准备(1)准备三台机器,安装操作系统,编者使用CentOS7.6.1810。(2)对集群内每一台机器,配置静态IP、修改机器名、添加集群级别域名映射、关闭防火墙。(3)对集群内每一台机器,安装和配置Java,要求Java8或更高版本,编者使用OracleJDK8u191。(4)安装和配置Linux集群中各节点间的SSH免密登录。(5)在Linux集群上部署全分布模式Hadoop集群(可选,若Flink底层存储采用HDFS,或资源管理器采用YARN,则需要部署Hadoop集群)。以上步骤已在本书实验1中详细介绍,此处不再赘述。2.获取FlinkFlink官方下载地址为/downloads.html,建议读者到官网下载稳定版,编者采用2021年12月16日发布的稳定版ApacheFlink1.11.6forScala2.12,安装包名称为“flink-1.11.6-bin-scala_2.12.tgz”,例如存放在master机器的/home/xuluhui/Downloads中。3.主节点上安装Flink并设置属主在master机器上,使用root用户解压flink-1.11.6-bin-scala_2.12.tgz到安装目录如/usr/local下,依次使用的命令如下所示。[xuluhui@master~]$suroot[root@masterxuluhui]#cd/usr/local[root@masterlocal]#tar-zxvf/home/xuluhui/Downloads/flink-1.11.6-bin-scala_2.12.tgz为了在普通用户下使用Flink集群,将Flink安装目录的属主设置为Linux普通用户例如xuluhui,使用以下命令完成。[root@masterlocal]#chown-Rxuluhui/usr/local/flink-1.11.64.主节点上配置FlinkFlink配置文件数量不多,都存放在目录${FLINK_HOME}/conf下,具体的配置文件如上文图8-10所示。这里编者仅修改flink-conf.yaml、masters和workers共3个配置文件,以部署FlinkStandalone集群。假设当前目录为“/usr/local/flink-1.11.6”,切换到普通用户如xuluhui下,在主节点master上配置Spark的具体过程如下所示。1)修改文件flink-conf.yaml使用命令“vimconf/flink-conf.yaml”编辑配置文件flink-conf.yaml,编者具体修改内容如下所示。#修改JobManager的IP地址或主机名,按上文规划,本案例将机器名为“master”节点作为Flink主节点,因此将原始值“localhost”修改为“master”jobmanager.rpc.address:master#注释掉以下2行#cess.size:1600m#cess.size:1600m#删除以下行的注释,并将值设置为1024mtaskmanager.memory.flink.size:1024m#增加1行,设置jobmanager.memory.flink.size值为2048mjobmanager.memory.flink.size:2048m#修改每个TaskManager提供的TaskSlot数量,本案例将原始值“1”修改“2”taskmanager.numberOfTaskSlots:2#设置并行度,本案例将原始值“1”修改“2”parallelism.default:2#设置Flink临时数据保存目录,本案例增加参数“io.tmp.dirs”,并将其值设置为“/usr/local/flink-1.11.6/tmp”io.tmp.dirs:/usr/local/flink-1.11.6/tmp读者需要注意的是,flink-conf.yaml文件中参数和值中间的冒号“:”后必须加上1个空格,否则出错。上述参数中,参数io.tmp.dirs用于指定Flink临时数据保存目录,默认为“/tmp”,由于该文件夹用以存放临时文件,系统定时会自动清理,因此本书将“io.tmp.dirs”设置为“/usr/local/flink-1.11.6/tmp”,其中目录tmp不会自动创建,需要手工创建。2)修改文件masters使用命令“vimconf/masters”编辑配置文件masters,按上文规划,指定Flink集群的主节点为机器“master”,Web端口号为“8081”。将原始内容“localhost:8081”替换为如下内容:master:80813)修改文件slaves使用命令“vimconf/workers”编辑配置文件workers,按上文规划,指定Flink集群的从节点为机器“slave1”、“slave2”,要求一行一个主机名,将原始内容“localhost”替换为如下内容:slave1slave25.创建tmp目录上一步修改文件flink-conf.yaml中,设置了Flink临时数据保存目录“/usr/local/flink-1.11.6/tmp”,Flink不会自动创建该目录,需要手工创建,使用以下命令完成。[xuluhui@masterflink-1.11.6]$mkdirtmp6.配置与Hadoop集成如果Flink要与Hadoop一起使用,例如在YARN上运行Flink、Flink连接到HDFS、Flink连接到HBase等,需要配置Flink与Hadoop的集成。Flink1.11版本前,用户可通过下载Flink对应Hadoop版本的shaded包例如“flink-shaded-hadoop-2-uber-2.8.3-10.0.jar”并将其放在${FLINK_HOME}/lib目录下解决。但是从Flink1.11开始,Flink项目不再支持flink-shaded-hadoop-2-uber发行版,官网建议通过“HADOOP_CLASSPATH”提供Hadoop依赖关系。编者采用下载flink-shaded-hadoop-2-uber-2.8.3-10.0.jar并放在${FLINK_HOME}/lib下解决此问题,下载链接为/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar。7.同步Flink安装文件至所有从节点并设置属主与Hadoop集群相同,由于Flink集群所有节点配置相同,因此将master机器上已经安装且配置好的Flink安装文件全部同步到所有从节点中。切换到root下,使用“scp”命令将master机器中目录“/usr/local/flink-1.11.6”及其子目录和文件全部拷贝至所有从节点slave1和slave2上,注意需要按照提示输入“root@slave1'spassword:”和“root@slave2'spassword:”的密码,依次使用的命令如下所示。[root@masterflink-1.11.6]#scp-r/usr/local/flink-1.11.6root@slave1:/usr/local/flink-1.11.6[root@masterflink-1.11.6]#scp-r/usr/local/flink-1.11.6root@slave2:/usr/local/flink-1.11.6然后,依次将所有从节点slave1、slave2上的Flink安装目录的属主也设置为Linux普通用户例如xuluhui,如在slave1机器上使用以下命令完成。[root@slave1xuluhui]#chown-Rxuluhui/usr/local/flink-1.11.6至此,Linux集群中各个节点的Flink均已安装和配置完毕。9.5.3启动FlinkStandalone集群在master机器上使用命令“start-cluster.sh”启动Flink集群,具体效果如图9-15所示。图9-15使用命令“start-cluster.sh”启动Flink集群过程应当注意的是,此命令只能在master机器上执行,不可以在其他slave机器和客户端机器上执行。9.5.4验证FlinkStandalone集群启动Flink集群后,可通过以下三种方法验证Flink集群是否成功部署。1.验证进程(方法1)在Flink集群启动后,若集群部署成功,通过“jps”命令在master机器上可以看到Flink主进程“StandaloneSessionClusterEntrypoint”,在slave1、slave2机器上可以看到Flink从进程“TaskManagerRunner”,具体效果如图9-16所示。图9-16验证Spark进程2.验证FlinkWebUI(方法2)在Flink集群启动后,若集群部署成功,可以通过浏览器输入地址http://MasterIP:8081即可看到FlinkWebUI,效果如前文图9-6所示。3.验证提交Flink样例程序(方法3)在Flink集群启动后,若集群部署成功,还可以通过FlinkShell命令“flinkrun”向Flink集群提交Flink应用程序。接下来以Flink样例程序WordCount为例,介绍Flink应用程序的提交、运行和查看结果详细过程,WordCount的jar包位于/usr/local/flink-1.11.6/examples/batch/WordCount.jar。(1)不指定输入、输出文件,使用的命令如下所示,运行过程及结果如图9-17所示。[xuluhui@masterflink-1.11.6]$./bin/flinkrun./examples/batch/WordCount.jar图9-17使用命令“flinkrun”向Flink集群提交Flink示例程序WordCount.jar(不指定输入、输出)(2)指定输入、输出文件,此处为HDFS文件,使用的命令如下所示,其中输入HDFS文件“/InputData/file1.txt”已存在,文件内容与【案例2-1】相同,且输出HDFS文件“/flinkWordCountOutput”不存在,运行过程及结果如图9-18所示。[xuluhui@masterflink-1.11.6]$./bin/flinkrun./examples/batch/WordCount.jar--inputhdfs://master:9000/InputData/file1.txt--outputhdfs://master:9000/flinkWordCountOutput图9-18使用命令“flinkrun”向Flink集群提交Flink示例程序WordCount.jar(指定HDFS输入、输出)接着,使用HDFSShell命令查看词频统计结果,依次使用的命令如图9-19所示,之所以指定的HDFS文件“/flinkWordCountOutput”实际生成为目录,且其下有2个结果文件“1”和“2”,是因为上文在配置Flink时已将并行度设置为2。图9-19HDFSShell命令查看Flink示例程序WordCount.jar运行结果上述Flink作业“81b253dc68a5df3c7a09f32b75bb3646”执行过程情况可以通过FlinkWebUI查看,如图9-20所示,从图9-22可以看出,其DataSource模块、Transformation模块、DataSink模块并行度为2,各占用2个TaskSlot,共计使用6个TaskSlot。图9-20Flink示例程序WordCount.jar(指定HDFS输入、输出)的WebUI运行概览页面当然,我们也可以通过选项参数“--parallelism”将并行度设置为1,使用的命令如下所示,此时输出HDFS“/flinkWordCountOutput2”不是目录而是文件,运行过程及结果如图9-21所示。[xuluhui@masterflink-1.11.6]$./bin/flinkrun--parallelism1./examples/batch/WordCount.jar--inputhdfs://master:9000/InputData/file1.txt--outputhdfs://master:9000/flinkWordCountOutput2图9-21Flink示例程序WordCount.jar(指定HDFS输入、输出)指定选项参数“—parallelism1”运行过程及结果上述Flink作业“1fec654bde93b05efd2790e1b311707d”执行过程情况可以通过FlinkWebUI查看,如图9-22所示,从图9-22可以看出,其DataSource模块、Transformation模块、DataSink模块并行度为1,各占用1个TaskSlot,共计使用3个TaskSlot。图9-22Flink示例程序WordCount.jar(指定HDFS输入、输出)指定选项参数“--parallelism1”的WebUI运行概览页面Flink示例程序WordCount.jar设置并行度为1和2运行完毕后FlinkWebUI效果对比如图9-23所示。图9-23Flink示例程序WordCount.jar设置并行度为1和2运行完毕后FlinkWebUI效果9.5.5开发Flink独立应用程序本节介绍如何开发Flink独立应用程序,借助文本编辑器vim使用Scala语言编写Flink程序,然后使用Maven编译打包程序,最后使用“flinkrun”命令运行Flink程序。CentOS7中没有自带安装Maven,需要手工安装Maven,步骤如下所示。(1)下载Maven读者可以访问Maven官网/maven下载安装文件,编者下载的是文件apache-maven-3.6.3-bin.tar.gz。(2)安装Maven将Maven安装到/usr/local目录下,依次使用的命令如下所示。[root@masterxuluhui]#cd/usr/local[root@masterlocal]#tar-zxvf/home/xuluhui/Downloads/apache-maven-3.6.3-bin.tar.gz[root@masterlocal]#chown-Rxuluhui/usr/local/apache-maven-3.6.3(3)修改配置文件settings.xml在使用Maven打包Scala程序时,默认从国外Maven中央仓库下载相关依赖文件,造成从国内下载速度很慢,为了提高下载速度,编者修改了Maven的配置文件settings.xml,使Maven到国内的阿里云仓库下载相关依赖文件,配置文件settings.xml完整内容如下所示。<settingsxmlns="/SETTINGS/1.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/SETTINGS/1.0.0/xsd/settings-1.0.0.xsd"><mirrors><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云公共仓库</name><url>/repository/public</url></mirror><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云谷歌仓库</name><url>/repository/google</url></mirror><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云阿帕奇仓库</name><url>/repository/apache-snapshots</url></mirror><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云spring仓库</name><url>/repository/spring</url></mirror><mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云spring插件仓库</name><url>/repository/spring-plugin</url></mirror></mirrors></settings>【案例9-1】使用DataSetAPI采用Scala语言编写Flink批处理程序,实现对内容为英文字符的HDFS文件的数据读取,统计单词词频,并将处理结果输出到HDFS文件中。(1)编写代码。首先,创建Flink应用程序目录/usr/local/flink-1.11.6/flinkBatchWordCount/src/main/scala,在Linux终端中使用如下命令完成。[xuluhui@masterflink-1.11.6]$mkdir-p./flinkBatchWordCount/src/main/scala其次,使用vim编辑器在./flinkBatchWordCount/src/main/scala目录下新建代码文件flinkBatchWordCount.scala,源代码如下所示。packagecom.xijing.flinkimportorg.apache.flink.api.scala._objectflinkBatchWordCount{defmain(args:Array[String]):Unit={//第1步:建立执行环境valenv=ExecutionEnvironment.getExecutionEnvironment//第2步:创建数据源valtext=env.readTextFile("hdfs://master:9000/InputData/file1.txt")//第3步:对数据集指定转换操作valcounts=text.flatMap{_.toLowerCase.split("")}.map{(_,1)}.groupBy(0).sum(1)//第4步:输出结果counts.writeAsText("hdfs://master:9000/flinkBatchWordCountOutput")//当DataSet数据输出为文件时,必须调用execute()方法,否则无法将数据集输出到文件env.execute()}}最后,编写pom.xml文件,用来声明该独立应用程序的信息以及与Flink的依赖关系。由于该程序依赖FlinkScalaAPI,因此需要通过Maven进行编译打包,在应用程序根目录./flinkBatchWordCount下新建文件pom.xml,具体内容如下所示。由于本案例使用HDFS文件作为数据源DataSource和数据输出DataSink,故需要在pom.xml中添加与访问HDFS相关的依赖包hadoop-common和hadoop-client。<project><groupId>com.xijing.flink</groupId><artifactId>flinkBatchWordCount</artifactId><modelVersion>4.0.0</modelVersion><name>flinkBatchWordCount</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>alimaven</id><name>aliyunmaven</name><url>/nexus/content/groups/public/</url></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId>
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年修订版租赁协议
- 2024年光纤接入系统施工合同
- 2024年信用借款分期还款合同
- 2024年二手车卖场租赁合同
- 2024年创意保护协议
- 2024年公共场所消防安全设施安装合同
- 2024年产业升级贷款合同范本
- 2024冷冻仓储服务合同
- 2024年创业项目投融资合同
- 2024年合肥工业大学食堂餐饮服务承包合同
- CA码生成原理及matlab程序实现
- 国家开放大学《电气传动与调速系统》章节测试参考答案
- 须弥(短篇小说)
- 旋风除尘器设计与计算
- 《装配基础知识培训》
- 出口退税的具体计算方法及出口报价技巧
- PCB镀层与SMT焊接
- Unit 1 This is my new friend. Lesson 5 课件
- 2019年青年英才培养计划项目申报表
- 芳香油的提取
- 企业人才测评发展中心建设方案
评论
0/150
提交评论