版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
通信数据分析与实战SparkStreaming实时计算框架第七章第1节2理解实时计算的场景熟悉常用的实时计算框架学习目标TARGET什么是实时计算在传统的数据处理流程(离线计算)中,复杂的业务处理流程会造成结果数据密集,结果数据密集则存在数据反馈不及时,若是在实时搜索的应用场景中,需要实时数据做决策,而传统的数据处理方式则并不能很好地解决问题,这就引出了一种新的数据计算——实时计算,它可以针对海量数据进行实时计算,无论是在数据采集还是数据处理中,都可以达到秒级别的处理要求。实时计算框架1.ApacheSparkStreamingApache公司开源的实时计算框架。ApacheSparkStreaming主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。2.ApacheStormApache公司开源的实时计算框架,它具有简单、高效、可靠地实时处理海量数据,处理数据的速度达到毫秒级别,并将处理后的结果数据保存到持久化介质中(如数据库、HDFS)。实时计算框架3.ApacheFlinkApache公司开源的实时计算框架。ApacheFlink不仅可以支持离线处理,还可以支持实时处理。由于离线处理和实时处理所提供的SLA(服务等级协议)是完全不相同的,所以离线处理一般需要支持低延迟的保证,而实时处理则需要支持高吞吐,高效率的处理。4.Yahoo!S4Yahoo公司开源的实时计算平台。Yahoo!S4是通用的、分布式的、可扩展的,并且还具有容错和可插拔能力,供开发者轻松地处理源源不断产生的数据。6小结理解实时计算的场景熟悉常用的实时计算框架通信数据分析与实战SparkStreaming实时计算框架第七章第2节8知道SparkStreaming的作用掌握SparkStreaming的工作原理学习目标TARGETSparkStreaming概述SparkStreaming是构建在Spark上的实时计算框架,且是对SparkCoreAPI的一个扩展,它能够实现对流数据进行实时处理,并具有很好的可扩展性、高吞吐量和容错性。SparkStreaming具有易用性、容错性及易整合性的显著特点。SparkStreaming概述容错性整合性易用性SparkStreaming支持Java、Python、Scala等编程语言,可以像编写离线程序一样编写实时计算的程序。
SparkStreaming在没有额外代码和配置的情况下,可以恢复丢失的数据。对于实时计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制,即每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以使用原始输入数据经过转换操作重新计算得出。SparkStreaming可以在Spark上运行,并且还允许重复使用相同的代码进行批处理。也就是说,实时处理可以与离线处理相结合,进行交互式的查询操作。11小结知道SparkStreaming的作用掌握SparkStreaming的工作原理SparkStreaming工作原理SparkStreaming支持从多种数据源获取数据,包括Kafka、Flume、Twitter、ZeroMQ、Kinesis及TCPSockets数据源。当SparkStreaming从数据源获取数据之后,则可以使用诸如map、reduce、join和window等高级函数进行复杂的计算处理,最后将处理结果存储到分布式文件系统、数据库中,最终利用实时Web仪表板进行展示。SparkStreaming工作原理SparkStreaming先接收实时输入的数据流,并且将数据按照一定的时间间隔分成一批批的数据,每一段数据都转变成Spark中的RDD,接着交由Spark引擎进行处理,最后将处理结果数据输出到外部储存系统。14小结知道SparkStreaming的作用掌握SparkStreaming的工作原理通信数据分析与实战SparkStreaming实时计算框架第七章第3节16知道DStream的作用知道DStream的编程模型学习目标TARGETSpark的DStream流SparkStreaming提供了一个高级抽象的流,即DStream(离散流)。DStream表示连续的数据流,可以通过Kafka、Flume和Kinesis等数据源创建,也可以通过现有DStream的高级操作来创建。DStream的内部结构是由一系列连续的RDD组成,每个RDD都是一小段时间分隔开来的数据集。对DStream的任何操作,最终都会转变成对底层RDDs的操作。Spark的Dstream编程模型批处理引擎SparkCore把输入的数据按照一定的时间片(如1s)分成一段一段的数据,每一段数据都会转换成RDD输入到SparkCore中,然后将DStream操作转换为RDD算子的相关操作,即转换操作、窗口操作以及输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。19小结知道DStream的作用知道DStream的编程模型通信数据分析与实战SparkStreaming实时计算框架第七章第4节21熟悉Dstream的API掌握DStream的API操作学习目标TARGETDstream的操作SparkStreaming中对DStream的转换操作会转变成对RDD的转换操转换流程如下。其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。Dstream的操作DStreamAPI提供的与转换操作相关的方法方法名称相关说明map(func)将源DStream的每个元素,传递到函数func中进行转换操作,得到一个新的DStreamflatMap(func)与map()相似,但是每个输入的元素都可以映射0或者多个输出结果filter(func)返回一个新的DStream,仅包含源DStream中经过func函数计算结果为true的元素repartition(numPartitions)用于指定DStream分区的数量union(otherStream)返回一个新的DStream,包含源DStream和其他DStream中的所有元素Dstream的操作DStreamAPI提供的与转换操作相关的方法方法名称相关说明count()统计源DStream中每个RDD包含的元素个数,返回一个新DStreamreduce(func)使用函数func将源DStream中每个RDD的元素进行聚合操作,返回一个新DStreamcountByValue()计算DStream中每个RDD内的元素出现的频次,并返回一个新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次join(otherStream,[numTasks])当被调用类型分别为(K,V)和(K,W)键值对的两个DStream时,返回类型为(K,(V,W))键值对的一个新DStreamDstream的操作DStreamAPI提供的与转换操作相关的方法方法名称相关说明cogroup(otherStream,[numTasks])当被调用的两个DStream分别含有(K,V)和(K,W)键值对时,则返回一个新DStreamtransform(func)对源DStream中每个RDD应用RDD-to-RDD函数返回一个新DStream,在DStream中做任意RDD操作updateStateByKey(func)返回一个新状态DStream,通过在键的先前状态和键的新值上应用给定函数func更新每一个键的状态。该操作方法被用于维护每一个键的任意状态数据Transform操作DstreamAPI提供的与转换操作相关的方法和RDDAPI有些不同,不同之处在于RDDAPI中没有提供transform()和updateStateByKey()这两个方法,所以下面主要针对这两个方法进行详细的操作讲解Transform操作通过一个具体的案例来演示如何使用transform()方法将一行语句分割成多个单词,具体步骤如下:1执行”nc–lk9999”启动服务端且监听Socke服务。并输入数据“MyrolemodelisYumin”2创建名称为”spark_chapter07”的Maven项目。并配置”pom.xml”文件,引入SparkStreaming相关依赖3创建TransformTest对象,实现将数据拆分成多个单词的功能。Transform操作如果没有安装过nc,需要先执行yuminstallnc–y进行安装之后再执行以上的操作Transform操作<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<!--引入sparkStreaming依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!--引入sparkstreaming整合kafka的依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies><build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
</build>Transform操作objectTransformTest{defmain(args:Array[String]):Unit={//1.创建SparkConf对象valsparkConf:SparkConf=newSparkConf().setAppName("TransformTest").setMaster("local[2]")//2.创建SparkContext对象,它是所有任务计算的源头valsc:SparkContext=newSparkContext(sparkConf)
//3.设置日志级别sc.setLogLevel("WARN")//4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔valssc:StreamingContext=newStreamingContext(sc,Seconds(5))//5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)//6.使用RDD-to-RDD函数,返回新的DStream对象(即words),并空格切分每行valwords:DStream[String]=dstream.transform(rdd=>rdd.flatMap(_.split("")))
//7.打印输出结果words.print()//8.开启流式计算ssc.start()
//9.让程序一直运行,除非人为干预停止ssc.awaitTermination()UpdateStateByKey操作UpdateStateByKey操作通过一个具体的案例来演示如何使用updateStateByKey()方法进行词频统计,具体步骤如下:1在”spark_chapter07”的项目下,创建UpdateStateByKeyTest2定义词频统计的方法updateFunction3创建SparkConfig,SparkContext,StreamingContext对象,调用updateStateByKey方法。4测试updateStateByKey功能实现。UpdateStateByKey操作
//newValues表示当前批次汇总成的(word,1)中相同单词的所有1//runningCount表示历史的所有相同key的value总和defupdateFunction(newValues:Seq[Int],runningCount:Option[Int]):Option[Int]={valnewCount=runningCount.getOrElse(0)+newValues.sumSome(newCount)}UpdateStateByKey操作
//1.创建SparkConf对象valsparkConf:SparkConf=newSparkConf().setAppName("WordCount").setMaster("local[2]")
//2.创建SparkContext对象valsc:SparkContext=newSparkContext(sparkConf)
//3.设置日志级别sc.setLogLevel("WARN")//4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔valssc:StreamingContext=newStreamingContext(sc,Seconds(5))
//5.配置检查点目录,使用updateStateByKey方法必须配置检查点目录ssc.checkpoint("./")//6.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)
//7.按空格进行切分每一行,并将切分的单词出现次数记录为1valwordAndOne:DStream[(String,Int)]=dstream.flatMap(_.split("")).map(word=>(word,1))//8.调用updateStateByKey操作,统计单词在全局中出现的次数varresult:DStream[(String,Int)]=wordAndOne.updateStateByKey(updateFunction)//9.打印输出结果result.print()
//10.开启流式计算ssc.start()//11.让程序一直运行,除非人为干预停止ssc.awaitTermination()UpdateStateByKey操作4测试updateStateByKey功能实现。
//
1.在hadoop01开启nc服务,并输入测试单词:nc–lk9999sparkupdatezhongnanshanshenjilanshenjilanzhongnanshanzhangfuqingyuminzhongnanshanyumin
//2.运行UpdateStateByKeyTest的程序查看结果Dstream的窗口操作在SparkStreaming中,为DStream提供窗口操作,即在DStream流上,将一个可配置的长度设置为窗口,以一个可配置的速率向前移动窗口。根据窗口操作,对窗口内的数据进行计算,每次落在窗口内的RDD数据会被聚合起来计算,生成的RDD会作为WindowDStream的一个RDD。Dstream的窗口操作方法名称相关说明window(windowLength,slideInterval)返回基于源DStream的窗口进行批计算后的一个新DStreamcountByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素数reduceByWindow(func,windowLength,slideInterval)基于滑动窗口的源DStream中的元素进行聚合操作,返回一个新DStreamDstream的窗口操作方法名称相关说明reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑动窗口对(K,V)类型的DStream中的值,按K应用聚合函数func进行聚合操作,返回一个新DStreamreduceByKeyAndWindow(func,invFuncwindowLength,slideInterval,[numTasks])更高效的reduceByKeyAndWindow()实现版本。每个窗口的聚合值,都是基于先前窗口的聚合值进行增量计算得到。该操作会对进入滑动窗口的新数据进行聚合操作,并对离开窗口历史数据进行逆向聚合操作countByValueAndWindow(windowLength,slideInterval,[numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次,返回一个由(K,V)组成的新的DStreamDstream的窗口操作方法名称相关说明print()在Driver中打印出DStream中数据的前10个元素saveAsTextFiles(prefix,[suffix])将DStream中的内容以文本的形式进行保存,其中每次批处理间隔内产生的文件以“prefix-TIME_IN_MS[.suffix]”的方式命名。saveAsObjectFiles(prefix,[suffix])将DStream中的内容按对象进行序列化,并且以SequenceFile的格式保存。每次批处理间隔内产生的文件以“prefix-TIME_IN_MS[.suffix]”的方式命名。输出操作方法:Dstream的窗口操作方法名称相关说明saveAsHadoopFiles(prefix,[suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统输出操作方法:Window方法操作Window()操作通过一个具体的案例来演示如何使用Window()方法输出3个时间单位长度的数据,具体步骤如下:1在”spark_chapter07”的项目下,创建WindowTest2创建SparkConfig,SparkContext,StreamingContext对象,调用Window()方法。3测试Window()功能实现。Window()操作2创建SparkConfig,SparkContext,StreamingContext对象,调用Window()方法。
//1.创建SparkConf对象
valsparkConf:SparkConf=newSparkConf().setAppName("WindowTest").setMaster("local[2]")//2.创建SparkContext对象,它是所有任务计算的源头
valsc:SparkContext=newSparkContext(sparkConf)//3.设置日志级别
sc.setLogLevel("WARN")//4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
valssc:StreamingContext=newStreamingContext(sc,Seconds(1))
//5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)
//6.按空格进行切分每一行valwords:DStream[String]=dstream.flatMap(_.split(""))//7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
valwindowWords:DStream[String]=words.window(Seconds(3),Seconds(1))
//8.打印输出结果
windowWords.print()
//9.开启流式计算
ssc.start()
//10.让程序一直运行,除非人为干预停止
ssc.awaitTermination()Window()操作3测试Window()功能实现。
//
1.在hadoop01开启nc服务,并按每秒输入测试数据:nc–lk999912345
//2.运行WindowTest的程序查看结果Window()操作reduceByKeyAndWindow操作通过一个具体的案例来演示如何使用reduceByKeyAndWindow()方法统计3个时间单位内不同字母出现的次数,具体步骤如下:1在”spark_chapter07”的项目下,创建ReduceByKeyAndWindowTest2创建SparkConfig,SparkContext,StreamingContext对象,调用reduceByKeyAndWindow()方法。3测试reduceByKeyAndWindow功能实现。reduceByKeyAndWindow操作2创建SparkConfig,SparkContext,StreamingContext对象,调用reduceByKeyAndWindow()方法。
//1.创建SparkConf对象设置appName和master地址local[2]表示本地采用2个线程运行任务
valsparkConf:SparkConf=newSparkConf().setAppName("WordCount").setMaster("local[2]")
//2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskSchedulervalsc:SparkContext=newSparkContext(sparkConf)//3.设置日志级别
sc.setLogLevel("WARN")
//4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
valssc:StreamingContext=newStreamingContext(sc,Seconds(1))reduceByKeyAndWindow操作2创建SparkConfig,SparkContext,StreamingContext对象,调用reduceByKeyAndWindow()方法。
//5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)//6.按空格进行切分每一行,并将切分的单词出现次数记录为1valwordAndOne:DStream[(String,Int)]=dstream.flatMap(_.split("")).map(word=>(word,1))//7.调用updateStateByKey操作,统计单词在全局中出现的次数
valwindowWords:DStream[(String,Int)]=wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(3),Seconds(1))
//8.打印输出结果
windowWords.print()
//9.开启流式计算
ssc.start()
//10.让程序一直运行,除非人为干预停止
ssc.awaitTermination()reduceByKeyAndWindow操作3测试reduceByKeyAndWindow功能实现。
//
1.在hadoop01开启nc服务,并按每秒输入测试数据:nc–lk9999aabbc
//2.运行ReduceByKeyAndWindowTest的程序查看结果Window()操作SaveAsTextFiles操作通过一个具体的案例来演示如何使用saveAsTextFiles()方法保存输出的结果,具体步骤如下:1在”spark_chapter07”的项目下,创建SaveAsTextFilesTest2创建SparkConfig,SparkContext,StreamingContext对象,调用saveAsTextFiles()方法。3测试saveAsTextFiles功能实现。SaveAsTextFiles操作2创建SparkConfig,SparkContext,StreamingContext对象,调用saveAsTextFiles()方法。System.setProperty("HADOOP_USER_NAME","root")//1.创建SparkConf对象设置appName和master地址local[2]表示本地采用2个线程运行任务
valsparkConf:SparkConf=newSparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
//2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskSchedulervalsc:SparkContext=newSparkContext(sparkConf)//3.设置日志级别
sc.setLogLevel("WARN")//4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
valssc:StreamingContext=newStreamingContext(sc,Seconds(5))
SaveAsTextFiles操作2创建SparkConfig,SparkContext,StreamingContext对象,调用saveAsTextFiles()方法。
//5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
valdstream:ReceiverInputDStream[String]=ssc.socketTextStream("192.168.121.134",9999)//6.调用saveAsTextFiles操作,将nc窗口输出的内容保存到HDFS上
dstream.saveAsTextFiles("hdfs://hadoop01:9000/data/saveAsTextFiles/satf","txt")//7.开启流式计算
ssc.start()
//8.让程序一直运行,除非人为干预停止
ssc.awaitTermination()SaveAsTextFiles操作3测试saveAsTextFile功能实现。SaveAsTextFiles操作56小结熟悉Dstream的API掌握DStream的窗口操作通信数据分析与实战SparkStreaming实时计算框架第七章第5节58掌握Kafka的2种创建Dstream方式学习目标TARGETKafkaUtils.createDstream方式Kafka作为一个实时的分布式消息队列,实时地生产和消费消息。在大数据计算框架中,可利用SparkStreaming实时读取Kafka中的数据,再进行相关计算。在Spark1.3版本后,KafkaUtils里面提供了两个创建DStream的方式,一种是KafkaUtils.createDstream方式,另一种为KafkaUtils.createDirectStream方式。KafkaUtils.createDstream方式KafkaUtils.createDstream是通过Zookeeper连接Kafka,receivers接收器从Kafka中获取数据,并且所有receivers获取到的数据都会保存在Spark
executors中,然后通过SparkStreaming启动job来处理这些数据。createDstream方式实现1导入项目依赖2创建名称为”SparkStreaming_Kafka_createDstream”类,实现词频统计3创建Topic主题,指定消息类别
4启动KafKa的消息生产者,测试程序输出SparkStreaming整合Kafka实战1.导入依赖#添加SparkStreaming整合Kafka的依赖<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_0-8_2.11</artifactId> <version>2.3.2</version></dependency>2.创建Scala类,实现词频统计在spark_chapter07项目的/src/main/scala/cn.it.dstream目录下,创建一个名为“SparkStreaming_Kafka_createDstream”的Scala类,用来编写SparkStreaming应用程序实现词频统计。SparkStreaming整合Kafka实战//1.创建sparkConf,并开启wal预写日志,保存数据源valsparkConf:SparkConf=newSparkConf().setAppName("SparkStreaming_Kafka_createDstream").setMaster("local[4]").set("spark.streaming.receiver.writeAheadLog.enable","true")//2.创建sparkContextvalsc=newSparkContext(sparkConf)
//3.设置日志级别sc.setLogLevel("WARN")
//3.创建StreamingContextvalssc=newStreamingContext(sc,Seconds(5))
//4.设置checkpointssc.checkpoint("./Kafka_Receiver")
//5
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 个人与银行2024年度借款合同3篇
- 专业吊车作业协议模板2024版
- 2024版产品标准化认证协议文件版B版
- 2024中美农产品加工与出口合作协议范文3篇
- 2024机器租赁协议书
- 抢占春节外卖市场
- 2024年度地产公司房地产广告效果评估与优化委托代理协议3篇
- 2024年股权质押担保协议标准格式版B版
- 解读现代小说奥秘
- 2024年货款分期偿还买卖约定
- 《汽车驱动桥》汽车标准
- 投资的本质:巴菲特的12个投资宗旨
- 护栏和扶手制作与安装工程检验批质量验收记录
- 医院文化建设与员工凝聚力提升
- 食堂安全操作规范培训课件(48张)
- 水库引调水工程项目可行性研究报告
- 中药饮片行业招商策划
- 乳头混淆介绍演示培训课件
- 胸腔积液引流的护理查房
- (完整文本版)河南2016定额计算规则
- 小升初个人简历模板下载
评论
0/150
提交评论