Flink实时大数据处理技术 课后习题及答案 05_第1页
Flink实时大数据处理技术 课后习题及答案 05_第2页
Flink实时大数据处理技术 课后习题及答案 05_第3页
全文预览已结束

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

(1)Flink中算子的并行度有哪些设置方式,哪种的优先级最高?(2)假设有一个包含多行字符串的DataStream,每行字符串由空格分隔的多个单词组成。请你编写一个Flink程序,读取这个DataStream,并使用flatMap算子将字符串中的每个单词拆分出来,然后使用filter算子过滤出长度大于3的单词,并使用map算子将单词转换为小写。(3)假设现在有一个包含多行字符串的DataStream,每行字符串包含了多个信息,其中包括了姓名、年龄、性别和地址等信息,不同信息之间以空格分隔。请你编写一个Flink程序,读取这个DataStream,并使用flatMap算子将每行字符串拆分出来,然后使用map算子将每个信息转换为对应的类型(姓名为String类型,年龄为Int类型,性别为String类型,地址为String类型),最后使用keyBy算子按照性别进行分组,统计每个性别的人数和平均年龄。(4)编写自定义Source,从Redis数据库中读取数据,给出具体实现步骤。(5)编写自定义Sink,将数据写入Redis数据库,给出具体实现步骤。参考答案:答:并行度有哪些设置方式有:.全局设置;.数据源设置;.算子操作设置;.运行时配置;优先级最高的是运行时配置。答:objectMain{

defmain(args:Array[String]):Unit={

importorg.apache.flink.streaming.api.scala._

valenv=StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

valdataStream=env.readTextFile("路径")//这里的数据源可以切换

valfilteredWords:DataStream[String]=dataStream

.flatMap(_.split("\\s+"))

.filter(_.length>3)

.map(_.toLowerCase)

filteredWords.print()

env.execute()

}

}答:importorg.apache.flink.streaming.api.scala._importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobjectMain{caseclassPerson(name:String,age:Int,gender:String,address:String)defmain(args:Array[String]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalinputStream=env.readTextFile("路径")//这里的数据源可以切换valresultStream=inputStream.flatMap{line=>valArray(name,age,gender,address)=line.split("\\s+")Some(Person(name,age.toInt,gender,address))}.map(person=>(person.gender,(1,person.age))).keyBy(_._1).reduce((x,y)=>(x._1,(x._2._1+y._2._1,x._2._2+y._2._2))).map{tuple=>valgender=tuple._1valcount=tuple._2._1valtotalAge=tuple._2._2valaverageAge=totalAge/counts"Gender:$gender,Count:$count,AverageAge:$averageAge"}resultStream.print()env.execute()}}答:1)添加依赖:确保你的Flink项目中包含了必要的依赖,包括Flink本身的依赖和用于连接Redis的依赖2)编写自定义Source你需要实现一个RichParallelSourceFunction或者RichParallelSourceFunction[T](如果你知道数据的类型)的类。在这个类中,你需要实现run(SourceContext<T>ctx)方法,该方法定义了如何从Redis读取数据并发送到Flink的SourceContext中。3)在你的Flink作业中,你可以使用addSource方法来添加你的自定义Source。答:1)添加依赖:<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>YOUR_JEDIS_VERSION</version></dependency>2)

编写自定义Sink需要编写一个继承自

RichSin

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论