第16周flink快速上手篇it资源附件162学习预览_第1页
第16周flink快速上手篇it资源附件162学习预览_第2页
第16周flink快速上手篇it资源附件162学习预览_第3页
第16周flink快速上手篇it资源附件162学习预览_第4页
第16周flink快速上手篇it资源附件162学习预览_第5页
已阅读5页,还剩24页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1:Flinkermr个特定的时间后,必须触发Window去进行计算了,inorderoutoforder:Watermark的生成方式Watermark的生成方式有两种 WithPeriodic每隔N秒自动向流里面注入一个Watermark,时间间隔由ExecutionConfig.setAutoWatermarkIntervalFlink200ms。之前默100ms WithPunctuatedsocket模拟产生数据,数据的格式为:0001,1790820682000其中1790820682000是数据产生的时间,也就是EventTimeWindow打印信息来验证Window被触发的时机scalapackagepackageimportimportimportimportimportorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimportorg.apache.flink.streaming.api.scala.function.WindowFunctionimportorg.apache.flink.streaming.api.windowing.time.Timeimportimportimportscala.collection.mutable.ArrayBufferimportscala.util.SortingWatermark+EventTimeCreatedbyobjectWatermarkOpScaladefmain(args:Array[String]):Unit=valenv=//1importorg.apache.flink.api.scala._//tuple2valtupStream=text.map(line=>{valarr=line.split(",")(arr(0), uration.ofSeconds(10))//最大允许的数据乱序时间{valsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss")varcurrentMaxTimestamp=0LoverridedefextractTimestamp(element:(String,Long),recordTimestamp:Long):=valtimestamp=currentMaxTimestamp=//计算当前的watermarkvalcurrentWatermark=currentMaxTimestamp-//print}).apply(newWindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow]overridedefapply(key:Tuple,window:TimeWindow,input:Ible[(String,Long)],out:Collector[String]):Unit={valkeyStr=//windowarrBuffvalarrBuff=ArrayBuffer[Long]()//arrBuffarrvalarr//arrvalsdf=newSimpleDateFormat("yyyy-MM-dd }}}[root@bigdata04soft]#nc-l900110:11:22],watermark:[1790820672000|2026-10-0110:11:12]EventEvent2026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:26],watermark:[1790820676000|2026-10-0110:11:16]EventEvent2026-10-012026-10-012026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:32],watermark:[1790820682000|2026-10-0110:11:22]2026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:33],watermark:[1790820683000|2026-10-0110:11:23]EventEvent2026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-01来算,最早的数据已经过去了11s了,Window还没有开始计算,那到底什么时候会触发Window呢?[root@bigdata04soft]#nc-l900010:11:34],watermark:[1790820684000|2026-10-0110:11:24](0001),1,2026-10-0110:11:22,2026-10-0110:11:22,2026-10-0110:11:21,2026-10-01Event数据,则当Watermark时间>=EventTime时,就符合了Window触发的条件了,最终决定Window触发,还是由数据本身的EventTime所属Windowwindow_end_time决定。早的一条记录所在Window的window_end_time,所以Window就被触发了。[root@bigdata04soft]#nc-l900010:11:36],watermark:[1790820686000|2026-10-0110:11:26] Event

此时,Watermark时间虽然已经等于第二条数据的时间,但是由于其没有达到第二条数据所WindowWindowWindow时[root@bigdata04soft]#nc-l900010:11:37],watermark:[1790820687000|2026-10-0110:11:27](0001),1,2026-10-0110:11:26,2026-10-0110:11:26,2026-10-0110:11:24,2026-10-01Event1:Watermarkwindow_end_time2:在[inow_strt_tie,indow_nd_tim)区间中有数据存在(注意是左闭右开的区间)。同时满足了以上2个条件,Window才会触发。:+EentTimWatermarkEventTime机制,是如何处理乱序数据的。[root@hadoop100soft]#nc-l10:11:39],watermark:[1790820689000|2026-10-0110:11:29]10:11:39],watermark:[1790820689000|2026-10-0110:11:29]EventEvent10:11:31currentMaxTimestamp1:watermark时间>=window_end_timeWatermark时间(10:11:29)<window_end_time(10:11:33),WindowWindow一定就会触发了,我们试一试,继续输入内容。Event2个数据,10:11:3110:11:3210:11:33的数据,上边的结果,已经表明,对于的数据,Flink可以通过Watermark来实现处理一定范围内的乱序数据。那么对于“(lateelement)”太久的数据,Flink是怎么处理的呢?:LateElement(我们输入一个乱序很多的(EventTimeWatermark时间)数据来测试下:输入2行内容。[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01Eventwatermark2026-10-01输入3行内容。[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]2:allowedLatenessFlink提供了allowedLateness方法可以实现对的数据设置一个延迟时间,在指定延迟时间内到达的数据还是可以触发window执行的。[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-01(0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01EventWatermark2026-10-0110:11:33EventTime<Watermark的数据验证一下效果,输入3行内容。[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),2,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),3,2026-10-0110:11:30,2026-10-0110:11:31,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),4,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-01Event[root@hadoop100soft]#nc-l900010:11:44],watermark:[1790820694000|2026-10-0110:11:34]Event[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),5,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),6,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),7,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-011Watermark10:11:35。[root@hadoop100soft]#nc-l900010:11:45],watermark:[1790820695000|2026-10-0110:11:35]此时,Watermark10:11:35我们再输入几条EventTime<Watermark3[root@hadoop100soft]#nc-l900010:11:45],watermark:[1790820695000|2026-10-0110:11:35]10:11:45],watermark:[1790820695000|2026-10-0110:11:35]10:11:45],watermark:[1790820695000|2026-10-0110:11:35]□当Watemark等于10:11:33window_end_time所以会触发Window当窗口执行过后,我们再输入[10:11:30~10:11:33)WindowWindow是Watemark10:11:34的时候,我们输入[10:11:30~10:11:33)Window内的数据会发现Window也是可以被触发的。Watemark10:11:35的时候,我们输入[10:11:30~10:11:33)Window内的数据会发现Window不会被触发了。由于我们面设置了allowedLateness(Time.seconds(2)),因此可以允许延迟在2s内的数据继续触发Window执行。所以当Watermark是10:11:34的时候可以触发Window10:11:35的时候就。□时第二次(或多次)Watermark<window_end_time+allowedLateness时间内,这个窗口有Late数据到达时。Watermark10:11:34EventTime10:11:30、10:11:31、10:11:32window_end_time都是10:11:33,也就是10:11:34<10:11:33+2true。但是当Watermark等于10:11:35的时候,我们再输入EventTime为10:11:3010:11:3110:11:32window_end_time10:11:33,此时,10:11:35<10:11:33+2为false了,所以最终这些数据的时间太久了,就不会再触发Window的执行操作了。3:sideOutputLateData收集的数[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01此时,WindowWatermark10:11:33[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]此时,针对这几条的数据,都通过sideOutputLateData保存到了outputTag中[root@hadoop100soft]#nc-l900010:11:22],watermark:/p>

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论