《Spark大数据分析与实战》课件项目六 Spark Streaming处理流数据_第1页
《Spark大数据分析与实战》课件项目六 Spark Streaming处理流数据_第2页
《Spark大数据分析与实战》课件项目六 Spark Streaming处理流数据_第3页
《Spark大数据分析与实战》课件项目六 Spark Streaming处理流数据_第4页
《Spark大数据分析与实战》课件项目六 Spark Streaming处理流数据_第5页
已阅读5页,还剩73页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

BigDataAnalyticswithSpark项目六SparkStreaming处理流数据项目概述近年来,随着电子商务、舆情监控、传感监控、互联网金融等领域的发展,对数据实时处理的需求日渐增强,SparkStreaming计算框架就是为了实现流式数据的实时计算而产生的。本项目内容涵盖SparkStreaming读取套接字、文件流、Kafka等数据源数据,并进行实时处理,最终将结果输出到数据库中。项目效果通过本项目实践,可以实现Kafka收集电商用户行为数据后,编写SparkStreaming程序处理流式数据。例如实时计算过去30秒内用户下订单数、加入购物车数量、放入收藏夹数量如下所示:还可以将有用的数据(如用户购买行为数据)写入到MySQL等数据库,以供后台使用。目录任务1初识流数据处理模块SparkStreaming读取基础数据源到DStream中读取Kafka数据到DStream中DStream的转换操作任务2任务3任务4DStream的输出操作任务5任务6SparkStreaming实时处理电商用户行为数据任务1任务3DStream的输出操作任务5思维导图初识流数据处理模块SparkStreaming任务1SparkStreaming计算框架基本工作原理编写一个简单的SparkStreaming程序实现实时词频统计SparkStreaming的5阶段。我们日常处理的数据总体上可以分为括静态数据和流数据(动态数据)两大类;静态数据是一段较长的时间内相对稳定的数据,比如各类管理系统中的历史数据,例如企业的订单数据、教务系统中某课程的期末考试成绩等;对于静态数据一般采用批处理方式进行计算,可以在充裕的时间内对海量数据进行批量处理(即可以容忍较高的时间延迟),计算得到有价值的信息。HadoopMapReduce就是典型的批处理模型,用户可以在HDFS和HBase存放大量的静态数据,由MapReduce负责对海量数据执行批量计算。1.1SparkStreaming的产生流数据则是以大量、快速、时变的流形式持续到达,因此流数据是不断变化的数据;近年来,web应用、网络监控、传感监测等领域,流数据处理日渐兴起,成为当前数据处理领域的重要一环。比如电子商务领域,淘宝、京东等电商平台可以实时收集用户的搜索、点击、评论、加入购物车等各种用户行为,进而迅速发现用户的兴趣点、预判用户的购物行为,可以通过推荐算法为用户推荐其可能感兴趣的商品,一方面提高商家的销售额,另一方面提升消费者满意度及平台粘性;交通领域,安装了大量监控设备,可以实时收集车辆通过、交通违法等各种信息,进而对车流路况情况作出预判,提升车辆出行效率。1.1SparkStreaming的产生

流数据是时间上无上限的数据集合,因此其空间(容量)也没有具体限制。一般认为流数据具有如下特点:(1)数据快速持续到达,潜在大小也许是无穷无尽的;(2)数据来源众多,格式复杂;(3)数据量大,但是不十分关注存储;一旦经过处理,要么被丢弃,要么被归档存储;(4)注重数据的整体价值,不过分关注个别数据;(5)数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序。1.1SparkStreaming的产生正是由于流数据的上述特性,流数据不能采用传统的批处理方式,必须实时计算;实时计算最重要的一个需求是能够实时得到计算结果,一般要求响应时间为秒级或者毫秒级。在大数据时代,数据量巨大、数据样式复杂、数据来源众多,这些对实时计算提出了新的挑战,进而催生了,针对流数据的实时计算——流计算。1.1SparkStreaming的产生

目前,市场上存在Storm、Flink、S4等流计算框架;其中Storm是Twitter提出的、免费开源的分布式实时计算系统,Storm可简单、高效、可靠地处理大量的流数据;S4(SimpleScalableStreamingSystem)是Yahoo提出的开源流计算平台,具有通用的、分布式的、可扩展的、分区容错的、可插拔的特点;Flink是由Apache软件基金会开发的开源流处理框架,Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线可以执行批处理和流处理程序。1.1SparkStreaming的产生SparkStreaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。SparkStreaming可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。SparkStreaming支持从多种数据源提取数据,如Kafka、Flume、Twitter、ZeroMQ、文本文件以及TCP套接字等;并且可以提供一些高级API来表达复杂的处理算法,如map、reduce、join和window等;此外,SparkStreaming支持将处理完的数据推送到文件系统、数据库或者实时仪表盘中展示。1.1SparkStreaming的产生对于流数据,SparkStreaming接收实时输入的数据流后,将数据流按照时间片(秒级)为单位进行拆分为一个个小的批次数据,然后经Spark引擎以类似批处理的方式处理每个时间片数据。1.2

SparkStreaming的工作原理SparkStreaming将流式计算分解成一系列短小的批处理作业,也就是把SparkStreaming的输入数据按照时间片段(如1秒),分成一段一段的离散数据流(称之为DStream,DiscretizedStream);每一段数据都转换成Spark中的RDD,然后将SparkStreaming中对DStream流处理操作变为针对Spark中对RDD的批处理操作1.2

SparkStreaming的工作原理在进行实时单词统计时,DStreamlines中每个时间片的数据(存储句子的RDD)经flatMap操作,生成了存储单词的RDD;这些新生成的单词RDD对象就组成了words这个DStream对象。完成核心业务处理后,还可根据业务的需求对结果进一步处理,比如存储到外部设备中。1.2

SparkStreaming的工作原理利用Netcat工具向9999端口发送数据流(文本数据),使用SparkStreaming监听9999端口的数据流并进行词频统计。(1)运行Netcat工具并测试Netcat是一款著名的网络工具,它可以用于端口监听、端口扫描、远程文件传输以及实现远程shell等功能,Ubuntu系统系统自带Netcat工具;下面用两个shell窗口模拟两个人在局域网进行聊天,以此测试Netcat工具是否可以正常使用。打开两个Shell窗口,分别输入下图所示命令,用于监听9999端口;分别在两个窗口中输入字符,两个窗口可以分别收到对方放的数据;说明Netcat可以正常使用、通讯环境正常。1.3用spark-shell写第一个SparkStreaming程序(2)在spark-shell中编写程序在Linux终端使用如下命令进入sparkshell环境。注意SparkStreaming至少需要2个线程(一个接受流数据,一个处理数据);当在本地运行一个SparkStreaming程序时,不要使用“local”或者“local[1]”作为master的URL。这两种方法中的任何一个都意味着只有一个线程将用于运行本地任务。如果你正在使用一个基于接收器(receiver)的输入离散流DStream(例如TCPsocket,Kafka,Flume等),则该单独的线程将用于运行接收器(receiver),而没有留下任何的线程用于处理接收到的数据。因此,在本地运行时,需要使用“local[N]”作为masterURL,其中的N>运行接收器的数量。1.3用spark-shell写第一个SparkStreaming程序cd/usr/local/spark/bin./spark-shell--masterlocal[4]StreamingContext是所有流功能的主要入口点,导入相关包后,创建一个间歇时间为10秒的本地StreamingContext实例ssc。1.3用spark-shell写第一个SparkStreaming程序利用创建的ssc(StreamingContext对象),我们可以创建一个DStream对象lines;该DStream代表从localhost主机的9999端口流入的数据流。lines是从数据server接收到的数据流,其中每条记录都是一行文本。接下来,我们就要把这些文本行按空格分割成单词;与SparkRDD中的ftatMap类似,这里的ftatMap是一个映射算子,lines中的每行都会被ftatMap映射为多个单词,从而生成新的wordsDStream对象。。1.3用spark-shell写第一个SparkStreaming程序有了wordsDStream后,使用map方法将其RDD元素转换为(word,1)键值对形式;再使用reduceByKey算子,得到各个单词出现的频率、并打印输出。注意,执行以上代码后,SparkStreaming只是将计算逻辑设置好,此时并未真正的开始处理数据。要启动之前的处理逻辑,我们还要使用start方法启动流计算并等待程序结束。1.3用spark-shell写第一个SparkStreaming程序接下来在Linux终端,使用Netcat工具向9999端口发送文本数据。SparkStreaming即可计数10秒内数据流的词频并输出。1.3用spark-shell写第一个SparkStreaming程序下面在IDEA环境下,使用SparkStreaming完成流数据的实时词频统计,具体步骤如下:(1)创建Maven工程。在IntelliJIDEA中创建SparkStreaming工程,完毕后,在Maven的porm.xml文件中添加添加SparkStreaming组件相关依赖:<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_${scala.version}</artifactId><version>1.6.3</version></dependency>1.4用IDEA工具写第一个SparkStreaming程序(2)编写程序上述工程中,创建一个名为StreamingTest.scala的ScalaObject文件,文件中写入如下代码:importorg.apache.spark._importorg.apache.spark.streaming.StreamingContextimportorg.apache.spark.streaming.SecondsobjectStreamTest{defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[4]").setAppName("NetworkWordCount")valsc=newSparkContext(conf)//屏蔽控制台输出中的INFO日志输出sc.setLogLevel("WARN")

1.4用IDEA工具写第一个SparkStreaming程序//创建StreamingContextvalssc=newStreamingContext(sc,Seconds(10))//创建DStream,监听本机的9999端口vallines=ssc.socketTextStream("localhost",9999)//将监听到的文本切割成单词valwords=lines.flatMap(_.split(""))//将切割后的单词组成KV形式的键值对valpairs=words.map(word=>(word,1))//统计每个单词的词频valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}1.4用IDEA工具写第一个SparkStreaming程序(3)运行程序使用nc-lk9999命令打开netcat监听;运行StreamingTest.scala,在netcat窗口中,输入文本则在IDEA中输出词频统计的结果。1.4用IDEA工具写第一个SparkStreaming程序通过书写上述代码,可以发现编写SparkStreaming程序模式相对固定,其基本步骤包括: (1)通过创建输入DStream来定义输入源 (2)对DStream进行转换操作和输出操作来定义流计算。 (3)streamingContext.start()来开始接收数据和处理流程。 (4)streamingContext.awaitTermination()方法,等待处理结束(手动结束或因为错误而结束)。 (5)可以通过streamingContext.stop()来手动结束流计算进程。1.5编写SparkStreaming程序的基本步骤读取基础数据源到DStream中任务2SparkStreaming对接多种数据源,将获取的流数据生成DStream。在IDEA环境下,SparkStreaming从基础数据源中获取数据并创建DStream。SparkStreaming可以从HDFS文件系统目录、本地系统的文件目录读取数据到DStream中,本例将演示在IDEA环境下,编写SparkStreaming程序实时读取HDFS文件目录中的数据。(1)启动HDFS服务在Linux终端中,使用如下命令,启动hdfs服务(2)准备数据文件准备3个文件文件file1.txt、file2.txt、file3.txt(位于/home/hadoop目录下),其内容如下所示。2.1读取文件流cd/usr/local/hadoop/sbin./start-dfs.sh(3)编写程序IDEA工程中,新建一个scala文件StreamReadHdfs.scala,其代码如下://导入相关包importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.log4j.{Level,Logger}objectStreamReadHdfs{defmain(args:Array[String]):Unit={//设置Level级别,屏蔽控制台无关日志输出,便于观察输出结果Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//新建一个SparkConf实例、SparkContext实例valconf=newSparkConf().setMaster("local[4]").setAppName("StreamReadHdfs")valsc=newSparkContext(conf)2.1读取文件流//新建StreamingContext实例valssc=newStreamingContext(sc,Seconds(10))//创建DStream用于监听hdfs相关目录vallines=ssc.textFileStream("hdfs://localhost:9000/user/hadoop/spark_streaming")//逐行打印监听的数据lines.print()//开始SparkStreaming任务,任务持续执行,直到某种方式停止或发生异常ssc.start()ssc.awaitTermination()}}2.1读取文件流(4)运行测试运行StreamReadHdfs.scala程序,SparkStreaming开始监听hdfs文件系统的"hdfs://localhost:9000/user/hadoop/spark_streaming"目录,没有新文件输入时如下所示。在Linux终端中,使用以下命令将file1.txt、file2.txt、file3.txt依次上传到"hdfs://localhost:9000/user/hadoop/spark_streaming"目录下。2.1读取文件流cd/usr/local/hadoop/bin./hdfsdfs-mkdir/user/hadoop/spark_streaming./hdfsdfs-put/home/hadoop/file1.txt/user/hadoop/spark_streaming#上传file1.txt./hdfsdfs-put/home/hadoop/file2.txt/user/hadoop/spark_streaming#上传file1.txt./hdfsdfs-put/home/hadoop/file3.txt/user/hadoop/spark_streaming#上传file1.txt在IDEA的控制台,可以看到SparkStreaming监听到"hdfs://localhost:9000/user/hadoop/spark_streaming"目录下,不断有数据流入(上传新文件),并将数据内容输出。2.1读取文件流SparkStreaming可以方便的读取套接字流,只需要的调用StreamingContext类的socketTextStream方法即可,调用格式如下所示;valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.socketTextStream("localhost",9999)其中,sc为SparkContext实例,“localhost”表示本机(也可以用主机的IP代替),“9999”为监听的端口号。任务1中已给出具体案例,在此不再重复。2.2读取套接字流SparkStreaming可以读取RDD组成的数据队列;这里我们创建一个队列,将动态生成的RDD不断发送到该队列中,SparkStreaming持续读取队列中的RDD。在工程中创建StreamReadRDD.scala文件,代码如下:importorg.apache.log4j.{Level,Logger}importorg.apache.spark.rdd.RDDimportorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkContext}objectStreamReadRDD{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)2.3读取RDD队列流valconf=newSparkConf().setMaster("local[4]").setAppName("StreamReadRDD")valsc=newSparkContext(conf)valssc=newStreamingContext(sc,Seconds(2))//创建线程安全的队列,用于放置RDDvalrddQueue=newscala.collection.mutable.SynchronizedQueue[RDD[Int]]//创建一个线程,通过for循环向队列中添加新的RDDvaladdQueueThread=newThread(newRunnable{overridedefrun():Unit={for(i<-1to5){//向队列rddQueue添加新的RDDrddQueue+=sc.parallelize(i)//线程sleep2000毫秒Thread.sleep(2000)}}})2.3读取RDD队列流//创建DStream读取RDD系列valinputDStream=ssc.queueStream(rddQueue)inputDStream.print()//启动SparkStreamingssc.start()//启动addQueueThread线程,不断向rddQueue队列中添加新的RDDaddQueueThread.start()ssc.awaitTermination()}}执行StreamReadRDD.scala,其输出结果如右图所示:2.3读取RDD队列流读取Kafka数据到DStream中任务3SparkStreaming与Kafka对接的方法读取Kafka数据到DStream中,完成数据实时处理任务。除了套接字流、文件流、RDD队列流,SparkStreaming还支持Kafka、Flume、Kinesis等高级数据源;这一类别的数据源需要使用Spark库外的接口,其中一些还需要比较复杂的依赖关系(例如Kafka和Flume)。因此,为了最小化有关的依赖关系的版本冲突的问题,这些资源本身不能创建DStream的功能,需要通过依赖单独的类库实现创建DStream的功能。另外,这些高级数据源不能在SparkShell中使用,因此基于这些高级数据源的应用程序不能在SparkShell中直接测试;如想要在SparkShell中使用它们,则必须下载带有它的依赖的相应的Maven组件的JAR,并且将其添加到classpath;本项任务我们将在IDEA中编写SparkStreaming读取Kafka高级数据源。3.1SparkStreaming支持的高级数据源

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica)分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景;Kafka用scala语言编写,目前已成为Apache基金会顶级开源项目。Kafka有如下特点:高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。可扩展性:kafka集群支持热扩展。持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。容错性:允许集群中节点失败(集群中保留多个副本)。高并发:支持数千个客户端同时读写。3.2了解Kafka的工作原理Kafka的工作原理如下所示,消息生产者Producer(向Kafka发送数据的终端)产生数据后,通过Zookeeper找到Brocker(一台Kafka服务器就是一个Broker,一个集群可以由多个Broker组成)后,将数据放到Broker上并标记不同的主题topic;消息消费者Customer(从Kafka获取消息的终端)根据自身订阅的topic主题,通过Zookeeper找到相应的Broker,然后消费相关数据。3.2了解Kafka的工作原理使用Kafka模拟持续收集地交通监控设备发来的的监控数据(数据内容为:监控设备号,最高限速,车牌号,车辆通过时速),利用SparkStreaming读取Kafka中的数据,找出超速行驶的的车辆并在控制台输出。(1)安装Kafka进入Kafka的官网/downloads,下载与本机Scala版本一致的Kafka包,此安装包内已经附带Zookeeper,不需要额外安装Zookeeper。3.3Kafka的安装与测试在Linux终端下,使用如下命令完成Kafka解压等工作。(2)启动Kafka打开一个Linux终端,输入以下命令,启动zookeeper服务。打开第二个Linux终端,输入以下命令,启动kafka服务。3.3Kafka的安装与测试cd/usr/local/hadoop/sbin./start-dfs.shcd/usr/local/kafkabin/zookeeper-server-start.shconfig/pertiescd/usr/local/kafkabin/kafka-server-start.shconfig/perties(3)创建主题,测试Kafka是否安装成功打开第三个Linux终端,使用如下命令添加一个消息主题“mytopic”使用如下命令查看主题mytopic是否创建成功,创建成功则有显示。接下来使用命令,向主题mytopic中发送消息。3.3Kafka的安装与测试cd/usr/local/kafkabin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topicmytopicbin/kafka-topics.sh--list--zookeeperlocalhost:2181bin/kafka-console-producer.sh--broker-listlocalhost:9092--topicmytopic打开第四个Linux终端,使用如下命令消费mytopic主题消息,可以显示相关信息,如图6-23所示。测试正常后,即可关闭第三、第四个Linux终端;但第一、第二个Linux终端不要关闭。3.3Kafka的安装与测试cd/usr/local/kafkabin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicstreamtest--from-beginning(4)准备测试数据准备如下格式的Kafka消息:监控设备号、最高限速、车牌号、通过时速,如图6-25所示:(5)使用Kafka控制台生成消息、测试程序本教程使用的Spark版本为2.2.3,与Kafka包存在一定的兼容问题;编译代码时有可能出现如下错误:3.4编写SparkStreaming程序找出超速车辆上述问题主要是Spark2.0以上版本将Logging类转移到包ernal包中(而非之前的org.apache.spark包)。我们可以从在当前工程下,建立org.apache.spark包,从Spark源码中将Logging.scala文件复制到org.apache.spark包中。3.4编写SparkStreaming程序找出超速车辆打开新的Linux终端,输入如下命令、输入相应消息,输入消息及控制台输出结果如下;找出了超速车辆粤A8512、粤A4432、粤A2893。3.4编写SparkStreaming程序找出超速车辆DStream的转换操作任务4在流计算中,要进行转换操作。DStream中常见的转换操作。所谓的DStream无状态转换操作,是指不记录历史状态信息,每次仅对新的批次数据进行处理;无状态转换操作每一个批次的数据处理都是独立的,处理当前批次数据时,即不依赖之前的数据,也不影响后续的数据。例如,任务1中的流数据词频统计,就采用无状态转换操作,每次仅统计当前批次数据中的单词词频,与之前批次数据无关,不会利用之前的历史数据。4.1DStream无状态转换操作4.1DStream无状态转换操作右表给出了常见的DStream无状态转换操作。DStream的操作与RDD的转换操作类似,在流数据词频统计程序中已用到map等操作,在此不再详述;但表中的transform方法值得深入探讨。transform方法使用户能够直接调用任意的RDD操作方法,极大的丰富了DStream上能够操作的内容。下面演示使用transform方法模拟过滤黑名单车辆;现有一个违章车辆黑名单文件blacklist.txt,记载了车辆车牌号、违法项目。4.1DStream无状态转换操作用Netcat模拟交通监控设备获取的车流信息,信息格式为“监控设备号,车牌号,记录时间”,样式如下所示;现要求SparkStreaming获取车流数据后,与和名单文件中的车牌号对照,输出违法车辆信息。4.1DStream无状态转换操作在Linux终端中,使用nc-lk9999命令准备向9999端口发送数据;运行TrafficStream.scala,在Netcat终端中输入数据,IDEA控制台输出结果如下所示(如果数据输入速度超过窗口时间,则输出结果可能有所不同)。4.1DStream无状态转换操作与无状态转换操作不同,DStream有状态转换操作当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换和updateStateByKey转换。滑动窗口转换操作的计算过程如下所示,对于一个DStream,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算);然后窗口按照指定时间间隔在源DStream上滑动,每次落入窗口的RDD都会形成一个小段的DStream(称之为windowedDStream,包含若干个RDD),这时,就可以启动对这个小段DStream的计算。4.2

DStream有状态转换操作由窗口操作的原理可知,任何窗口相关操作都要指定两个参数:•windowlength——窗口的长度,即窗口覆盖的时间长度•slidinginterval——窗口每次滑动的距离,窗口启动的时间间隔注意:上述两个参数都必须是DStream批次间隔的整数倍;常用的窗口操作如下所示。4.2

DStream有状态转换操作上述操作中,window操作是基于源DStream的批次计算后得到新的DStream;例如要读取套接字流数据,设置批次间隔1秒,窗口长度为3秒,滑动时间间隔为1秒,截取DStream中的元素构建新的DStream使用Netcat向端口发送数据,按照每秒钟发一个字母的速度发送,输出结果如图6-33所示;可以看到,第一秒输出a,第二秒输出ab,第三秒输出abc,而第四秒输出bcd(因为a已经滑出当前窗口)。4.2

DStream有状态转换操作窗口操作中的reduceByKeyAndWindow操作与词频统计中使用的reduceByKey类似,但reduceByKeyAndWindow针对的是窗口数据源(DStream中截取的一段),是对窗口内所有数据进行计算。例如,设置窗口长度为3秒,滑动时间1秒,进行窗口内单词词频统计;使用Netcat向端口发送数据,按照每秒钟发一个字母的速度发送,输出结果如图6-64所示;可以看到,第一秒输出(a,1),第二秒输出(a,2),第三秒输出(a,1)、(b,1),此时第一个字母已经滑出窗口,所以a的数量减少一个。4.2

DStream有状态转换操作DStream的输出操作任务5处理完毕的数据可以按照业务要求输出到文件、数据库、展板中。将DStream写入到文本文件以及MySQL数据库中。输出算子可以将DStream的数据推送到外部系统,如数据库或者文件系统;SparkStreaming只有输出算子调用时,才会真正触发transformation算子的执行(与RDD类似)。目前所支持的输出算子如下所示。5.1DStream写入到文本文件下面使用saveAsTextFile方法,接收套接字数据流(端口号9999)后,进行单词词频统计,统计结果保存到文本文件中;然后,在IDEA中停止程序运行,然后检查下这些词频结果是否被成功地输出到“file:///home/hadoop/streamsave/file.txt”文件中了,在Linux终端中执行如下命令:5.1DStream写入到文本文件DStream.foreachRDD是一个非常强大的算子,用户可以基于此算子将DStream数据推送到外部系统中。在演示代码前,用户需要了解如何高效的使用这个工具;下面列举常见的错误。(1)在Spark驱动程序中建立数据库连接通常,对外部系统写入数据需要一些连接对象(如远程server的TCP连接),以便发送数据给远程系统。因此,开发人员可能会不经意地在Spark驱动器(driver)进程中创建一个连接对象,然后又试图在Sparkworker节点上使用这个连接。如下例所示:dstream.foreachRDD{rdd=>valconnection=createNewConnection()//这行在驱动器(driver)进程执行rdd.foreach{record=>connection.send(record)//而这行将在worker节点上执行}}这段代码是错误的,因为它需要把连接对象序列化,再从驱动器节点发送到worker节点。而这些连接对象通常都是不能跨节点(机器)传递的。5.2DStream写入到MySQL数据库的方法分析(2)为每一条记录建立一个数据库连接解决上述错误的办法就是在worker节点上创建连接对象。然而,有些开发人员可能会走到另一个极端——为每条记录都创建一个连接对象,例如:dstream.foreachRDD{rdd=>rdd.foreach{record=>valconnection=createNewConnection()connection.send(record)connection.close()}}一般来说,连接对象是有时间和资源开销限制的。因此,对每条记录都进行一次连接对象的创建和销毁会增加很多不必要的开销,同时也大大减小了系统的吞吐量。5.2DStream写入到MySQL数据库的方法分析(3)较为高效的做法一个比较好的解决方案是使用rdd.foreachPartition,为RDD的每个分区创建一个单独的连接对象,示例如下:dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>valconnection=createNewConnection()partitionOfRecords.foreach(record=>connection.send(record))connection.close()}}5.2DStream写入到MySQL数据库的方法分析下面使用foreachRDD方法,模拟处理出租车监控系统发来的车辆定位数据(包含车牌号、经度、维度、时间),接收到数据流后将其每10秒将其存入MySQL数据库。首先打开一个Linux终端,输入以下命令启动MySQL服务并进入MySQL客户端。进入MySQL客户端后,使用使用下列语句,创建MySQL数据库stream及数据库表car_position。5.3车辆定位数据写入到MySQL数据库servicemysqlstart#启动MySQL服务mysql-uroot-p#屏幕会提示你输入密码,输入正确密码后即进入MySQL客户端CREATEDATABASEstream;USEstream;CREATETABLEIFNOTEXISTS`car_position`(`id`BIGINTUNSIGNEDAUTO_INCREMENT,`carNO`VARCHAR(50)NOTNULL,`longitude`VARCHAR(40)NOTNULL,`latitude`VARCHAR(40)NOTNULL,`times`VARCHAR(40)NOTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;运行StreamMySQL.scala,使用Netcat向9999端口发送数据,如下所示。在IDEA控制台可以看到输出处理后的结果;在MySQL客户端,输入命令查询car_position表的记录,可以发现流数据成功写入MySQL,如下所示。5.3车辆定位数据写入到MySQL数据库下面使用foreachRDD方法,模拟处理出租车监控系统发来的车辆定位数据(包含车牌号、经度、维度、时间),接收到数据流后将其每10秒将其存入MySQL数据库。首先打开一个Linux终端,输入以下命令启动MySQL服务并进入MySQL客户端。进入MySQL客户端后,使用使用下列语句,创建MySQL数据库stream及数据库表car_position。5.3车辆定位数据写入到MySQL数据库servicemysqlstart#启动MySQL服务mysql-uroot-p#屏幕会提示你输入密码,输入正确密码后即进入MySQL客户端CREATEDATABASEstream;USEstream;CREATETABLEIFNOTEXISTS`car_position`(`id`BIGINTUNSIGNEDAUTO_INCREMENT,`carNO`VARCHAR(50)NOTNULL,`longitude`VARCHAR(40)NOTNULL,`latitude`VARCHAR(40)NOTNULL,`times`VARCHAR(40)NOTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;SparkStreaming实时处理电商用户行为数据任务6模拟解决电商用户行为数据处理问题。定时读取数据到Kafka的某主题下,SparkStream获取该主题的数据后进行处理。处理结果最终写入MySQL数据库。本任务模拟数据取自淘宝用户行为数据,数据源自阿里云天池数据集(/dataset/dataDetail?dataId=649);该数据集包含了2017年11月25日至2017年12月3日之间,有行为的约一百万随机用户的所有行为(行为包括点击、购买、加购、喜欢);即数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:6.1数据集说明与准备工作整个数据集包含用户数量987994,商品数量4162024,商品类别9439,用户行为数据(行数)100150807。现抽取其中的2000行,构建子数据集userbehavior2000.csv(样例数据如图6-68所示)完成本实验。6.1数据集说明与准备工作本项任务整体实现思路如图6-69所示;

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论