6flink核心api之table和sq丨教程it资源_第1页
6flink核心api之table和sq丨教程it资源_第2页
6flink核心api之table和sq丨教程it资源_第3页
6flink核心api之table和sq丨教程it资源_第4页
6flink核心api之table和sq丨教程it资源_第5页
已阅读5页,还剩8页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

金职 慕课专 手 Java/6FlinkAPI之TableAPI和Flink之KafkaConnector

徐老师·更新于2020-10- 上一节5Flink 7Flink复盘下一注意:TableAPISQL现在还处于活跃开发阶段,还没有完全实现Flink中所有的特性。不是所有的[TableAPI,SQL]和[流,批]的组合都是支持的。TableAPI和SQLFlink针对标准的流处理和批处理提供了两种关系型API,TableAPI和SQL。TableAPI允许用户以一种很直观的方式进行select、filter和join操作。FlinkSQL基于ApacheCalcite实现标准SQL。针对批处理和流FlinkTableAPI、SQL和Flink的DataStreamAPI、DataSetAPI是紧密联系在一起的TableAPI和SQL是一种关系型API,用户可以像操作Mysql数据库表一样的操作数据,而不需要写代码,更不需要手工的对代码进行调优。另外,SQL作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供SQL支持,将很容易被用户接受。如果你想要使用TableAPI和SQL的话<artifactId>flink-table-api-java-7<artifactId>flink-table-api-scala-如果你想在本地IDE中运行程序<artifactId>flink-table-nner-如果你用到了老的执行引擎,还需要添加下面这个依注意:由于部table相关的Scala实现的,所以,这个依赖也是必须的。【这个依赖我<artifactId>flink-streaming-TableAPI和SQL通过joinAPI集成在一起,这个joinAPI的概念是Table,Table可以作为查询的输入针对TableAPI和SQL我们主要讲解以下内容1:TableAPI和SQL的使用2:DataStream、DataSet和Table之间的互相转TableAPI和SQL的使想要使用TableAPI和SQL,首先要创建一个TableEnvironment对象下面我们来创建一个TableEnvironment对scala代码package2importimportimportorg.apache.flink.table.api.bridge.scala.{BatchTableEnvironment,importorg.apache.flink.table.api.{EnvironmentSettings,7 *CreatedbyobjectCreateTableEnvironmentScaladefmain(args:Array[String]):Unit= *注意:如果TableAPI和SQL不需要和DataStream或者DataSet*则针对stream和batch都可以使用//指定底层使用Blink引擎,以及数据处理模式-//从1.11版本开始,Blink引擎成为TableAPI和SQL的默认执行引擎,在生产环境下面//创建TableEnvironment对valsTableEnv=//指定底层使用Blink引擎,以及数据处理模式-//创建TableEnvironment对valbTableEnv=

注意:如果TableAPI和SQL需要和DataStream或者DataSet互相转针对stream需要使用针对batch需要使用//创建valssEnv=valssSettings=EnvironmentSettings.newInstance().useBlinknner().inStvalssTableEnv=StreamTableEnvironment.create(ssEnv,ssSettings)//创建valbbEnv=ExecutionEnvironment.getExecutionEnvironmentvalbbTableEnv=java package2importimportimportimportimportimport9 *CreatedbypublicclassCreateTableEnvironmentJavapublicstaticvoidmain(String[]args)*注意:如果TableAPI和SQL不需要和DataStream或者DataSet互相转*则针对stream和batch都可以使用//创建TableEnvironment对象-EnvironmentSettingssSettings=TableEnvironmentsTableEnv=//创建TableEnvironment对象-EnvironmentSettingsbSettings=TableEnvironmentbTableEnv=*注意:如果TableAPI和SQL需要和DataStream或者DataSet*针对stream需要使用*针对batch需要使用//创建StreamExecutionEnvironmentssEnv=EnvironmentSettingsssSettings=StreamTableEnvironmentssTableEnv=//创建//注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转ExecutionEnvironmentbbEnv=BatchTableEnvironmentbbTableEnv=}}下面我们来演示一下TableAPI和SQL的使目前官方推荐使用executeSql的方式,executeSql里面支持 下面我们来演示scala代码 package2 importorg.apache.flink.table.api.{EnvironmentSettings,45 *TableAPISQL的使*CreatedbyobjectTableAPIAndSQLOpScaladefmain(args:Array[String]):Unit=//获取valsTableEnv=//*connector.type:指定connector的类*connector.path:指定文件或 地*format.type:文件数据格式化类型,现在只支持csv格*注意:SQL语句如果出现了换行,行的末尾可以添加空格或者\n都可以,最后一行不用"createtablemyTable(\n""idint,\n""namestring\n"")with(\n""'connector.type'='filesystem',\n""'connector.path'='D:\\data\\source',\n""'format.type'='csv'\n"//使用TableAPI实现数据查询和/*importvalresult=.filter($"id">//使用SQLvalresult=sTableEnv.sqlQuery("selectid,namefrommyTablewhere////"createtablenewTable(\n""idint,\n""namestring\n"")with(\n""'connector.type'='filesystem',\n"

注意:针对SQL|createtable|id|name|)with|'connector.type'=|'connector.path'=|'format.type'=java package2importimportimport6importstatic8 *TableAPISQL的使*CreatedbypublicclassTableAPIAndSQLOpJavapublicstaticvoidmain(String[]args)//获取EnvironmentSettingssSettings=TableEnvironmentsTableEnv=//"createtablemyTable(\n""idint,\n""namestring\n"")with(\n""'connector.type'='filesystem',\n""'connector.path'='D:\\data\\source',\n""'format.type'='csv'\n" //使用TableAPI实现数据查询和 /*Tableresult=sTableEnv.from("myTable") (("i") //使用SQL实现数据查询和过滤等操Tableresult=sTableEnv.sqlQuery("selectid,namefrommyTable//输出结果到控//创建"createtablenewTable(\n""idint,\n""namestring\n"")with(\n""'connector.type'='filesystem',\n""'connector.path'='D:\\data\\res',\n""'format.type'='csv'\n"//输出结果到表newTable}}DataStream、DataSet和Table之间的互相转TableAPI和SQL可以很容易的和DataStream和DataSet程序集成到一块。通过TableEnvironment,可以把DataStream或者DataSet为Table,这样就可以使用TableAPI和SQL查询了。通过TableEnvironment也可以把Table对象转换为DataStream或者DataSet,这样就可以使用1:使用DataStream创建表,主要包含scala代码 package2importimportimport6 *CreatedbyobjectDataStreamToTableScaladefmain(args:Array[String]):Unit=//获取valssEnv=valssTableEnv=StreamTableEnvironment.create(ssEnv, //获取 import valstream=ssEnv.fromCollection(Array((1,"jack"),(2,"tom"),(3, //第一种:将DataStream转换为view视

ssTableEnv.sqlQuery("select*frommyTablewhereid>//第二种:将DataStream转换为table对valtable=ssTableEnv.fromDataStream(stream,$"id",$"name").filter($"id">//注意:'id,'name$"id",$"name"这两种写法是一样的效java package2importimportimportimportimportimportimportimportimport importstatic *CreatedbypublicclassDataStreamToTableJavapublicstaticvoidmain(String[]args)//获取StreamExecutionEnvironmentssEnv=EnvironmentSettingsssSettings=StreamTableEnvironmentssTableEnv=//获取ArrayList<Tuple2<Integer,String>>data=newdata.add(newdata.add(newdata.add(newDataStreamSource<Tuple2<Integer,String>>stream=//第一种:将DataStream转换为view视ssTableEnv.sqlQuery("select*frommyTablewhereid>//第二种:将DataStream转换为table对Tabletable=ssTableEnv.fromDataStream(stream,$("id"),table.select($("id"), 2:使用DataSet创建注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转scala代码 package2importimportimportimportorg.apache.flink.table.api.bridge.scala.{BatchTableEnvironment,7 *CreatedbyobjectDataSetToTableScaladefmain(args:Array[String]):Unit=//获取valbbEnv=valbbTableEnv=//获取importvalset=bbEnv.fromCollection(Array((1,"jack"),(2,"tom"),(3,//第一种:将DataSet转换为view视importbbTableEnv.sqlQuery("select*frommyTablewhereid> //第二种:将DataSet转换为table对 valtable=bbTableEnv.fromDataSet(set,$"id", .filter($"id"> //注意:'id,'name$"id",$"name"这两种写法是一样的效 java package2importimportimportimportimportimportimport ;importimportstatic *CreatedbypublicclassDataSetToTableJavapublicstaticvoidmain(String[]args)//获取ExecutionEnvironmentbbEnv=BatchTableEnvironmentbbTableEnv=//获取ArrayList<Tuple2<Integer,String>>data=newdata.add(newdata.add(newdata.add(newDataSource<Tuple2<Integer,String>>set=//第一种:将DataSet转换为view视bbTableEnv.sqlQuery("select*frommyTablewhereid>//第二种:将DataSet转换为table对Tabletable=bbTableEnv.fromDataSet(set,$("id"),table.select($("id"), 将Table转换为DataStream或者DataSet时,你需要指定生DataStream或者DataSet的数据类型,即,Table的每行数据要转换成的数据类型。通常最方便的选择是转换成Row。以下列表概述了不Row:通过角标映射字段,支持任意数量的字段,支持null值,无类型安全(type-safe)检查POJO:Java中的实体类,这个实体类中的字段名称需要和Tale中的字段名称保持一致,支持任意数量的字段,支持null值,有类型安全检查。CaseClass:通过角标映射字段,不支持null值,有类型Tuple:通过角标映射字段,Scala中限制22个字段,Java中限制25个字段,不支持null值,有类型安全检AtomicType:Table必须有一个字段,不null值,有类型安全检查3:将表转换成流式查询的结果Table会态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转有几种模式可以将Table转换为DataStreamRetractMode:可以始终使用此模式,它使用一个Boolean标识来编码INSERT和DELETE更改。 package2importimportimportimport7 *将table转换成*CreatedbyobjectTableToDataStreamScaladefmain(args:Array[String]):Unit=//获取valssEnv=valssTableEnv=StreamTableEnvironment.create(ssEnv,//"createtablemyTable(\n""idint,\n""namestring\n"")with(\n""'connector.type'='filesystem',\n""'connector.path'='D:\\data\\source',\n""'format.type'='csv'\n"//获取valtable=//将table转换为//如果只有新增(追加)操作,可以使用importvalappStream=//如果有增加操作,还有删除操作,则使用valretStream=valflag=valrow=valid=valname=//注意:将table对象转换为DataStream之后,就需要调用StreamExecutionEnvir java packagecomimoocjavaimport importimportimportimportimportimportimportimportimportimport *将table转换成*CreatedbypublicclassTableToDataStreamJavapublicstaticvoidmain(String[]args)throws//获取StreamExecutionEnvironmentssEnv=EnvironmentSettingsssSettings=StreamTableEnvironmentssTableEnv=//"createtablemyTable(\n""idint,\n""namestring\n"")with(\n""'connector.type'='filesystem',\n""'connector.path'='D:\\data\\source',\n""'format.type'='csv'\n"//获取Tabletable=//将table转换为//如果只有新增(追加)操作,可以使用DataStream<Row>appStream=ssTableEnv.toAppendStream(table,appStream.map(newMapFunction<Row,Tuple2<Integer,String>>()publicTuple2<Integer,String>map(RowthrowsExceptionintid=Stringname=returnnewTuple2<Integer,String>(id, //如果有增加操作,还有删除操作,则使用DataStream<Tuple2<Boolean,Row>>retStream=retStream.map(newMapFunction<Tuple2<Boolean,Row>,publicTuple3<Boolean,Integer,String>map(Tuple2<Boolean,throwsExceptionBooleanflag=intid=Stringname=returnnewTuple3<Boolean,Integer,String>(flag,id, package2importimportimport6 *将table转换成*CreatedbyobjectTableToDataSetSc

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论