Flink实时大数据处理技术 课件 第7、8章 处理函数与状态管理、Table API和SQL_第1页
Flink实时大数据处理技术 课件 第7、8章 处理函数与状态管理、Table API和SQL_第2页
Flink实时大数据处理技术 课件 第7、8章 处理函数与状态管理、Table API和SQL_第3页
Flink实时大数据处理技术 课件 第7、8章 处理函数与状态管理、Table API和SQL_第4页
Flink实时大数据处理技术 课件 第7、8章 处理函数与状态管理、Table API和SQL_第5页
已阅读5页,还剩99页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第六章时间与窗口Flink实时大数据处理技术什么是有状态的计算有状态计算的潜在场景数据去重:需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入数据去重检查输入流是否符合某个特定模式:之前流入的数据以状态的形式缓存下来对一个窗口内的数据进行聚合分析,比如分析一小时内某项指标75分位值或99分位值Flink分布式计算,一个算子有多个算子子任务状态可以被理解为某个算子子任务在当前实例上的一个变量,变量记录了数据流的历史信息,新数据流入,可以结合历史信息来进行计算接收输入流/获取对应状态/更新状态状态管理的难点要解决问题:实时性,延迟不能太高数据不丢不重、恰好计算一次,尤其发生故障恢复后程序的可靠性要高,保证7*24小时稳定运行难点不能将状态直接交由内存,因为内存空间有限用持久化的系统备份状态,出现故障时,如何从备份中恢复需要考虑扩展到多个节点时的伸缩性Flink解决了上述问题,提供有状态的计算APIManaged

State和Raw

State托管状态(ManagedState)是由Flink管理的,Flink帮忙存储、恢复和优化原生状态(RawState)是开发者自己管理的,需要自己序列化Managed

State又细分为Keyed

State和Operator

StateFlink的几种状态类型

ManagedStateRawState状态管理方式FlinkRuntime托管,自动存储、自动恢复、自动伸缩用户自己管理状态数据结构Flink提供的常用数据结构,如ListState、MapState等字节数组:byte[]使用场景绝大多数Flink函数用户自定义函数Keyed

State是KeyedStream上的状态,每个Key共享一个状态OperatorState每个算子子任务共享一个状态Keyed

State和Operator

StateKeyed

State相同Key的数据可以访问、更新这个状态Operator

State流入这个算子子任务的所有数据可以访问、更新这个状态Keyed

State和Operator

State都是基于本地的,每个算子子任务维护着自身的状态,不能访问其他算子子任务的状态具体的实现层面,Keyed

State需要重写Rich

Function函数类,Operator

State需要实现CheckpointedFunction等接口Keyed

State和Operator

State

KeyedStateOperatorState适用算子类型只适用于KeyedStream上的算子可以用于所有算子状态分配每个Key对应一个状态一个算子子任务对应一个状态创建和访问方式重写RichFunction,通过里面的RuntimeContext访问实现CheckpointedFunction等接口横向扩展状态随着Key自动在多个算子子任务上迁移有多种状态重新分配的方式支持的数据结构ValueState、ListState、MapState等ListState、BroadcastState等修改Flink应用的并行度:每个算子的并行算子子任务数发生了变化,整个应用需要关停和启动一些算子子任务某份在原来某个算子子任务的状态需要平滑更新到新的算子子任务上Flink的Checkpoint可以辅助状态数据在算子子任务之间迁移算子子任务生成快照(Snapshot)保存到分布式存储上子任务重启后,相应的状态在分布式存储上重建(Restore)Keyed

State与Operator

State的横向扩展方式稍有不同横向扩展问题Flink提供了封装好的数据结构供我们使用,包括ValueState、ListState等主要有:ValueState:单值MapState:Key-Value对ListState:列表ReducingState和AggregatingState:合并Keyed

State由于跟Key绑定,Key自动分布到不同算子子任务,Keyed

State也可以根据Key分发到不同算子子任务上Keyed

State实现RichFunction函数类,比如RichFlatMapFunction创建StateDescriptor,StateDescriptor描述状态的名字和状态的数据结构,每种类型的状态有对应的StateDescriptor通过StateDescriptor,从RuntimeContext中获取状态调用状态提供的方法,获取状态,更新数据Keyed

State//创建StateDescriptor

MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通过StateDescriptor获取运行时上下文中的状态

behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);MapState<UK,UV>:UVget(UKkey)voidput(UKkey,UVvalue)booleancontains(UKkey)…案例:统计电商用户行为UserBehavior场景下,某个用户(userId)下某种用户行为(behavior)的数量Keyed

State/**

*MapStateFunction继承并实现RichFlatMapFunction*两个泛型分别为输入数据类型和输出数据类型*/

publicstaticclass

MapStateFunction

extends

RichFlatMapFunction<UserBehavior,Tuple3<Long,String,Integer>>{//指向MapState的句柄

privateMapState<String,Integer>behaviorMapState;@Overridepublicvoidopen(Configurationconfiguration){//创建StateDescriptor

MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通过StateDescriptor获取运行时上下文中的状态

behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);}@OverridepublicvoidflatMap(UserBehaviorinput,Collector<Tuple3<Long,String,Integer>>out)throwsException{intbehaviorCnt=1;//behavior有可能为pv、cart、fav、buy等

//判断状态中是否有该behavior

if(behaviorMapState.contains(input.behavior)){behaviorCnt=behaviorMapState.get(input.behavior)+1;}//更新状态

behaviorMapState.put(input.behavior,behaviorCnt);out.collect(Tuple3.of(input.userId,input.behavior,behaviorCnt));}}使用MapState记录某个behavior下的数量<behavior,

behaviorCnt>UserBehavior案例先基于userId进行keyBy再使用有状态的MapStateFunction进行处理Keyed

Stateenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorStream=...//生成一个KeyedStream

KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上进行flatMap

DataStream<Tuple3<Long,String,Integer>>behaviorCountStream=keyedStream.flatMap(newMapStateFunction());状态:算子子任务的本地数据在Checkpoint过程时写入存储,这个过程被称为备份(Snapshot)初始化或重启一个Flink作业时,以一定逻辑从存储中读出并变为算子子任务的本地数据,这个过程被称为重建(Restore)Keyed

State开箱即用:数据划分基于Key,Snapshot和Restore过程可以基于Key在多个算子子任务之间做数据迁移Operator

State每个算子子任务管理自己的状态,流入到这个算子子任务上的所有数据可以访问和修改Operator

State故障重启后,数据流中某个元素不一定流入重启前的算子子任务上需要根据具体业务场景设计Snapshot和Restore的逻辑使用CheckpointedFunction接口类Operator

StateFlink定期执行Checkpoint,会将状态数据Snapshot到存储上每次执行Snapshot,会调用snapshotState()方法,因此我们要实现一些Snapshot逻辑,比如将哪些状态持久化initializeState()在算子子任务初始化状态时调用,有两种被调用的可能:整个Flink作业第一次执行,状态数据需要初始化一个默认值Flink作业遇到故障重启,基于之前已经持久化的状态恢复ListState

/

UnionListStateBroadcastStateOperator

Statepublic

interface

CheckpointedFunction{//Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化

void

snapshotState(FunctionSnapshotContextcontext)

throwsException;//初始化时会调用这个方法,向本地状态中填充数据

void

initializeState(FunctionInitializationContextcontext)

throwsException;}CheckpointedFunction源码状态以列表的形式序列化并存储单个状态为S,每个算子子任务有零到多个状态,共同组成一个列表ListState[S],Snapshot时将这些状态以列表形式写入存储包含所有状态的大列表,当作业重启时,将这个大列表重新分布到各个算子子任务上ListState:将大列表按照Round-Ribon模式均匀分布到各个算子子任务上,每个算子子任务得到的是大列表的子集UnionListState:将大列表广播给所有算子子任务应用场景:Source上保存流入数据的偏移量,Sink上对输出数据做缓存Operator

State

ListState、UnionListStateOperator

State使用方法重点实现snapshotState()和initializeState()两个方法在initializeState()方法里初始化并获取状态注册StateDescriptor,指定状态名字和数据类型从FunctionInitializationContext中获取OperatorStateStore,进而获取Operator

State在snapshotState()方法里实现一些业务逻辑基于ListState实现可缓存的Sink//重写CheckpointedFunction中的snapshotState()

//将本地缓存Snapshot到存储上

@OverridepublicvoidsnapshotState(FunctionSnapshotContextcontext)throwsException{//将之前的Checkpoint清理

checkpointedState.clear();for(Tuple2<String,Integer>element:bufferedElements){//将最新的数据写到状态中

checkpointedState.add(element);}}//重写CheckpointedFunction中的initializeState()

//初始化状态

@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{//注册ListStateDescriptor

ListStateDescriptor<Tuple2<String,Integer>>descriptor=newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){}));//从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState

checkpointedState=context.getOperatorStateStore().getListState(descriptor);//如果是作业重启,读取存储中的状态数据并填充到本地缓存中

if(context.isRestored()){for(Tuple2<String,Integer>element:checkpointedState.get()){bufferedElements.add(element);}}}Sink先将数据放在本地缓存中,并定期通过snapshotState()方法进行SnapshotinitializeState()初始化状态,需判断是新作业还是重启作业snapshotState()initializeState()Broadcast可以将部分数据同步到所有实例上使用场景:一个主数据流,一个控制规则流,主数据流比较大,只能分散在多个算子实例上,控制规则流数据比较小,可以广播分发到所有算子实例上。与Join的区别:控制规则流较小,可以放到每个算子实例里电商用户行为分析案例:识别用户行为模式,行为模式包括“反复犹豫下单类”、“频繁爬取数据类”等,控制流里包含了这些行为模式,使用Flink实时识别用户Broadcast

State主逻辑中读取两个数据流Broadcast

State支持Key-Value形式,需要使用MapStateDescriptor来构建再使用broadcast()方法将数据广播到所有算子子任务上,得到BroadcastStream主数据流先进行keyBy(),然后与广播流合并,在KeyedBroadcastProcessFunction中实现具体业务逻辑BroadcastPatternFunction是KeyedBroadcastProcessFunction的具体实现Broadcast

State//主数据流

DataStream<UserBehavior>userBehaviorStream=...//BehaviorPattern数据流

DataStream<BehaviorPattern>patternStream=...//BroadcastState只能使用Key->Value结构,基于MapStateDescriptor

MapStateDescriptor<Void,BehaviorPattern>broadcastStateDescriptor=newMapStateDescriptor<>("behaviorPattern",Types.VOID,Types.POJO(BehaviorPattern.class));BroadcastStream<BehaviorPattern>broadcastStream=patternStream.broadcast(broadcastStateDescriptor);//生成一个KeyedStream

KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上进行connect()和process()

DataStream<Tuple2<Long,BehaviorPattern>>matchedStream=keyedStream.connect(broadcastStream).process(newBroadcastPatternFunction());processElement()方法处理主数据流中的每条元素,输出零到多个数据processBroadcastElement()方法处理广播流,可以输出零到多个数据,一般用来更新BroadcastStateKeyedBroadcastProcessFunction属于ProcessFunction系列函数,可以注册Timer,并在onTimer方法中实现回调逻辑。KeyedBroadcastProcessFunctionFlink的状态是基于本地的,本地状态数据不可靠Checkpoint机制:Flink定期将状态数据保存到存储上,故障发生后将状态数据恢复。快照(Snapshot)、分布式快照(DistributedSnapshot)和检查点(Checkpoint)均指的是Flink将状态写入存储的过程一个简单的Checkpoint流程:暂停处理新流入数据,将新数据缓存下来将算子子任务的本地状态数据拷贝到一个远程的持久化存储上继续处理新流入的数据,包括刚才缓存起来的数据Checkpoint机制检查点分界线(CheckpointBarrier)被插入到数据流中,将数据流切分成段。Flink的算子接收到CheckpointBarrier后,对状态进行快照。每个CheckpointBarrier有一个ID,表示该段数据属于哪次Checkpoint。当ID为n的CheckpointBarrier到达每个算子后,表示要对n-1和n之间状态更新做快照。Checkpoint

Barrier构建并行度为2的数据流图Flink的检查点协调器(CheckpointCoordinator)触发一次Checkpoint(TriggerCheckpoint),这个请求会发送给Source的各个子任务。分布式快照流程各Source算子子任务接收到这个Checkpoint请求之后,会将自己的状态写入到状态后端,生成一次快照向下游广播CheckpointBarrier分布式快照流程Source算子做完快照后,还会给CheckpointCoodinator发送一个确认(ACK)ACK中包括了一些元数据,包括备份到State

Backend的状态句柄(指向状态的指针)Source算子完成了一次Checkpoint分布式快照流程对于下游算子来说,可能有多个与之相连的上游输入。一个输入被称为一条通道。Id为n的Checkpoint

Barrier会被广播到多个通道。不同通道的Checkpoint

Barrier传播速度不同。需要进行对齐(BarrierAlignment)对齐分四步:1算子子任务在某个输入通道中收到第一个ID为n的CheckpointBarrier,其他输入通道中ID为n的CheckpointBarrier还未到达。2算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。3第二个输入通道的CheckpointBarrier抵达该算子子任务,该算子子任务执行快照,将状态写入StateBackend,然后将ID为n的CheckpointBarrier向下游所有输出通道广播。4对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。Checkpoint

Barrier对齐每个算子都要执行一遍上述的对齐、快照、确认的工作最后的Sink算子发送确认后,说明ID为n的Checkpoint执行结束,CheckpointCoordinator向StateBackend写入一些本次Checkpoint的元数据Checkpoint完成CheckpointBarrier对齐时,必须等待所有上游通道都处理完。假如某个上游通道处理很慢,这可能造成整个数据流堵塞。一个算子子任务不需要等待所有上游通道的CheckpointBarrier,直接将CheckpointBarrier广播,算子子任务直接执行快照并继续处理后续流入数据。Flink必须将那些上下游正在传输的数据也作为状态保存到快照中。开启Unaligned

Checkpoint:Unaligned

Checkpoint优缺点:不需要对齐,Checkpoint速度快传输数据也要快照,状态数据大,磁盘负载加重,重启后状态恢复时间过长,运维管理难度大Unaligned

Checkpointenv.getCheckpointConfig().enableUnalignedCheckpoints();

每次执行数据快照时,不需要暂停新流入数据。Flink启动一个后台线程,它创建本地状态的一份复制,这个线程用来将本地状态的复制同步到StateBackend上,一旦数据同步完成,再给CheckpointCoordinator发送确认信息。该过程被称为异步快照(AsynchronousSnapshot)。利用写入时复制(Copy-on-Write):如果这份内存数据没有任何修改,那没必要生成一份复制,如果这份内存数据有一些更新,那再去申请额外的内存空间并维护两份数据,一份是快照时的数据,一份是更新后的数据。是否开启异步快照可配置。异步快照State

Backend用来持久化状态数据Flink内置三种State

Backend:MemoryStateBackendFsStateBackendRocksDBStateBackendState

Backend基于内存,数据存储在Java的堆区。进行分布式快照时,所有算子子任务会将自己内存上的状态同步到JobManager的堆上,因此一个作业的所有状态要小于JobManager的内存大小,否则将抛出OutOfMemoryError异常。只适合调试或者实验,不建议在生产环境下使用。如果不做其他声明,默认情况是使用这种模式作为StateBackend。设置使用内存作为State

Backend,MAX_MEM_STATE_SIZE为设置的状态的最大值:MemoryStateBackendenv.setStateBackend(newMemoryStateBackend(MAX_MEM_STATE_SIZE));基于文件系统,数据最终持久化到文件系统上文件系统包括本地磁盘、HDFS、Amazon、阿里云等在内的云存储服务,使用时需要提供文件系统的地址,写明前缀:file://、hdfs://或s3://默认开启异步快照本地的状态在TaskManager的堆内存上,执行快照时状态数据会写到文件系统上FsStateBackend//使用HDFS作为StateBackend

env.setStateBackend(newFsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));//使用阿里云OSS作为StateBackend

env.setStateBackend(newFsStateBackend("oss://<your-bucket>/<object-name>"));//使用Amazon作为StateBackend

env.setStateBackend(newFsStateBackend("s3://<your-bucket>/<endpoint>"));//关闭AsynchronousSnapshot

env.setStateBackend(newFsStateBackend(checkpointPath,false));本地状态存储在本地RocksDB上,Checkpoint时将RocksDB数据再写到远程的存储上,因此需要配置一个分布式存储的地址。本地状态基于RocksDB,可以突破内存空间的限制,可存储的状态量更大。但RocksDB需要序列化和反序列化,读写时间成本高。支持增量快照(IncrementalCheckpoint):只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。非常适合超大规模的状态。但重启恢复的时间更长。需要手动开启:RocksDBStateBackend//开启IncrementalCheckpoint

booleanenableIncrementalCheckpointing=true;env.setStateBackend(newRocksDBStateBackend(checkpointPath,enableIncrementalCheckpointing));默认情况下,Checkpoint机制是关闭的,开启:n表示每隔n毫秒进行一次CheckpointCheckpoint耗时可能比较长,n设置过小,有可能出现一次Checkpoint还没完成,下次Checkpoint已经被触发,n设置过大,如果重启,整个作业需要从更长的Offset开始重新处理数据开启Checkpoint,使用Checkpoint

Barrier对齐功能,可以提供Exactly-Once语义At-Least-Once语义:不使用Checkpoint

Barrier对齐功能,但某些数据可能被处理多次一些其他Checkpoint设置,在CheckpointConfig中设置:Checkpoint相关配置env.enableCheckpointing(n)//使用At-Least-OncecheckpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);CheckpointConfigcheckpointCfg=env.getCheckpointConfig();重启恢复流程:重启应用,在集群上重新部署数据流图。从持久化存储上读取最近一次的Checkpoint数据,加载到各算子子任务上。继续处理新流入的数据。作业故障重启,会暂停一段时间,这段时间上游数据仍然会继续发送过来,作业重启后,需要消化这些未处理的数据。重启恢复流程由于异常导致故障,异常根源不消除,重启后仍然出现故障,因此要避免无限次重启。固定延迟(FixedDelay):作业每次失败后,按照设定的时间间隔进行重启尝试,重启次数不会超过某个设定值失败率(FailureRate):计算一个时间段内作业失败的次数,如果失败次数小于设定值,继续重启,否则不重启不重启(NoRestart)在conf/flink-conf.yaml设置或者在代码中设置三种重启策略Checkpoint和Savepoint生成的数据近乎一样Checkpoint目的是为了故障重启,使得重启前后作业状态一致Savepoint目的是手动备份数据,以便进行调试、迁移、迭代等状态数据从零积累成本很高迭代:在初版代码的基础上,保留状态到Savepoint中,方便修改业务逻辑迁移:把程序迁移到新的机房、集群等有计划地备份、停机,手动管理和删除状态数据场景:同一个作业不断调整并行度,以找到最优方案进行A/B实验,使用相同的状态数据测试不同的程序版本Savepoint与Checkpoint的区别每个算子应该分配一个唯一ID,Savepoint中的状态数据以算子ID来存储和区分不设置ID,Flink自动为其分配一个ID算子IDDataStream<X>stream=env.//一个带有OperatorState的Source,例如KafkaSource

.addSource(newStatefulSource()).uid(“source-id”)

//算子ID

.keyBy(...)//一个带有KeyedState的StatefulMap

.map(newStatefulMapper()).uid(“mapper-id”)

//算子ID

//print是一种无状态的Sink

.print();//Flink为其自动分配一个算子ID对某个作业的状态进行备份,将Savepoint目录保存到某个目录下:从某个Savepoint目录中恢复一个作业:备份和恢复$

./bin/flinksavepoint<jobId>[savepointDirectory]$

./bin/flinkrun-s<savepointPath>[OPTIONS]<xxx.jar>StateProcessorAPI:基于DataSet

API,读写和修改Savepoint数据Savepoint以一定的Schema存储,像读写数据库一样读写SavepointReaderFunction是一个KeyedStateReaderFunction的实现,需要实现open()和readKey()方法:open()方法中注册StateDescriptorreadKey()方法中逐Key读取数据,输出到Collector中从Savepoint中读数据Savepoint中的数据存储形式DataSet<Integer>listState=savepoint.readListState<>("source-id","os1",Types.INT);//ReaderFunction需要继承并实现KeyedStateReaderFunction

DataSet<KeyedState>keyedState=savepoint.readKeyedState("mapper-id",newReaderFunction());向Savepoint中写入状态,适合作业冷启动构建BootstrapTransformation操作,是一个状态写入的过程,可以理解为流处理时使用的有状态的算子withOperator()向Savepoint中添加算子,参数分别为:算子ID一个BootstrapTransformationKeyed

State和Operator

State的BootstrapTransformation实现不同向Savepoint写数据ExecutionEnvironmentbEnv=ExecutionEnvironment.getExecutionEnvironment();//最大并行度

intmaxParallelism=128;StateBackendbackend=...//准备好写入状态的数据

DataSet<Account>accountDataSet=bEnv.fromCollection(accounts);//构建一个BootstrapTransformation,将accountDataSet写入

BootstrapTransformation<Account>transformation=OperatorTransformation.bootstrapWith(accountDataSet).keyBy(acc->acc.id).transform(newAccountBootstrapper());//创建算子,算子ID为accountsSavepoint.create(backend,maxParallelism).withOperator("accounts",transformation).write(savepointPath);bEnv.execute("bootstrap");parquet应用场景Operator

State要实现StateBootstrapFunction实现processElement()方法,每来一个输入,processElement()方法会被调用一次,用于将数据写入Savepoint。Operator

State:StateBootstrapFunction/**

*继承并实现StateBootstrapFunction

*泛型参数为输入类型*/

public

class

SimpleBootstrapFunction

extends

StateBootstrapFunction<Integer>{privateListState<Integer>state;//每个输入都会调用一次processElement,这里将输入加入到状态中

@Overridepublic

void

processElement(Integervalue,Contextctx)

throwsException{state.add(value);}@Overridepublic

void

snapshotState(FunctionSnapshotContextcontext)

throwsException{}//获取状态句柄

@Overridepublic

void

initializeState(FunctionInitializationContextcontext)

throwsException{state=context.getOperatorState().getListState(newListStateDescriptor<>("state",Types.INT));}}KeyedState要实现KeyedStateBootstrapFunction实现processElement()方法,每来一个输入,processElement()方法会被调用一次,用于将数据写入Savepoint。KeyedState:KeyedStateBootstrapFunction/**

*AccountBootstrapper继承并实现了KeyedStateBootstrapFunction

*第一个泛型Integer为Key类型*第二个泛型Account为输入类型*/

public

class

AccountBootstrapper

extends

KeyedStateBootstrapFunction<Integer,Account>{ValueState<Double>state;//获取状态句柄

@Overridepublic

void

open(Configurationparameters)

{ValueStateDescriptor<Double>descriptor=new

ValueStateDescriptor<>("total",Types.DOUBLE);state=getRuntimeContext().getState(descriptor);}//每个输入都会调用一次processElement()@Overridepublic

void

processElement(Accountvalue,Contextctx)

throwsException{state.update(value.amount);}}从已有的Savepoint上修改,保存。removeOperator()将一个算子状态数据从Savepoint中删除。withOperator()方法增加了一个算子。write()方法将数据写入一个路径下。修改SavepointBootstrapTransformation<Integer>transformation=OperatorTransformation.bootstrapWith(data).transform(newModifyProcessFunction());Savepoint.load(bEnv,savepointPath,backend)//删除名为currency的算子

.removeOperator("currency")//增加名为numbers的算子,使用transformation构建其状态数据

.withOperator("number",transformation)//新的Savepoint会写到modifyPath路径下

.write(modifyPath);第六章时间与窗口Flink实时大数据处理技术Table

API

&

SQL与关系型数据库中的查询相似基于数据表Table使用执行计划器(Planner)将关系型查询转换为可执行的Flink作业Blink

Planner和Flink

Planner,Blink

Planner将逐渐取代Flink

PlannerTable

API

&

SQL迭代速度较快,最好参考最新的官方文档创建执行环境(ExecutionEnvironment)和表环境(TableEnvironment)获取表使用TableAPI或SQL在表上做查询等操作将结果输出到外部系统执行作业Table

API

&

SQL骨架程序//基于StreamExecutionEnvironment创建TableEnvironment

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);//读取数据源,创建数据表Table

user_behavior//注册输出数据表Table

output_table//使用TableAPI查询user_behavior

TabletabApiResult=tableEnv.from("user_behavior").select(...);//使用SQL查询TablesqlResult=tableEnv.sqlQuery("SELECT...FROMuser_behavior...");//将查询结果输出到outputTable

tabApiResult.insertInto("output_table");sqlResult.insertInto("output_table");Table

API

&

SQL程序主要步骤:注意添加Maven依赖TableEnvironment是整个程序的入口,功能包括:连接外部系统向目录(Catalog)中注册表或者从中获取表执行TableAPI或SQL操作注册用户自定义函数提供一些其他配置功能TableEnvironment

是最顶级的接口StreamTableEnvironment用于流处理,有DataStream和Table之间的转换接口BatchTableEnvironment用于批处理,有DataSet和Table之间的转换接口创建TableEnvironment共5个TableEnvironment,分别面向不同的场景和编程语言用Table来表示广义的表:需要连接外部系统,需要定义Schema,将外部系统数据转化为Table。临时表(TemporaryTable):Flink作业启动后临时创建的表,随着这个Flink作业的结束,临时表也被销毁常驻表(PermanentTable):为整个集群上所有用户和作业提供服务,基于Catalog,作业结束后,Table元数据不会被销毁。Catalog:维护着常驻表的名字、类型(文件、消息队列或数据库)、数据存储位置等元数据数据管理团队在Catalog中创建常驻表,注册好该表的Schema、注明该表使用何种底层技术、写明数据存储位置等;数据分析团队无需关心元数据,无需了解这个表到底是存储在Kafka还是HDFS,直接在这个表上进行查询。获取表调用TableAPI或SQL进行查询可以在Table上使用TableAPI可以在Table上执行SQL语句可以使用TableAPI生成一个表,在此之上进行SQL查询;也可以先进行SQL查询得到一个表,在此之上再调用TableAPI

在表上执行语句StreamTableEnvironmenttEnv=...//创建一个TemporaryTable:user_behavior

TableuserBehaviorTable=tEnv.from("user_behavior");//在Table上使用TableAPI执行关系型操作

TablegroupByUserId=userBehaviorTable.groupBy("user_id").select("user_id,COUNT(behavior)ascnt");//在Table上使用SQL执行关系型操作

TablegroupByUserId=tEnv.sqlQuery("SELECTuser_id,COUNT(behavior)FROMuser_behaviorGROUPBYuser_id");Table

APISQL通过TableSink输出到外部系统与DataStream

Sink相似将表结果输出StreamTableEnvironmenttEnv=...//获取名为CsvSinkTable的Table

//执行查询操作,得到一个名为result的Table

Tableresult=...//将result发送到名为CsvSinkTable的TableSink

result.executeInsert("CsvSinkTable");TableAPI或者SQL经过Planner转化为JobGraph,Planner在中间起到一个转换和优化的作用未经优化的逻辑执行计划(Logical

Plan)、优化器(Optimizer)对Logical

Plan进行优化,得到物理执行计划(Physical

Plan),Physical

Plan最后转换为Flink的JobGraph可以使用Table.explain()来查看语法树、逻辑执行计划和物理执行计划执行作业需要配置外部系统的必要参数、序列化方式、Schema:两种方式:在程序中使用代码编辑配置connect()或将DataStream/DataSet转化为表使用声明式语言,如SQL

DDL或YAMLYAML只能和SQL

Client配合熟悉SQL

DDL的用户多,未来将主要推广SQL

DDL获取表的具体方式流处理上的关系型查询借鉴了物化视图的实现思路批处理关系型查询与流处理

批处理关系型查询流处理输入数据数据是有界的,在有限的数据上进行查询数据流是无界的,在源源不断的数据流上进行查询执行过程一次查询是在一个批次的数据上进行查询,所查询的数据是静态确定的一次查询启动后需要等待数据不断流入,所查询的数据在未来源源不断地到达查询结果一次查询完成后即结束。结果是确定的一次查询会根据新流入数据不断更新结果动态表(DynamicTable)用来表示不断流入的数据表,表中的数据不断更新。在动态表上进行查询,被称为持续查询。一个持续查询的结果也是动态表。动态表上的持续查询电商平台用户行为分析左侧为数据流右侧为转化后的动态表动态表上的持续查询按user_id字段分组,统计每个user_id所产生的行为总数新数据的插入会导致统计结果的更新动态表上的持续查询SQL

1SELECT

user_id,COUNT(behavior)ASbehavior_cntFROMuser_behaviorGROUP

BYuser_id按照user_id字段分组,统计每分钟每个user_id所产生的行为总数数据按照滚动时间窗口来分组动态表上的持续查询SQL

2SELECT

user_id,COUNT(behavior)ASbehavior_cnt,TUMBLE_END(ts,INTERVAL

'1'

MINUTE)ASend_tsFROMuser_behaviorGROUP

BY

user_id,

TUMBLE(ts,INTERVAL

'1'

MINUTE)两种生成结果的方式:SQL

2只追加结果,或者说只在结果表上进行插入操作。SQL

1追加结果的同时,也对结果不断更新,或者说既进行插入操作又进行更新操作或删除操作。动态表上的持续查询两种输出方式:追加(Append-only)模式:在结果末尾追加。更新(Update)模式:既在结果末尾追加,又对已有数据更新。对数据更新又分为两种:先将旧数据撤回,再添加新数据,被称为撤回(Retract)模式直接在旧数据上做更新,被称为插入更新(Upsert)模式动态表的两种输出方式结果共有3列(flag,user_id,behavior_cnt)其中第一列为标志位,表示本行数据是加入还是撤回,后两列是查询结果。Retract模式//将table转换为DataStream

//Retract模式,Boolean为标志位

DataStream<Tuple2<Boolean,Row>>retractStream=tableEnv.toRetractStream(table,Row.class);输出结果需有一个唯一ID,可以根据唯一ID更新结果例如user_id一般不重复,可以被用来作为唯一IDUpsert模式要和特定的TableSink紧密结合Key-Value数据更适合进行Upsert操作Upsert模式Flink通过状态保存中间数据,状态不能无限增加,否则会突破存储限制。空闲状态数据是指该数据长时间没有更新,仍然保留在状态中。清除空闲状态数据:minTime和maxTime:空闲状态至少会保留minTime的时间,这个时间内数据不会被清理;超过maxTime的时间后,空闲状态会被清除。部分状态被清除后,会导致计算结果是近似准确的状态过期时间tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1),Time.hours(2));时间属性使用TIMESTAMP(intprecision)数据类型来表示,对应SQL标准中的时间戳类型

precision为精度,表示秒以下保留几位小数点时间的格式一般为:year-month-dayhour:minute:second[.fractional]绝大多数情况可以使用毫秒精度:TIMESTAMP(3)Flink提供的时间单位:MILLISECOND、SECOND、MINUTE、HOUR、DAY、MONTH和YEAR

时间属性需要在Java/Scala代码中设置使用哪种时间语义三种时间语义StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//默认使用ProcessingTime

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

//使用IngestionTime

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//使用EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);SQL

DDL时间属性列proctime,使用PROCTIME()函数计算得到Processing

Time:时间属性CREATE

TABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3),--在原有Schema基础上添加一列proctime

proctimeasPROCTIME())WITH(

...);将DataStream转化为表时间属性列proctimectime:使用proctime函数,生成proctime列Processing

Time:时间属性DataStream<UserBehavior>userBehaviorDataStream=...//定义了Schema中各字段的名字,其中proctime使用了.proctime属性,这个属性帮我们生成一个ProcessingTime

tEnv.createTemporaryView("user_behavior",userBehaviorDataStream,"userIdasuser_id,itemIdasitem_id,categoryIdascategory_id,behavior,ctime");指定时间属性和Watermark策略SQL:使用WATERMARK关键字,并设置Watermark策略语法:WATERMARKFORrowtime_columnASwatermark_strategy_expressionEvent

Time:时间属性&

WatermarkCREATE

TABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3),--定义ts字段为EventTime时间戳,Watermark比监测到的最晚时间还晚5秒

WATERMARKFORtsasts-INTERVAL

'5'

SECOND

)WITH(

...);

语法:WATERMARKFORrowtime_columnASwatermark_strategy_expressionrowtime_column为时间属性,必须是TIMESTAMP(3)类型watermark_strategy_expression定义了Watermark的生成策略:时间戳严格单调递增WATERMARKFORrowtime_columnASrowtime_columnWATERMARKFORrowtime_columnASrowtime_column-INTERVAL'0.001'SECOND监测所有数据时间戳,并记录时间戳最大值,在最大值基础上添加一个1毫秒的延迟作为Watermark时间时间戳是乱序到达的WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'duration'timeUnittimeUnit可以是SECOND、MINUTE或HOUR等时间单位Event

Time:时间属性&

Watermark由DataStream转换为表在DataStream

API中设置好时间戳和Watermarkts.rowtime:使用rowtime函数,生成ts时间戳列Event

Time:时间属性&

Watermarkenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorDataStream=env.addSource(...)//在DataStream里设置时间戳和Watermark

.assignTimestampsAndWatermarks(...);//创建一个user_behavior表//ts.rowtime表示该列使用EventTimeTimestamp

tEnv.createTemporaryView("user_behavior",userBehaviorDataStream,"userIdasuser_id,itemIdasitem_id,categoryIdascategory_id,behavior,ts.rowtime");基于时间属性窗口分组GROUP

BYOVERWINDOW聚合窗口聚合GROUPBYfield1,time_attr_window:time_attr_window窗口分组函数:例如TUMBLE(proctime,INTERVAL'1'MINUTE)所有含有相同field1+time_attr_window的行都会被分到一组再对这组数据中的其他字段(如field2)进行聚合操作聚合操作:COUNT、SUM、AVG、MAX等将多行数据分到一组,然后对一组数据集进行聚合:多行变一行GROUP

BYTUMBLE(time_attr,interval):滚动窗口窗口是定长的,长度为interval,窗口之间不重叠,滚动向前HOP(time_attr,slide_interval,size_interval)

窗口长度是定长的,长度为size_interval,窗口以slide_interval的速度向前滑动slide_interval

<

size_interval:窗口重叠slide_interval

>

size_interval:窗口之间有间隙SESSION(time_attr,interval):会话窗口窗口长度是变长的,根据interval划分窗口时间间隔格式:INTERVAL‘duration’timeUnitINTERVAL'1'MINUTE窗口分组函数TUMBLE_START(time_attr,interval):当前窗口的起始时间返回值不再是时间属性TUMBLE_END(time_attr,interval)

:当前窗口的结束时间返回值不再是时间属性TUMBLE_ROWTIME(time_attr,interval)

:窗口的结束时间返回值是一个时间属性,后续的查询可以使用这个字段基于Event

TimeTUMBLE_PROCTIME(time-attr,interval)

:窗口的结束时间返回值是一个时间属性,后续的查询可以使用这个字段基于Processing

Time窗口的起始和结束时间TUMBLE_START

/

TUMBLE_END使用方法:TUMBLE(time_attr,interval)中的interval和TUMBLE_START(time_attr,interval)中的interval保持一致,即INTERVAL‘duration’timeUnit中的duration时间长度和timeUnit时间单位,两者保持一致TUMBLE_START

/

TUMBLE_ENDTUMBLE_ROWTIME

/TUMBLE_PROCTIME使用方法可以用在内联视图子查询或Join上案例:先使用TUMBLE_ROWTIME创建一个10秒钟的视图再在视图的基础上进行20分钟的聚合TUMBLE_ROWTIME

/

TUMBLE_PROCTIMESELECTTUMBLE_END(rowtime,INTERVAL'20'MINUTE),user_id,SUM(cnt)

FROM

(SELECTuser_id,COUNT(behavior)AScnt,TUMBLE_ROWTIME(ts,INTERVAL'10'SECOND)ASrowtimeFROMuser_behaviorGROUPBYuser_id,TUMBLE(ts,INTERVAL'10'SECOND)

)GROUPBYTUMBLE(rowtime,INTERVAL'20'MINUTE),user_id每行数据生成窗口,在窗口上进行聚合,聚合的结果会生成一个新字段:一行变一行OVER

WINDOW计算流程:先对field1做分组,包含相同field1的行被分到一起,按照时间属性排序(PARTITION

BY

ORDER

BY

…)每行数据建立一个窗口,窗口起始点为第一行数据,窗口结束点是当前行对窗口内field2字段做各类聚合操作,生成field2_agg的新字段(COUNT、SUM、AVG、MAX等)Flink为每行元素维护一个窗口,为每行元素执行一次窗口计算,完成计算后清除过期数据OVER

WIND

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论