版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
本文由简悦SimpRead转码,原文地址我们在第本文由简悦SimpRead转码,原文地址我们在第02课时中使用FlinkTable&SQL的API实现了最简单的WordCount程序。在这一课时中,将分别从FlinkTable&SQL的背景和编程模型、常见的API、算子和内置函数等对FlinkTable&SQLFlinkTable&SQL开发。我们在前面的课时中讲过Flink的分层模型,Flink自身提供了不同级别的抽象来支持我们开发流式或者批量处理程序,下图描述了Flink支持的4种不同级别的抽象。TableAPISQLFlinkAPI操作。FlinkSQLFlink实时计算为简化计算SQL语义的开发语言。我们在第04课时中提到过,Flink在编程模型上提供了DataStream和DataSet两套API,并没有做到事实上的批流统一,因为用户和开发者还是开发了两套代码。正是因为FlinkTable&SQL的加入,可以说Flink在某种程度上做到了事实上的批流一体。你之前可能都了解过Hive,在离线计算场景下HiveSQL的解析用到了ApacheCalcite,Flink同样把SQL的解析、优化和执行教给了Calcite下图是一张经典的FlinkTable&SQL实现原理图,可以看到Calcite从图中可以看到无论是批查询SQL还是流式查询SQL,都会经过对应的转换器Parser转换成为节点树SQLNode从图中可以看到无论是批查询SQL还是流式查询SQL,都会经过对应的转换器Parser转换成为节点树SQLNodetree,然后生成逻辑执行计划LogicalPlan,逻辑执行计划在经过优化后生成真正可以执行的物理执行计划,交给DataSet或者DataStream的API去执行。在这里我们不对Calcite一个完整的FlinkTable&SQLJob也是由Source、Transformation、SinkSource部分Kafka、MySQLTransformation部分FlinkTable&SQLSQLSelect、Groupby等,当然在这里也有更为复杂的多流Join、流与维表的Join等;Sink部分是指的结果存储比如MySQL、HBase或Kakfa与传统的表SQL查询相比,FlinkTable&SQL在处理流数据时会时时刻刻处于动态的数据变化中,所动态表的查询与静态表一样,但是,在查询动态表的时候,SQL会做连续查询,不会终止。我们举个简单的例子,Flink程序接受一个Kafka流作为输入,Kafka首先,Kafka的消息会被源源不断的解析成一张不断增长的动态表,我们在动态表上执行的SQL会不断FlinkTable&首先,Kafka的消息会被源源不断的解析成一张不断增长的动态表,我们在动态表上执行的SQL会不断FlinkTable&SQL我们在讲解FlinkTable&SQL所支持的常用算子前,需要说明一点,Flink自从0.9Table&SQLPleasenotethattheTableAPIandSQLarenotyetfeaturecompleteandarebeingactively[stream,batch]FlinkTable&SQL的开发一直在进行中,并没有支持所有场景下的计算逻辑。从我个人实践角度来讲,目前FlinkSQL|||queryUNION[ALL]|queryEXCEPT|queryINTERSECTORDERBYorderItem[,orderItemLIMIT{count|ALL}|queryEXCEPT|queryINTERSECTORDERBYorderItem[,orderItemLIMIT{count|ALL}OFFSETstart{ROW|ROWS}FETCH{FIRST|NEXT}[count]expression[ASC|DESCSELECT[ALL|DISTINCT{*|projectItem[,projectItem]*FROMGROUPBY{groupItem[,groupItem}WINDOWwindowNameASwindowSpec[,windowNameAS]*SELECT[ALL|DISTINCT{*|projectItem[,projectItem]*expression[]columnAlias|tableAliastableReference|joinCondition[NATURAL]expression[]columnAlias|tableAliastableReference|joinCondition[NATURAL]ON|USING'('[,column[matchRecognize[[AS]alias['('columnAlias[,columnAlias]*')']TABLE][[catalogName.]schemaName.]LATERALTABLE'('functionName'('expression[,expression]*')'UNNEST'('expressionVALUESexpression[,expression'(''('expression[,expression]*CUBE'('expression[,expression]*ROLLUP'('expression'(''('expression[,expression]*CUBE'('expression[,expression]*ROLLUP'('expression[,expression]*GROUPINGSETS|ORDERBYorderItem[,orderItem]*PARTITIONBYexpression[,expression]*numericOrIntervalExpression|ROWSnumericExpression可以看到FlinkSQL和传统的SQL一样,支持了包含查询、连接、聚合等场景,另外还支持了包括窗SELECT、WHERESQLDataStreamDataSetSELECT*FROMSELECTname,可以看到FlinkSQL和传统的SQL一样,支持了包含查询、连接、聚合等场景,另外还支持了包括窗SELECT、WHERESQLDataStreamDataSetSELECT*FROMSELECTname,ageFROM当然我们也可以在WHERE条件中使用=、<、>、<>、>=、<=,以及AND、ORSELECTname,ageFROMTablewherenameLIKE小明SELECT*FROMTableWHEREage=SELECTname,FROMWHEREnameIN(SELECTnameFROMGROUPBY/GROUPBY用于进行分组操作,DISTINCTHAVING和传统SQLDISTINCTnameFROMSUM(score)asTotalScoreFROMTableBYSUM(score)asTotalScoreFROMTableBYnameJOIN可以用于把来自两个表的数据联合起来形成结果表,目前Flink的Join只支持等值连接。Flink支持的JOIN类型包括:JOIN-INNERLEFTJOIN-LEFTOUTERRIGHTJOINJOIN-INNERLEFTJOIN-LEFTOUTERRIGHTJOIN-RIGHTOUTERFULLJOIN-FULLOUTERSELECTFROMLEFTJOINProductU=SELECTFROMRIGHTJOINProductONU=SELECTFROMFULLOUTERJOINProductONU=根据窗口数据划分的不同,目前ApacheFlink有如下3滚动窗口滑动窗口,窗口数据有固定大小,并且有生成间隔;会话窗口[TUMBLE_START(timeCol,[TUMBLE_END(timeCol,FROMGROUPBY[gk],[TUMBLE_START(timeCol,[TUMBLE_END(timeCol,FROMGROUPBY[gk],TUMBLE(timeCol,'1'DAY)asFROMOrdersGROUPBYTUMBLE(timeLine,INTERVAL'1'DAY),其中,TUMBLE_STARTTUMBLE_END代表窗口的开始时间和窗口的结束时间,TUMBLE(timeLine,INTERVAL'1'DAY)中的timeLine代表时间字段所在的列,INTERVAL'1'DAY表示时间间隔为一天。其中,TUMBLE_STARTTUMBLE_END代表窗口的开始时间和窗口的结束时间,TUMBLE(timeLine,INTERVAL'1'DAY)中的timeLine代表时间字段所在的列,INTERVAL'1'DAY表示时间间隔为一天。滑动窗口有固定的大小,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的创建频率。滑动窗口的语法与滚动窗口相比,只多了一个slideFROMGROUPBY[gk],HOP(timeCol,slide,例如,我们要每间隔一小时计算一次过去24SELECTproduct,SUM(amount)FROMOrdersGROUPBYHOP(rowtime,SELECTproduct,SUM(amount)FROMOrdersGROUPBYHOP(rowtime,INTERVAL'1'INTERVAL'1'DAY),INTERVAL'1'HOURSESSION_START(timeCol,gap)ASSESSION_END(timeCol,gap)ASFROMGROUPBY[gk],SESSION(timeCol,举例,我们需要计算每个用户过去1SELECTuser,SESSION_START(rowtime,INTERVAL'1'HOUR)AS举例,我们需要计算每个用户过去1SELECTuser,SESSION_START(rowtime,INTERVAL'1'HOUR)ASSESSION_ROWTIME(rowtime,INTERVAL'1'HOUR)ASsEnd,GROUPBYSESSION(rowtime,INTERVAL'1'HOUR),Flink中还有大量的内置函数,我们可以直接使用,将内置函数分类如下:上面分别介绍了FlinkTable&SQL上面分别介绍了FlinkTable&SQL的原理和支持的算子,我们模拟一个实时的数据流,然后讲解SQLJOIN的用法。在上一课时中,我们利用Flink提供的自定义Source功能来实现一个自定义的实时数据源,具体实现privatebooleanisRunning=*重写run*@param*@throwsItemitem=public*@throwsItemitem=publicvoidcancel()isRunning=ItemintinewArrayList<String>list=Itemitem=newitem.setName(list.get(newreturn我们把实时的商品数据流进行分流,分成item.setName(list.get(newreturn我们把实时的商品数据流进行分流,分成even和odd两个流进行JOIN,条件是名称相同,最后,把两个流的JOIN结果输出。classStreamingDemopublicstaticvoidmain(String[]args)throwsExceptionEnvironmentSettingsbsSettings=StreamExecutionEnvironmentbsEnv=StreamTableEnvironmentbsTableEnv=StreamTableEnvironment.create(bsEnv,MyStreamingSource()).map(newMapFunction<Item,Item>(){publicItemmap(Itemitem)throwsExceptionreturnDataStream<Item>=source.split(newvalue)List<String>=if%2==0)DataStream<Item>=source.split(newvalue)List<String>=if%2==0)elsereturnDataStream<Item>oddSelect=List<String>output=newif(value.getId()%2==0)}elsereturnreturnbsTableEnv.createTemporaryView("evenTable",evenSelect
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 生产作业手法培训
- 施工监理合同
- 护理查房卧床病人管理
- 气性坏疽病人的护理
- 2024年度服装店库存处理合同2篇
- 工程设备租赁合同
- 食堂档口退租协议
- 2024年度工程清算与居间协议
- 2024年度采购合同标的质量与交付要求2篇
- 物业设备事故管理培训
- 好书推荐-《枫林渡》课件
- 《金属的化学性质》 完整版课件
- 中图版八年级下册地理知识点
- 《手术台就是阵地》优秀课件
- 小学数学人教五年级上册简易方程用方程解决问题(复习课)教学设计
- 2022年广东省深圳市中考化学试卷(含答案)
- 扫黄打非教育活动台账
- 特种工程中心防喷装置安装使用管理规定(试行)
- 花城版高中音乐鉴赏全册教案
- 封样管理规定
- 无纬带在电机绑扎中的应用研究
评论
0/150
提交评论