Flink实时大数据处理技术 课件第6、7章 时间和窗口、处理函数与状态管理_第1页
Flink实时大数据处理技术 课件第6、7章 时间和窗口、处理函数与状态管理_第2页
Flink实时大数据处理技术 课件第6、7章 时间和窗口、处理函数与状态管理_第3页
Flink实时大数据处理技术 课件第6、7章 时间和窗口、处理函数与状态管理_第4页
Flink实时大数据处理技术 课件第6、7章 时间和窗口、处理函数与状态管理_第5页
已阅读5页,还剩80页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第六章时间与窗口Flink实时大数据处理技术三种时间语义事件到达Flink的时间可能是乱序的Event

Time:事件发生的时间无需担心乱序到达问题Watermark假设不会有更晚的数据需要用缓存存储中间数据,增大了延迟Processing

Time:当前节点的系统时钟时间计算结果有不确定性不需要设置Watermark延迟较低Ingestion

Time事件到达Flink

Source的时间不需要设置Watermark延迟较低在执行环境层面设置使用哪种时间语义TimeCharacteristic.EventTimeTimeCharacteristic.ProcessingTimeTimeCharacteristic.IngestionTimefromElements()

或fromCollection()

创建的DataStream应该使用Event

Time,并对数据流中的每个元素的Event

Time赋值使用带时序性的Source:Socket、Kafka等或者不带时序性的Source,使用Event

Time,并生成Watermark:文件等设置时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Event

Time时间语义使用时间戳为数据流中的每个事件的Event

Time赋值生成WatermarkUnix时间戳系统、毫秒精度Watermark是插入到数据流中的一种特殊数据结构,它包含一个时间戳,并假设后续不会有小于该时间戳的数据Watermark时间戳必须单调递增Watermark与事件时间越紧凑,越容易产生延迟数据;越宽松,等待时间越长、延迟越高Event

Time和WatermarkWatermark需要在并行环境下向前传播每个算子子任务维护一个针对该子任务的Event

Time时钟,时钟记录了这个算子子任务的处理速度上游算子的Watermark数据不断向下发送,算子子任务的Event

Time时钟也要不断向前更新每个算子子任务也要维护来自上游多个分区的Watermark信息:

PartitionWatermark

分布式环境下Watermark的传播Flink先判断新流入的Watermark时间戳是否大于PartitionWatermark列表内该分区的历史Watermark时间戳,大于则更新该Partition

Watermark时间戳遍历Partition

Watermark列表,选择最小的作为该算子子任务Event

Time时钟,如果更新了Event

Time时钟,则将更新的Event

Time作为Watermark发送给下游所有算子子任务分布式环境下Watermark的传播抽取时间戳和生成Watermark两者紧密结合只在Event

Time语义下有效时间越早设置越好设置方法:在Source中设置在Source之后设置抽取时间戳及生成Watermark老的Source接口:实现SourceFunction或RichSourceFunction抽取时间戳:collectWithTimestamp()生成Watermark:emitWatermark()SourceclassMySourceextendsRichSourceFunction[MyType]{@Overridepublicvoidrun(SourceContext<MyType>ctx)throwsException{while(/*condition*/){MyTypenext=getNext();ctx.collectWithTimestamp(next,next.eventTime);if(next.hasWatermarkTime()){

ctx.emitWatermark(newWatermark(next.watermarkTime));}}}}assignTimestampsAndWatermarks()方法:Flink

1.11之后进行了重构WatermarkStrategy抽取时间戳:withTimestampAssigner().withTimestampAssigner((event,timestamp)->event.eventTime)生成Watermark:forGenerator()实现自己的Watermark策略周期性地Periodic逐个式地PunctuatedSource之后DataStream<MyType>stream=...DataStream<MyType>withTimestampsAndWatermarks=stream.assignTimestampsAndWatermarks( WatermarkStrategy

.forGenerator(...)

.withTimestampAssigner(...));周期可以设置默认每200毫秒生成一次env.getConfig.setAutoWatermarkInterval(5000L)实现WatermarkGeneratoronEvent():数据流中每个元素到达后调用该方法onPeriodicEmit():定期发射WatermarkMyPeriodicGeneratorcurrentMaxTimestamp记录已抽取的时间戳最大值时间戳最大值减1分钟作为Watermark发送出去周期性地生成Watermark//定期生成Watermark

//数据流元素Tuple2<String,Long>共两个字段

//第一个字段为数据本身

//第二个字段是时间戳

public

static

class

MyPeriodicGenerator

implements

WatermarkGenerator<Tuple2<String,Long>>{private

final

longmaxOutOfOrderness=60*1000;//1分钟

private

longcurrentMaxTimestamp;//已抽取的Timestamp最大值

@Overridepublic

void

onEvent(Tuple2<String,Long>event,longeventTimestamp,WatermarkOutputoutput)

{//更新currentMaxTimestamp为当前遇到的最大值

currentMaxTimestamp=Math.max(currentMaxTimestamp,eventTimestamp);}@Overridepublic

void

onPeriodicEmit(WatermarkOutputoutput)

{//Watermark比currentMaxTimestamp最大值慢1分钟

output.emitWatermark(newWatermark(currentMaxTimestamp-maxOutOfOrderness));}}用forGenerator()方法调用MyPeriodicGenerator类基于时间戳最大值的场景比较普遍,Flink做了进一步封装BoundedOutOfOrdernessWatermarksforBoundedOutOfOrderness()方法AscendingTimestampsWatermarksEvent

Time时间戳单调递增forMonotonousTimestamps()方法周期性地生成Watermark//第二个字段是时间戳

DataStream<Tuple2<String,Long>>watermark=input.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context->newMyPeriodicGenerator())).withTimestampAssigner((event,recordTimestamp)->event.f1));//第二个字段是时间戳

DataStream<Tuple2<String,Long>>input=env.addSource(newMySource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event,timestamp)->event.f1));使用自己实现的MyPeriodicGenerator使用Flink封装的forBoundedOutOfOrderness数据流元素有特殊标记,标记哪些元素为Watermark根据元素是否有特殊标记,判断是否生成Watermark每个元素都生成一个Watermark,增大下游计算的延迟,拖累整个Flink作业的性能。逐个式地生成Watermark//逐个检查数据流中的元素,根据元素中的特殊字段,判断是否要生成Watermark

//数据流元素Tuple3<String,Long,Boolean>共三个字段

//第一个字段为数据本身

//第二个字段是时间戳

//第三个字段判断是否为Watermark的标记

public

static

class

MyPunctuatedGenerator

implements

WatermarkGenerator<Tuple3<String,Long,Boolean>>{@Overridepublic

void

onEvent(Tuple3<String,Long,Boolean>event,longeventTimestamp,WatermarkOutputoutput)

{if(event.f2){output.emitWatermark(newWatermark(event.f1));}}@Overridepublic

void

onPeriodicEmit(WatermarkOutputoutput)

{//这里不需要做任何事情,因为我们在onEvent()方法中生成了Watermark

}}Watermark策略:在延迟和准确性之间平衡Watermark策略没有标准答案我们无法预知流处理有多少迟到数据Watermark与事件时间戳贴合较紧,一些数据会被当成迟到数据,影响计算结果的准确性Watarmark设置得较松,更多数据会先缓存起来以等待计算,整个应用的延迟增加;增大了内存的压力延迟与准确性Flink体系中最底层的API提供了对数据流更细粒度的操作权限访问和更新状态获取时间戳、使用定时器(Timer)主要包括:KeyedProcessFunction、ProcessFunction、CoProcessFunction等ProcessFunction系列函数Timer就像一个闹钟先在Timer中注册一个未来的时间当这个时间到达,“闹钟”响起,程序执行回调函数,回调函数执行一定的业务逻辑ProcessFunction两大重要接口:processElement()方法处理数据流中的一条元素,并通过Collector<O>输出出来。Context是processElement()方法的特色,可以获取时间戳、访问TimerService,设置TimeronTimer()是回调函数,当到了“闹钟”时间,Flink会调用onTimer()方法,执行一些业务逻辑Timer的使用方法//处理数据流中的一条元素

public

abstract

void

processElement(Ivalue,Contextctx,Collector<O>out)//时间到达后的回调函数

public

void

onTimer(longtimestamp,OnTimerContextctx,Collector<O>out)Timer的使用方法在processElement()方法中通过Context注册一个未来的时间戳t在onTimer()方法中实现一些逻辑,到达t时刻,onTimer()方法被自动调用。只能在KeyedStream上注册Timer每个Key下可以注册多个不同时间戳作为Timer每个Key下某个时间戳下只能注册一个Timer未来的时间戳t可以是Processing

Time也可以是Event

Time从Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口Context.timerService.registerProcessingTimeTimer()注册一个Processing

Time的TimerContext.timerService.deleteProcessingTimeTimer()删除之前注册的TimerContext.timerService.currentProcessingTime()获取当前时间戳某支股票未来某段interval时间间隔是否一致连续上涨如果未来interval间隔内一直上涨,发送一个提示解决思路:如果新数据比上次数据价格更高且没有注册Timer,注册一个未来interval之后的Timer在interval期间内,如果价格回落,则把刚才的Timer删掉在interval期间内,如果价格一直上升,触发onTimer()onTimer()发送提示Timer案例:股票交易场景将一部分数据发送到另一个流中两个流数据类型可以不一样通过OutputTag<T>标记另外一个数据流将交易量大于100的数据流侧输出侧输出OutputTag<StockPrice>highVolumeOutput=

newOutputTag<StockPrice>("high-volume-trade"){};

public

static

class

SideOutputFunction

extends

KeyedProcessFunction<String,StockPrice,String>{@Overridepublic

void

processElement(StockPricestock,Contextcontext,Collector<String>out)

throwsException{if(stock.volume>100){

context.output(highVolumeOutput,stock);}else{

out.collect("normaltickdata");}}}DataStream<StockPrice>inputStream=...SingleOutputStreamOperator<String>mainStream=inputStream.keyBy(stock->stock.symbol)//调用process函数,包含侧输出逻辑

.process(newSideOutputFunction());DataStream<StockPrice>sideOutputStream=mainStream.getSideOutput(highVolumeOutput);CoProcessFunction或KeyedCoProcessFunctionprocessElement1()方法:对第一个数据流的每个元素处理processElement2()方法:对第二个数据流的每个元素处理第一个流、第二个流可以共享状态第一个流、第二个流、输出流三者的数据类型可以不一样案例:实现两个数据流上的Join:创建状态,两个流都可以访问状态,例如状态变量aprocessElement1()方法处理第一个数据流,更新状态a。processElement2()方法处理第二个数据流,根据状态a中的数据,生成相应的输出。两个流上使用ProcessFunction股票流包含价格、交易量、时间戳等媒体评价流包含了对各支股票的正负评价两支数据流一起流入KeyedCoProcessFunction主逻辑中先将两个数据流connect(),然后按照股票代号进行keyBy(),进而使用process():案例:股票价格流与媒体评价流做Join/**

*四个泛型:Key,第一个流类型,第二个流类型,输出。*/

publicstaticclass

JoinStockMediaProcessFunction

extends

KeyedCoProcessFunction<String,StockPrice,Media,StockPrice>{//mediaState

privateValueState<String>mediaState;@Overridepublicvoidopen(Configurationparameters)throwsException{//从RuntimeContext中获取状态

mediaState=getRuntimeContext().getState(newValueStateDescriptor<String>("mediaStatusState",Types.STRING));}@OverridepublicvoidprocessElement1(StockPricestock,Contextcontext,Collector<StockPrice>collector)throwsException{StringmediaStatus=mediaState.value();if(null!=mediaStatus){stock.mediaStatus=mediaStatus;collector.collect(stock);}}@OverridepublicvoidprocessElement2(Mediamedia,Contextcontext,Collector<StockPrice>collector)throwsException{//第二个流更新mediaState

mediaState.update(media.status);}}//读入股票数据流

DataStream<StockPrice>stockStream=...//读入媒体评价数据流

DataStream<Media>mediaStream=...DataStream<StockPrice>joinStream=stockStream.connect(mediaStream).keyBy("symbol","symbol")//调用process函数

.process(newJoinStockMediaProcessFunction());根据是否keyBy()分为Keyed

Window和Non-Keyed

WindowkeyBy()windowAll()下游算子并行度为1窗口程序两个必须操作使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(WindowFunction)进行处理,常用的WindowFunction有reduce()、aggregate()、process()其他的trigger()、evictor()则是窗口的触发和销毁过程中的附加选项,主要面向需要更多自定义的高级编程者,如果不设置则会使用默认的配置。窗口程序的骨架结构//KeyedWindow

stream.keyBy(<KeySelector>)

//按照一个Key进行分组

.window(<WindowAssigner>)

//将数据流中的元素分配到相应的窗口中

[.trigger(<Trigger>)]//指定触发器Trigger(可选)

[.evictor(<Evictor>)]//指定清除器Evictor(可选)

.reduce/aggregate/process()//窗口处理函数WindowFunction

//Non-KeyedWindow

stream.windowAll(WindowAssigner)//不分组,将数据流中的所有元素分配到相应的窗口中

[.trigger(<Trigger>)]//指定触发器Trigger(可选)

[.evictor(<Evictor>)]//指定清除器Evictor(可选)

.reduce/aggregate/process()//窗口处理函数WindowFunction窗口分配器WindowAssigner将元素分配给不同的时间窗口时间窗口上进行Window

Function计算案例:设置10分钟的时间窗口确定窗口的长度0:00

0:10、0:10

0:20数据流元素流入,根据元素的时间,分配到不同的窗口当窗口满足了触发条件,触发相应的Window

Function计算窗口的生命周期窗口之间不重叠,窗口长度(Size)是固定的可以设置偏移量Offset内置的窗口划分方法–滚动窗口DataStream<T>input=...//基于Event

Time的时间窗口

input.keyBy(<KeySelector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowfunction>(...)//在小时级滚动窗口上设置15分钟的Offset偏移

input.keyBy(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))).<windowfunction>(...)以一个步长(Slide)不断向前滑动,窗口的长度(Size)固定Slide小于窗口的Size时,相邻窗口会重叠,一个元素会被分配到多个窗口Slide大于Size,有些元素可能被丢掉内置的窗口划分方法–滑动窗口DataStream<T>input=...//基于EventTime的滑动窗口

input.keyBy(<KeySelector>).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).<windowfunction>(...)两个窗口之间有一个间隙,被称为SessionGap当一个窗口在大于SessionGap的时间内没有接收到新数据时,窗口将关闭窗口的长度可变、窗口的开始和结束时间不确定可以设置定长的SessionGap,也可以使用SessionWindowTimeGapExtractor动态地确定SessionGap的长度内置的窗口划分方法–会话窗口DataStream<T>input=...//基于EventTime定长SessionGap的会话窗口

input.keyBy(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowfunction>(...)//基于EventTime变长SessionGap的会话窗口

input.keyBy(<KeySelector>).window(EventTimeSessionWindows .withDynamicGap((element)->{//返回SessionGap的长度

})).<windowfunction>(...)数据流元素经过WindowAssigner后,被分配给不同的窗口使用窗口函数,在每个窗口上对窗口内的元素进行处理窗口函数分为两类:增量计算,如reduce()和aggregate()全量计算,如process()增量计算:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。全量计算:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。窗口函数ReduceFunction使用reduce()需要实现ReduceFunctionReduceFunction接受两个相同类型的输入,生成一个输出。两两合一地进行汇总操作,生成一个同类型的新元素需要维护一个状态数据,状态数据的数据类型和输入、输出的数据类型是一致的优点:状态数据小,ReduceFunction好实现缺点:功能受限//读入股票数据流

DataStream<StockPrice>stockStream=...senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//reduce的返回类型必须和输入类型StockPrice一致

DataStream<StockPrice>sum=stockStream.keyBy(s->s.symbol).timeWindow(Time.seconds(10)).reduce((s1,s2)->StockPrice.of(s1.symbol,s2.price,s2.ts,s1.volume+s2.volume));AggregateFunction使用aggregate()需要实现AggregateFunction实现起来稍复杂:输入类型IN、输出类型OUT、中间状态数据ACC三者不相同,可以自定义ACC的数据结构需要实现多个虚方法:createAccumulator()、add()、getResult()public

interface

AggregateFunction<IN,ACC,OUT>extends

Function,Serializable{//在一次新的aggregate发起时,创建一个新的Accumulator,Accumulator是我们所说的中间状态数据,简称ACC

//这个函数一般在初始化时调用

ACCcreateAccumulator();//当一个新元素流入时,将新元素与状态数据ACC合并,返回状态数据ACC

ACCadd(INvalue,ACCaccumulator);//将两个ACC合并

ACCmerge(ACCa,ACCb);//将中间数据转成结果数据

OUTgetResult(ACCaccumulator);}AggregateFunction源码AggregateFunction计算一个窗口内某支股票的平均值ACC中要保存总和(sum)、个数(count)以及股票代号(symbol)createAccumulator():创建新的ACC,初始化ACC数据add():新数据到达,更新ACC中的sum和countgetResult():将ACC转换为最终结果merge():窗口融合时,多个窗口里的ACC合并,生成新的ACCAggregateFunction计算流程AggregateFunction/**

*接收三个泛型:*IN:StockPrice

*ACC:(String,Double,Int)-(symbol,sum,count)

*OUT:(String,Double)-(symbol,average)

*/

publicstaticclass

AverageAggregate

implements

AggregateFunction<StockPrice,Tuple3<String,Double,Integer>,Tuple2<String,Double>>{@OverridepublicTuple3<String,Double,Integer>createAccumulator(){returnTuple3.of("",0d,0);}@OverridepublicTuple3<String,Double,Integer>add(StockPriceitem,Tuple3<String,Double,Integer>accumulator){doubleprice=accumulator.f1+item.price;intcount=accumulator.f2+1;returnTuple3.of(item.symbol,price,count);}@OverridepublicTuple2<String,Double>getResult(Tuple3<String,Double,Integer>accumulator){returnTuple2.of(accumulator.f0,accumulator.f1/accumulator.f2);}@OverridepublicTuple3<String,Double,Integer>merge(Tuple3<String,Double,Integer>a,Tuple3<String,Double,Integer>b){returnTuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}DataStream<StockPrice>stockStream=...DataStream<Tuple2<String,Double>>average=stockStream.keyBy(s->s.symbol).timeWindow(Time.seconds(10)).aggregate(newAverageAggregate());主程序实现一个AggregateFunctionProcessWindowFunctionProcessWindowFunction要对窗口内的全量数据都缓存Flink将某个Key下某个窗口的所有元素都缓存在Iterable<IN>中,对其进行处理后,用Collector<OUT>收集输出Context获取窗口内更多的信息,包括时间、状态、迟到数据发送位置等/**

*函数接收四个泛型*IN输入类型*OUT输出类型*KEYkeyBy中按照Key分组,Key的类型*W窗口的类型*/

public

abstract

class

ProcessWindowFunction<IN,OUT,KEY,W

extends

Window>extends

AbstractRichFunction{/**

*对一个窗口内的元素进行处理,窗口内的元素缓存在Iterable<IN>,进行处理后输出到Collector<OUT>中*我们可以输出一到多个结果*/

public

abstract

void

process(KEYkey,Contextcontext,Iterable<IN>elements,Collector<OUT>out)

throwsException;/**

*当窗口执行完毕被清理时,删除各类状态数据。*/

public

void

clear(Contextcontext)

throwsException{}…}ProcessWindowFunction源码ProcessWindowFunction与增量计算相结合想访问窗口中Context等元数据,又想使用增量计算,不缓存所有数据,可以将ProcessWindowFunction与增量计算函数reduce()或aggregate()相结合Flink先进行增量计算,窗口结束前,将增量计算结果发送给ProcessWindowFunction再处理案例:计算时间窗口下股票的最大值、最小值以及窗口结束时间戳窗口的最大值、最小值可以由reduce()增量计算得到窗口结束时间戳需要从Context中获得触发器Trigger每个窗口都有一个Trigger,Trigger决定了窗口何时启动Window

Function执行计算、何时清理窗口中的数据例如:Processing

Time下的时间窗口带有一个默认的Trigger,当到达这个窗口的结束时间,触发相应的计算其他窗口触发的特例:窗口中遇到某些特定的元素、元素总数达到一定数量或窗口中元素按照某个特定模式顺序到达针对这些特例,可以自定义Trigger触发器TriggerTrigger返回结果:CONTINUE:什么都不做。FIRE:启动计算并将结果发送给下游,不清理窗口数据。PURGE:清理窗口数据但不执行计算。FIRE_AND_PURGE:启动计算,发送结果然后清理窗口数据。Trigger本质上是一种定时器Timer,注册一个合适的时间,到达这个时间,根据业务逻辑决定发送上面四个结果中的一个。清除器EvictorEvictor用来清除数据增量计算没必要使用EvictorevictBefore()和evictAfter()分别在WindowFunction之前和之后被调用,方法里可以自定义一些业务逻辑,清除窗口中的数据双流关联将两个数据流的数据关联(Join)流处理的Join是在时间窗口上进行两个流的Join目前,Flink支持两种:窗口连接Window

Join时间间隔连接Interval

Join窗口连接Window

Join同一个窗口上两个流按照某个Key进行Join窗口划分可以使用滚动窗口、滑动窗口和会话窗口一个窗口包含来自两个数据流中的元素,两个流之间以Inner

Join语义关联,形成数据对窗口结束时间,Flink使用JoinFunction来对窗口中的数据进行处理input1.join(input2)

.where(<KeySelector>) <-input1使用哪个字段作为Key.equalTo(<KeySelector>) <-input2使用哪个字段作为Key.window(<WindowAssigner>) <-指定WindowAssigner[.trigger(<Trigger>)] <-指定Trigger(可选)[.evictor(<Evictor>)] <-指定Evictor(可选).apply(<JoinFunction>) <-指定JoinFunction窗口连接Window

Joinpublic

static

class

MyJoinFunction

implements

JoinFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>{@OverridepublicStringjoin(Tuple2<String,Integer>input1,Tuple2<String,Integer>input2)

{return

"input1:"+input1.f1+",input2:"+input2.f1;}}DataStream<Tuple2<String,Integer>>input1=...DataStream<Tuple2<String,Integer>>input2=...DataStream<String>joinResult=input1.join(input2).where(i1->i1.f0).equalTo(i2->i2.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(60))).apply(newMyJoinFunction());自定义JoinFunction,打印两个流Join结果时间间隔连接Interval

Join使用时间间隔(Interval)确定窗口提供上界和下界:对于input1的元素element1,input2中符合以下公式的元素elementX会与input1的元素element1组成数据对

迟到数据Event

Time语义下,使用Watermark判断数据是否迟到迟到元素:元素到达窗口算子时,该元素本该分配给某个窗口,由于延迟,窗口已经触发计算处理迟到数据的三种方式:直接将迟到数据丢弃将迟到数据发送到另一个流重新执行一次计算,将迟到数据考虑进来,更新计算结果侧输出:将迟到数据发送到某个特定的流上后续根据业务逻辑要求,对迟到数据进行处理将迟到数据发送到另一个流finalOutputTag<T>lateOutputTag=newOutputTag<T>("late-data"){};DataStream<T>input=...SingleOutputStreamOperator<T>result=input.keyBy(<keyselector>).window(<windowassigner>).allowedLateness(<time>).sideOutputLateData(lateOutputTag).<windowedtransformation>(<windowfunction>);DataStream<T>lateStream=result.getSideOutput(lateOutputTag);将迟到内容写到名为“late-data”的OutputTag下,之后用getSideOutput()获取这些迟到数据allowedLateness(lateness)允许用户先得到一个结果,如果lateness时间内有迟到数据,迟到数据会和之前的数据一起被重新计算开启这个功能后,窗口计算已经触发,窗口中原来的状态仍需要被保留,迟到数据与原来的状态数据融合,才能更新结果更新的结果可能多次向下游发送或者将原来的结果覆盖或者多份数据同时保存,每份数据有自己的时间戳更新计算结果DataStream<Tuple4<String,String,Integer,String>>allowedLatenessStream=input.keyBy(item->item.f0).timeWindow(Time.seconds(5)).allowedLateness(Time.seconds(5)).process(newAllowedLatenessFunction());第六章时间与窗口Flink实时大数据处理技术什么是有状态的计算有状态计算的潜在场景数据去重:需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入数据去重检查输入流是否符合某个特定模式:之前流入的数据以状态的形式缓存下来对一个窗口内的数据进行聚合分析,比如分析一小时内某项指标75分位值或99分位值Flink分布式计算,一个算子有多个算子子任务状态可以被理解为某个算子子任务在当前实例上的一个变量,变量记录了数据流的历史信息,新数据流入,可以结合历史信息来进行计算接收输入流/获取对应状态/更新状态状态管理的难点要解决问题:实时性,延迟不能太高数据不丢不重、恰好计算一次,尤其发生故障恢复后程序的可靠性要高,保证7*24小时稳定运行难点不能将状态直接交由内存,因为内存空间有限用持久化的系统备份状态,出现故障时,如何从备份中恢复需要考虑扩展到多个节点时的伸缩性Flink解决了上述问题,提供有状态的计算APIManaged

State和Raw

State托管状态(ManagedState)是由Flink管理的,Flink帮忙存储、恢复和优化原生状态(RawState)是开发者自己管理的,需要自己序列化Managed

State又细分为Keyed

State和Operator

StateFlink的几种状态类型

ManagedStateRawState状态管理方式FlinkRuntime托管,自动存储、自动恢复、自动伸缩用户自己管理状态数据结构Flink提供的常用数据结构,如ListState、MapState等字节数组:byte[]使用场景绝大多数Flink函数用户自定义函数Keyed

State是KeyedStream上的状态,每个Key共享一个状态OperatorState每个算子子任务共享一个状态Keyed

State和Operator

StateKeyed

State相同Key的数据可以访问、更新这个状态Operator

State流入这个算子子任务的所有数据可以访问、更新这个状态Keyed

State和Operator

State都是基于本地的,每个算子子任务维护着自身的状态,不能访问其他算子子任务的状态具体的实现层面,Keyed

State需要重写Rich

Function函数类,Operator

State需要实现CheckpointedFunction等接口Keyed

State和Operator

State

KeyedStateOperatorState适用算子类型只适用于KeyedStream上的算子可以用于所有算子状态分配每个Key对应一个状态一个算子子任务对应一个状态创建和访问方式重写RichFunction,通过里面的RuntimeContext访问实现CheckpointedFunction等接口横向扩展状态随着Key自动在多个算子子任务上迁移有多种状态重新分配的方式支持的数据结构ValueState、ListState、MapState等ListState、BroadcastState等修改Flink应用的并行度:每个算子的并行算子子任务数发生了变化,整个应用需要关停和启动一些算子子任务某份在原来某个算子子任务的状态需要平滑更新到新的算子子任务上Flink的Checkpoint可以辅助状态数据在算子子任务之间迁移算子子任务生成快照(Snapshot)保存到分布式存储上子任务重启后,相应的状态在分布式存储上重建(Restore)Keyed

State与Operator

State的横向扩展方式稍有不同横向扩展问题Flink提供了封装好的数据结构供我们使用,包括ValueState、ListState等主要有:ValueState:单值MapState:Key-Value对ListState:列表ReducingState和AggregatingState:合并Keyed

State由于跟Key绑定,Key自动分布到不同算子子任务,Keyed

State也可以根据Key分发到不同算子子任务上Keyed

State实现RichFunction函数类,比如RichFlatMapFunction创建StateDescriptor,StateDescriptor描述状态的名字和状态的数据结构,每种类型的状态有对应的StateDescriptor通过StateDescriptor,从RuntimeContext中获取状态调用状态提供的方法,获取状态,更新数据Keyed

State//创建StateDescriptor

MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通过StateDescriptor获取运行时上下文中的状态

behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);MapState<UK,UV>:UVget(UKkey)voidput(UKkey,UVvalue)booleancontains(UKkey)…案例:统计电商用户行为UserBehavior场景下,某个用户(userId)下某种用户行为(behavior)的数量Keyed

State/**

*MapStateFunction继承并实现RichFlatMapFunction*两个泛型分别为输入数据类型和输出数据类型*/

publicstaticclass

MapStateFunction

extends

RichFlatMapFunction<UserBehavior,Tuple3<Long,String,Integer>>{//指向MapState的句柄

privateMapState<String,Integer>behaviorMapState;@Overridepublicvoidopen(Configurationconfiguration){//创建StateDescriptor

MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通过StateDescriptor获取运行时上下文中的状态

behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);}@OverridepublicvoidflatMap(UserBehaviorinput,Collector<Tuple3<Long,String,Integer>>out)throwsException{intbehaviorCnt=1;//behavior有可能为pv、cart、fav、buy等

//判断状态中是否有该behavior

if(behaviorMapState.contains(input.behavior)){behaviorCnt=behaviorMapState.get(input.behavior)+1;}//更新状态

behaviorMapState.put(input.behavior,behaviorCnt);out.collect(Tuple3.of(input.userId,input.behavior,behaviorCnt));}}使用MapState记录某个behavior下的数量<behavior,

behaviorCnt>UserBehavior案例先基于userId进行keyBy再使用有状态的MapStateFunction进行处理Keyed

Stateenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorStream=...//生成一个KeyedStream

KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上进行flatMap

DataStream<Tuple3<Long,String,Integer>>behaviorCountStream=keyedStream.flatMap(newMapStateFunction());状态:算子子任务的本地数据在Checkpoint过程时写入存储,这个过程被称为备份(Snapshot)初始化或重启一个Flink作业时,以一定逻辑从存储中读出并变为算子子任务的本地数据,这个过程被称为重建(Restore)Keyed

State开箱即用:数据划分基于Key,Snapshot和Restore过程可以基于Key在多个算子子任务之间做数据迁移Operator

State每个算子子任务管理自己的状态,流入到这个算子子任务上的所有数据可以访问和修改Operator

State故障重启后,数据流中某个元素不一定流入重启前的算子子任务上需要根据具体业务场景设计Snapshot和Restore的逻辑使用CheckpointedFunction接口类Operator

StateFlink定期执行Checkpoint,会将状态数据Snapshot到存储上每次执行Snapshot,会调用snapshotState()方法,因此我们要实现一些Snapshot逻辑,比如将哪些状态持久化initializeState()在算子子任务初始化状态时调用,有两种被调用的可能:整个Flink作业第一次执行,状态数据需要初始化一个默认值Flink作业遇到故障重启,基于之前已经持久化的状态恢复ListState

/

UnionListStateBroadcastStateOperator

Statepublic

interface

CheckpointedFunction{//Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化

void

snapshotState(FunctionSnapshotContextcontext)

throwsException;//初始化时会调用这个方法,向本地状态中填充数据

void

initializeState(FunctionInitializationContextcontext)

throwsException;}CheckpointedFunction源码状态以列表的形式序列化并存储单个状态为S,每个算子子任务有零到多个状态,共同组成一个列表ListState[S],Snapshot时将这些状态以列表形式写入存储包含所有状态的大列表,当作业重启时,将这个大列表重新分布到各个算子子任务上ListState:将大列表按照Round-Ribon模式均匀分布到各个算子子任务上,每个算子子任务得到的是大列表的子集UnionListState:将大列表广播给所有算子子任务应用场景:Source上保存流入数据的偏移量,Sink上对输出数据做缓存Operator

State

ListState、UnionListStateOperator

State使用方法重点实现snapshotState()和initializeState()两个方法在initializeState()方法里初始化并获取状态注册StateDescriptor,指定状态名字和数据类型从FunctionInitializationContext中获取OperatorStateStore,进而获取Operator

State在snapshotState()方法里实现一些业务逻辑基于ListState实现可缓存的Sink//重写CheckpointedFunction中的snapshotState()

//将本地缓存Snapshot到存储上

@OverridepublicvoidsnapshotState(FunctionSnapshotContextcontext)throwsException{//将之前的Checkpoint清理

checkpointedState.clear();for(Tuple2<String,Integer>element:bufferedElements){//将最新的数据写到状态中

checkpointedState.add(element);}}//重写CheckpointedFunction中的initializeState()

//初始化状态

@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{//注册ListStateDescriptor

ListStateDescriptor<Tuple2<String,Integer>>descriptor=newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){}));//从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState

checkpointedState=context.getOperatorStateStore().getListState(descriptor);//如果是作业重启,读取存储中的状态数据并填充到本地缓存中

if(context.isRestored()){for(Tuple2<String,Integer>element:checkpointedState.get()){bufferedElements.add(element);}}}Sink先将数据放在本地缓存中,并定期通过snapshotState()方法进行SnapshotinitializeState()初始化状态,需判断是新作业还是重启作业snapshotState()initializeState()Broadcast可以将部分数据同步到所有实例上使用场景:一个主数据流,一个控制规则流,主数据流比较大,只能分散在多个算子实例上,控制规则流数据比较小,可以广播分发到所有算子实例上。与Join的区别:控制规则流较小,可以放到每个算子实例里电商用户行为分析案例:识别用户行为模式,行为模式包括“反复犹豫下单类”、“频繁爬取数据类”等,控制流里包含了这些行为模式,使用Flink实时识别用户Broadcast

State主逻辑中读取两个数据流Broadcast

State支持Key-Value形式,需要使用MapStateDescriptor来构建再使用broadcast()方法将数据广播到所有算子子任务上,得到BroadcastStream主数据流先进行keyBy(),然后与广播流合并,在KeyedBroadcastProcessFunction中实现具体业务逻辑BroadcastPatternFunction是KeyedBroadcastProcessFunction的具体实现Broadcast

State//主数据流

DataStream<UserBehavior>userBehaviorStream=...//BehaviorPattern数据流

DataStream<BehaviorPattern>patternStream=...//BroadcastState只能使用Key->Value结构,基于MapStateDescriptor

MapStateDescriptor<Void,BehaviorPattern>broadcastStateDescriptor=newMapStateDescriptor<>("behaviorPattern",Types.VOID,Types.POJO(BehaviorPattern.class));BroadcastStream<BehaviorPattern>broadcastStream=patternStream.broadcast(broadcastStateDescriptor);//生成一个KeyedStream

KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上进行connect()和process()

DataStream<Tuple2<Long,BehaviorPattern>>matchedStream=keyedStream.connect(broadcastStream).process(newBroadcastPatternFunction());processElement()方法处理主数据流中的每条元素,输出零到多个数据processBroadcastElement()方法处理广播流,可以输出零到多个数据,一般用来更新BroadcastStateKeyedBroadcastProcessFunction属于ProcessFunction系列函数,可以注册Timer,并在onTimer方法中实现回调逻辑。KeyedBroadcastProcessFunctionFlink的状态是基于本地的,本地状态数据不可靠Checkpoint机制:Flink定期将状态数据保存到存储上,故障发生后将状态数据恢复。快照(Snapshot)、分布式快照(DistributedSnapshot)和检查点(Checkpoint)均指的是Flink将状态写入存储的过程一个简单的Checkpoint流程:暂停处理新流入数据,将新数据缓存下来将算子子任务的本地状态数据拷贝到一个远程的持久化存储上继续处理新流入的数据,包括刚才缓存起来的数据Checkpoint机制检查点分界线(CheckpointBarrier)被插入到数据流中,将数据流切分成段。Flink的算子接收到CheckpointBarrier后

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论