下载本文档
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
(1)Flink中算子的并行度有哪些设置方式,哪种的优先级最高?(2)假设有一个包含多行字符串的DataStream,每行字符串由空格分隔的多个单词组成。请你编写一个Flink程序,读取这个DataStream,并使用flatMap算子将字符串中的每个单词拆分出来,然后使用filter算子过滤出长度大于3的单词,并使用map算子将单词转换为小写。(3)假设现在有一个包含多行字符串的DataStream,每行字符串包含了多个信息,其中包括了姓名、年龄、性别和地址等信息,不同信息之间以空格分隔。请你编写一个Flink程序,读取这个DataStream,并使用flatMap算子将每行字符串拆分出来,然后使用map算子将每个信息转换为对应的类型(姓名为String类型,年龄为Int类型,性别为String类型,地址为String类型),最后使用keyBy算子按照性别进行分组,统计每个性别的人数和平均年龄。(4)编写自定义Source,从Redis数据库中读取数据,给出具体实现步骤。(5)编写自定义Sink,将数据写入Redis数据库,给出具体实现步骤。参考答案:答:并行度有哪些设置方式有:.全局设置;.数据源设置;.算子操作设置;.运行时配置;优先级最高的是运行时配置。答:objectMain{
defmain(args:Array[String]):Unit={
importorg.apache.flink.streaming.api.scala._
valenv=StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
valdataStream=env.readTextFile("路径")//这里的数据源可以切换
valfilteredWords:DataStream[String]=dataStream
.flatMap(_.split("\\s+"))
.filter(_.length>3)
.map(_.toLowerCase)
filteredWords.print()
env.execute()
}
}答:importorg.apache.flink.streaming.api.scala._importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobjectMain{caseclassPerson(name:String,age:Int,gender:String,address:String)defmain(args:Array[String]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalinputStream=env.readTextFile("路径")//这里的数据源可以切换valresultStream=inputStream.flatMap{line=>valArray(name,age,gender,address)=line.split("\\s+")Some(Person(name,age.toInt,gender,address))}.map(person=>(person.gender,(1,person.age))).keyBy(_._1).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2))).map{tuple=>valgender=tuple._1valcount=tuple._2._1valtotalAge=tuple._2._2valaverageAge=totalAge/counts"Gender:$gender,Count:$count,AverageAge:$averageAge"}resultStream.print()env.execute()}}答:1)添加依赖:确保你的Flink项目中包含了必要的依赖,包括Flink本身的依赖和用于连接Redis的依赖2)编写自定义Source你需要实现一个RichParallelSourceFunction或者RichParallelSourceFunction[T](如果你知道数据的类型)的类。在这个类中,你需要实现run(SourceContext<T>ctx)方法,该方法定义了如何从Redis读取数据并发送到Flink的SourceContext中。3)在你的Flink作业中,你可以使用addSource方法来添加你的自定义Source。答:1)添加依赖:<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>YOUR_JEDIS_VERSION</version></dependency>2)
编写自定义Sink需要编写一个继承自
RichSin
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2022-2023学年广元市重点中学九年级数学第一学期期末经典试题含解析
- 公路建设服务承包协议范本
- 印刷报价单合同模板
- 保险合同范本大全
- 家居装饰加盟协议样本
- 保险公司员工口腔卫生培训手册
- 茶馆感染防控关键措施
- 【正版授权】 ISO 16404:2013 EN Space systems - Programme management - Requirements management
- 【正版授权】 ISO 1628-6:1990 EN Plastics - Determination of viscosity number and limiting viscosity number - Part 6: Methyl methacrylate polymers
- 幼儿园指导纲要培训课件
- 广东省深圳市三校2021-2022学年八年级下学期期末联考道德与法治试题
- 光伏勘察报告
- 2021-2022学年广东省广州市番禺区五年级下学期期末语文试卷
- 金属腐蚀电化学理论基础(钝化)详解课件
- 装修方案设计招标文件范本(完整版)
- ISO45001-2018职业健康安全管理体系内部审核报告
- 年产一万吨味精发酵工厂设计
- 铝合金牌号对照
- 弟子规一张纸打印版
- 初中各年级知识点占中考分比例
- 上海九院匡延平关于黄体期促排
评论
0/150
提交评论