流式计算应用9_第1页
流式计算应用9_第2页
流式计算应用9_第3页
流式计算应用9_第4页
流式计算应用9_第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

计算应用学习流式01自定义数据源目录02Kafka数据源自定义数据源1自定义数据源

1、用法说明

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。2、案例实操objectSparkStreaming03_DIY{

defmain(args:Array[String]):Unit={

valsparkConf=newSparkConf().setMaster("local[*]").setAppName("SparkStreaming")valssc=newStreamingContext(sparkConf,Seconds(3))

valmessageDS:ReceiverInputDStream[String]=ssc.receiverStream(newMyReceiver())messageDS.print()

ssc.start()ssc.awaitTermination()}/*

自定义数据采集器

1.继承Receiver,定义泛型,传递参数

2.重写方法*/

classMyReceiverextendsReceiver[String](StorageLevel.MEMORY_ONLY){privatevarflg=trueoverridedefonStart():Unit={newThread(newRunnable{overridedefrun():Unit={while(flg){valmessage="采集的数据为:"+newRandom().nextInt(10).toStringstore(message)Thread.sleep(500)}}}).start()}overridedefonStop():Unit={flg=false;}}}自定义数据源

3、采集结果-------------------------------------------Time:1612252791000ms-------------------------------------------采集的数据为:3采集的数据为:4采集的数据为:2Kafka数据源2Kafka数据源

1、版本选型ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用

DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。2、Kafka0-10Direct模式a)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。b)导入依赖<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version></dependency>Kafka数据源

c)编写代码objectDirectAPI{defmain(args:Array[String]):Unit={//1.创建SparkConfvalsparkConf:SparkConf=newSparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")//2.创建StreamingContextvalssc=newStreamingContext(sparkConf,Seconds(3))//3.定义Kafka参数valkafkaPara:Map[String,Object]=Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"linux1:9092,linux2:9092,linux3:9092",ConsumerConfig.GROUP_ID_CONFIG->"test_group","key.deserializer"->"mon.serialization.StringDeserializer","value.deserializer"->"mon.serialization.StringDeserializer")//4.读取Kafka数据创建DstreamvalkafkaDStream:InputDStream[ConsumerRecord[String,String]]=KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Set(”test"),kafkaPara))//5.将每条消息的KV取出valvalueDStream:DStream[String]=kafkaDStream.map(record=>record.value())//6.

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论