版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1:Flinkermr个特定的时间后,必须触发Window去进行计算了,inorderoutoforder:Watermark的生成方式Watermark的生成方式有两种 WithPeriodic每隔N秒自动向流里面注入一个Watermark,时间间隔由ExecutionConfig.setAutoWatermarkIntervalFlink200ms。之前默100ms WithPunctuatedsocket模拟产生数据,数据的格式为:0001,1790820682000其中1790820682000是数据产生的时间,也就是EventTimeWindow打印信息来验证Window被触发的时机scalapackagepackageimportimportimportimportimportorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimportorg.apache.flink.streaming.api.scala.function.WindowFunctionimportorg.apache.flink.streaming.api.windowing.time.Timeimportimportimportscala.collection.mutable.ArrayBufferimportscala.util.SortingWatermark+EventTimeCreatedbyobjectWatermarkOpScaladefmain(args:Array[String]):Unit=valenv=//1importorg.apache.flink.api.scala._//tuple2valtupStream=text.map(line=>{valarr=line.split(",")(arr(0), uration.ofSeconds(10))//最大允许的数据乱序时间{valsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss")varcurrentMaxTimestamp=0LoverridedefextractTimestamp(element:(String,Long),recordTimestamp:Long):=valtimestamp=currentMaxTimestamp=//计算当前的watermarkvalcurrentWatermark=currentMaxTimestamp-//print}).apply(newWindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow]overridedefapply(key:Tuple,window:TimeWindow,input:Ible[(String,Long)],out:Collector[String]):Unit={valkeyStr=//windowarrBuffvalarrBuff=ArrayBuffer[Long]()//arrBuffarrvalarr//arrvalsdf=newSimpleDateFormat("yyyy-MM-dd }}}[root@bigdata04soft]#nc-l900110:11:22],watermark:[1790820672000|2026-10-0110:11:12]EventEvent2026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:26],watermark:[1790820676000|2026-10-0110:11:16]EventEvent2026-10-012026-10-012026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:32],watermark:[1790820682000|2026-10-0110:11:22]2026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:33],watermark:[1790820683000|2026-10-0110:11:23]EventEvent2026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-01来算,最早的数据已经过去了11s了,Window还没有开始计算,那到底什么时候会触发Window呢?[root@bigdata04soft]#nc-l900010:11:34],watermark:[1790820684000|2026-10-0110:11:24](0001),1,2026-10-0110:11:22,2026-10-0110:11:22,2026-10-0110:11:21,2026-10-01Event数据,则当Watermark时间>=EventTime时,就符合了Window触发的条件了,最终决定Window触发,还是由数据本身的EventTime所属Windowwindow_end_time决定。早的一条记录所在Window的window_end_time,所以Window就被触发了。[root@bigdata04soft]#nc-l900010:11:36],watermark:[1790820686000|2026-10-0110:11:26] Event
此时,Watermark时间虽然已经等于第二条数据的时间,但是由于其没有达到第二条数据所WindowWindowWindow时[root@bigdata04soft]#nc-l900010:11:37],watermark:[1790820687000|2026-10-0110:11:27](0001),1,2026-10-0110:11:26,2026-10-0110:11:26,2026-10-0110:11:24,2026-10-01Event1:Watermarkwindow_end_time2:在[inow_strt_tie,indow_nd_tim)区间中有数据存在(注意是左闭右开的区间)。同时满足了以上2个条件,Window才会触发。:+EentTimWatermarkEventTime机制,是如何处理乱序数据的。[root@hadoop100soft]#nc-l10:11:39],watermark:[1790820689000|2026-10-0110:11:29]10:11:39],watermark:[1790820689000|2026-10-0110:11:29]EventEvent10:11:31currentMaxTimestamp1:watermark时间>=window_end_timeWatermark时间(10:11:29)<window_end_time(10:11:33),WindowWindow一定就会触发了,我们试一试,继续输入内容。Event2个数据,10:11:3110:11:3210:11:33的数据,上边的结果,已经表明,对于的数据,Flink可以通过Watermark来实现处理一定范围内的乱序数据。那么对于“(lateelement)”太久的数据,Flink是怎么处理的呢?:LateElement(我们输入一个乱序很多的(EventTimeWatermark时间)数据来测试下:输入2行内容。[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01Eventwatermark2026-10-01输入3行内容。[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]2:allowedLatenessFlink提供了allowedLateness方法可以实现对的数据设置一个延迟时间,在指定延迟时间内到达的数据还是可以触发window执行的。[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-01(0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01EventWatermark2026-10-0110:11:33EventTime<Watermark的数据验证一下效果,输入3行内容。[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),2,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),3,2026-10-0110:11:30,2026-10-0110:11:31,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),4,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-01Event[root@hadoop100soft]#nc-l900010:11:44],watermark:[1790820694000|2026-10-0110:11:34]Event[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),5,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),6,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),7,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-011Watermark10:11:35。[root@hadoop100soft]#nc-l900010:11:45],watermark:[1790820695000|2026-10-0110:11:35]此时,Watermark10:11:35我们再输入几条EventTime<Watermark3[root@hadoop100soft]#nc-l900010:11:45],watermark:[1790820695000|2026-10-0110:11:35]10:11:45],watermark:[1790820695000|2026-10-0110:11:35]10:11:45],watermark:[1790820695000|2026-10-0110:11:35]□当Watemark等于10:11:33window_end_time所以会触发Window当窗口执行过后,我们再输入[10:11:30~10:11:33)WindowWindow是Watemark10:11:34的时候,我们输入[10:11:30~10:11:33)Window内的数据会发现Window也是可以被触发的。Watemark10:11:35的时候,我们输入[10:11:30~10:11:33)Window内的数据会发现Window不会被触发了。由于我们面设置了allowedLateness(Time.seconds(2)),因此可以允许延迟在2s内的数据继续触发Window执行。所以当Watermark是10:11:34的时候可以触发Window10:11:35的时候就。□时第二次(或多次)Watermark<window_end_time+allowedLateness时间内,这个窗口有Late数据到达时。Watermark10:11:34EventTime10:11:30、10:11:31、10:11:32window_end_time都是10:11:33,也就是10:11:34<10:11:33+2true。但是当Watermark等于10:11:35的时候,我们再输入EventTime为10:11:3010:11:3110:11:32window_end_time10:11:33,此时,10:11:35<10:11:33+2为false了,所以最终这些数据的时间太久了,就不会再触发Window的执行操作了。3:sideOutputLateData收集的数[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01此时,WindowWatermark10:11:33[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]此时,针对这几条的数据,都通过sideOutputLateData保存到了outputTag中[root@hadoop100soft]#nc-l900010:11:22],watermark:/p>
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 二零二五年度生态园林景观施工劳务合同4篇
- 二零二五版港口码头货物短途运输与跨境电商合作协议4篇
- 2025年度代付款电子支付解决方案合同3篇
- 第一人民医院二零二五年度进修人员医疗技术合作与共享协议3篇
- 2025年度绿色建筑项目参股合作协议3篇
- t型广告牌施工方案
- 穿越铁路顶管施工方案
- 2025年度教育信息化产品销售与实施服务协议3篇
- 2025年度车辆个人抵押权抵押权转让服务合同范本4篇
- 「绿色建筑」施工监理「2025年度」合同模板3篇
- 销售与销售目标管理制度
- 人教版(2025新版)七年级下册英语:寒假课内预习重点知识默写练习
- 2024年食品行业员工劳动合同标准文本
- 2024-2030年中国减肥行业市场发展分析及发展趋势与投资研究报告
- 运动技能学习
- 2024年中考英语专项复习:传统文化的魅力(阅读理解+完型填空+书面表达)(含答案)
- (正式版)HGT 22820-2024 化工安全仪表系统工程设计规范
- 2024年公安部直属事业单位招聘笔试参考题库附带答案详解
- 临沂正祥建材有限公司牛心官庄铁矿矿山地质环境保护与土地复垦方案
- 六年级上册数学应用题练习100题及答案
- 死亡报告年终分析报告
评论
0/150
提交评论