PySpark大数据技术与应用 课件 4.3 Structured Streaming结构化流式处理_第1页
PySpark大数据技术与应用 课件 4.3 Structured Streaming结构化流式处理_第2页
PySpark大数据技术与应用 课件 4.3 Structured Streaming结构化流式处理_第3页
PySpark大数据技术与应用 课件 4.3 Structured Streaming结构化流式处理_第4页
PySpark大数据技术与应用 课件 4.3 Structured Streaming结构化流式处理_第5页
已阅读5页,还剩31页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

StructuredStreaming

结构化流式处理StructuredStreaming是Spark2.0版本开始推出的一种实时流框架,建立在SparkSQL之上,是一个可伸缩的、容错的流处理引擎。StructuredStreaming通过一致的API整合了批处理和流处理,可以像编写批处理程序一样编写流处理程序。StructuredStreaming结构化流式处理1StructuredStreaming编程模型目录StructuredStreaming概述2StructuredStreaming基础操作3StructuredStreaming编程步骤4StructuredStreaming基于SparkSQL引擎构建的可扩展且容错的StreamProcessingEngine(流处理引擎),可以在静态数据Dataset/DataFrame上像批处理计算一样进行流计算。可以使用SparkSQL中的Dataset/DataFrameAPI对数据流进行聚合、滑动窗口计算、流式数据与离线数据的连接等操作。此外,StructuredStreaming通过使用检查点和预写日志确保数据从端到端只被执行一次。StructuredStreaming包括微批处理和持续处理两种处理模型,默认使用微批处理处理模型。在微批处理模型中,StructuredStreaming将输入数据流视为一系列小批次作业进行处理,从而实现端到端的延迟低至100毫秒。自Spark2.3版本起,引入了持续处理模型,将端到端的延迟进一步降至1毫秒。对开发使用者来说,无须考虑StructuredStreaming以哪种方式计算,StructuredStreaming在底层会自动实现快速、可伸缩、容错等处理。StructuredStreaming概述相比于SparkStreaming,StructuredStreaming优点如下。同样能支持多种数据源的输入和输出。以结构化的方式操作流式数据,能够像使用SparkSQL处理离线的批处理一样,处理流数据,代码更简洁,写法更简单。基于Event-Time(事件时间,指事件发生的时间),相比于SparkStreaming的Processing-Time更精确,更符合业务场景。解决了SparkStreaming存在的代码升级,DAG图变化引起的任务失败,无法断点续传的问题。StructuredStreaming概述1StructuredStreaming编程模型目录StructuredStreaming概述2StructuredStreaming基础操作3StructuredStreaming编程步骤4StructuredStreaming的核心思想是将实时数据流抽象成一张无边界的表,输入的每一条数据当成输入表的一个新行,如下图。数据流中的数据一行一行地添加到无界表中,这样可以将流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询。StructuredStreaming编程模型在无界表上对输入的数据进行查询将生成结果表,系统每隔一定的周期会触发对无界表的计算和结果表的更新。最后,将结果表的结果写入到外部存储介质。StructuredStreaming编程模型如下图,设定批长度为1s,每一秒从输入源读取数据到输入无界表,然后触发查询计算,将结果写入结果表中。第1行是时间,每秒都会触发一次流计算。第2行是输入数据,对输入数据执行查询后产生的结果最终会被更新到结果表中。第4行是外部存储,输出模式是完全模式。StructuredStreaming编程模型1StructuredStreaming编程模型目录StructuredStreaming概述2StructuredStreaming基础操作3StructuredStreaming编程步骤4在PySpark,StructuredStreaming基础操作主要包括:

从输入源创建流式DataFrame的输入操作。

根据业务流程对流式DataFrame进行各种转换操作。

将运算结果进行输出操作。

使用窗口聚合操作对一段时间内的数据进行统计运算。StructuredStreaming基础操作1.输入操作SparkSession.readStream():读取流数据创建流式DataFrame。readStream可使用format()方法定义输入源,使用option()方法进行输入源的可选参数设置,使用load()方法载入数据。常见的内置输入源:File源、Kafka源、Socket源和Rate源。StructuredStreaming基础操作输入源File源以文件流的形式读取目录中写入的文件。支持的文件格式为text,CSV,JSON,ORC,Parquet。需注意的是,文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。File源的选项(option)包括如下几个。path:输入目录的路径,所有格式通用。maxFilesPerTrigger:每个触发器中要处理的最大新文件数(默认无最大值)。latestFirst:是否首先处理最新的文件,当有大量积压的文件时很有用,默认false。fileNameOnly:是否仅根据文件名而不是完整路径检查新文件,默认false。StructuredStreaming基础操作Socket源从一个本地或远程主机的某个端口服务上读取数据,数据的编码为UTF-8。由于Socket源无法提供端到端的容错保障,一般用于测试或学习。Socket源的选项(option)包括如下几个。host:要连接的主机,必须指定。port:要连接的端口,必须指定。StructuredStreaming基础操作Kafka源是流处理理想的输入源,因为它可以保证实时和容错。通用流程如下。应用数据输入-->Kafka-->SparkStreaming-->其他的数据库Kafka源的选项(option)包括如下几个。assign:指定所消费的Kafka主题和分区

。subscribe:订阅的Kafka主题,为逗号分隔的主题列表。subscribePattern:订阅的Kafka主题正则表达式,可匹配多个主题。kafka.bootstrap.servers:Kafka服务器的列表,逗号分隔的host:port列表。startingOffsets:起始位置偏移量。endingOffsets:结束位置偏移量。failOnDataLoss:布尔值,表示是否在Kafka数据可能丢失时,触发流计算失败。一般应当禁止,以免误报。StructuredStreaming基础操作Rate源可每秒生成特定个数的数据行,每个数据行包括时间戳和值字段。时间戳是消息发送的时间,值是从开始到当前消息发送的总个数,从0开始。Rate源一般用作调试或性能基准测试。Rate源的选项(option)包括如下几个。rowsPerSecond:每秒生成多少行数据,默认值为1。rampUpTime:生成速度达到rowsPerSecond需要多少启动时间,使用比秒更精细的粒度将会被截断为整数秒,默认为0秒。numPartitions:使用的分区数,默认为Spark的默认分区数。StructuredStreaming基础操作2.转换操作可在流式DataFrame使用select()、where()、groupBy()等API进行类似于在静态DataFrame上的查询、投影、聚合操作,也可在流式DataFrame使用map(),filter(),flatMap()等API进行类似于在RDD的操作。不支持读取前n行这样的操作,如不支持limit()、take(n)、show()等,也不支持distinct、count()等操作。StructuredStreaming基础操作一个流式DataFrame可以和一个静态DataFrame进行join()操作,连接方式为Inner(内连接)或LeftOuter(左外连接),连接后得到一个新的流式DataFrame。此外,一个流式DataFrame也可以和另一个流式DataFrame以Inner方式进行join()操作,连接机制是通过追溯被进行连接的流式DataFrame已经接收到的流数据和主动进行连接的流式DataFrame的当前批次进行key(键值)的配对。为了避免追溯过去太久的数据造成性能瓶颈,可以通过设置watermark(水位线)来清空过去太久的历史数据的状态,数据被清空状态后将允许不被配对查询。另外,sort()操作仅在聚合后在完整输出模式下支持,不支持两个流之间的任何join()操作。StructuredStreaming基础操作3.输出操作使用DataFrame的writeStream()方法保存流计算的结果。writeStream()有一些可以设定的选项,如使用outputmode()方法配置输出模式从而控制输出内容,使用format()方法配置输出接收器类型,使用可选项queryName()标识查询的名称,使用可选项triggerinterval()设定触发间隔,在option()中有参数checkpointlocation,可以设置检查点保存输出的结果以保证数据的可靠性,最后使用start()方法启动流计算。StructuredStreaming基础操作outputmode()内参数指定输出模式,输出模式用于控制输出接收器内容,主要有3种输出模式,分别为Append模式、Complete模式和Update模式,且不同类型的流式查询支持不同的输出模式。CompleteMode(完全模式):更新后的整个结果表将被写入外部存储。如何处理整个表的写入由输出接收器决定。AppendMode(追加模式):默认模式。自上次触发后,只将结果表中追加的新行写入输出接收器。适用结果表中的现有行不期望被改变的查询,如select()、where()、map()、flatmap()、filter()、join()等操作支持该模式。UpdateMode(更新模式):自上次触发后,只有在结果表中更新、增加的行才写入输出接收器。该模式只输出自上次触发以来更改的行。如果查询不包括聚合,该模式等同于追加模式。StructuredStreaming基础操作format()内参数指定输出接收器类型。StructuredStreaming内置的输出接收器包括File接收器、Kafka接收器、Console接收器、Memory接收器和Foreach接收器等,其中Console接收器和Memory接收器仅用作测试。具体说明如下。File接收器,将计算结果以文件的形式输出到指定目录中。默认文件格式为Parquet,也支持ORC、JSON和CSV等格式文件。支持的输出模式:Append选项path:必须指定输出目录的路径。容错:是。数据只会被处理一次。StructuredStreaming基础操作Kafka接收器,将计算结果输出到Kafka的一个或多个主题。支持的输出模式:Append、Complete、Update。选项kafka.bootstrap.servers:Kafka服务器的列表,逗号分隔的host:port列表。Topic:主题容错:是,数据至少被处理一次。StructuredStreaming基础操作Console接收器,将计算结果输出到控制台,一般用于小量数据的调试。支持的输出模式:Append、Complete、Update。选项numRows:每次触发后打印多少行,默认为20truncate:如果行太长是否截断,默认为“是”容错:否。StructuredStreaming基础操作Memory接收器,将计算结果作为内存中的表存储在内存中,也用于小量数据的调试。支持的输出模式:Append、Complete。选项:无。容错:否。StructuredStreaming基础操作Foreach接收器,参数是一个foreach的方法,用户可以通过实现这个方法实现一些自定义的功能。支持的输出模式:Append、Complete、Update。选项:无。容错:是,数据至少被处理一次。StructuredStreaming基础操作如果机器发生故障导致宕机,可以使用检查点(checkpoint)恢复先前查询的进度和状态,并从中断处继续。配置检查点后,查询将保存所有进度信息和运行聚合到检查点目录中。检查点目录必须是与HDFS兼容的文件系统中的路径。StructuredStreaming基础操作4.窗口聚合操作数据产生的时间也被称为事件时间(eventtime),一般嵌入到流数据中作为一个字段。与事件时间对应的是处理时间(processingtime),指数据被处理的时间。StructuredStreaming滑动窗口聚合操作基于事件时间,一般使用事件时间作为窗口切分的依据,例如每秒钟的成交均价,是取事件时间每秒钟的数据进行处理。基于窗口的聚合是在事件时间上的特殊类型的分组和聚合,每个时间窗口是一个组,按照每个窗口进行聚合操作。StructuredStreaming基础操作考虑到数据可能存在延迟,如果一个数据到达时其对应的时间批次已经被计算,则重新计算这个时间批次的数据并更新之前的计算结果。此外,对延迟数据的处理,可以通过水印(watermarking)机制进行。水印可以让引擎自动更新数据中的当前事件时间,并清理旧的状态。使用withWatermark()方法定义水印,如withWatermark('eventTime','1hour'),其中eventTime为数据事件时间,时间阈值为1小时,这样一个小时以外延迟到达的数据由于水印的设置被Spark丢弃,而一个小时以内延迟到达的数据则会被正常处理。水印的输出模式必须是Append或Update,Complete要求保留所有的聚合数据,导致中间状态无法被清理。StructuredStreaming基础操作1StructuredStreaming编程模型目录StructuredStreaming概述2StructuredStreaming基础操作3StructuredStreaming编程步骤4StructuredStreaming编程主要任务是对流式DataFrame进行一系列操作,在PySpark中,使用Python编写StructuredStreaming流应用程序过程步骤与编写SparkStreaming流应用程序一样,主要包括如下4个步骤。导入pyspark模块及相应类,创建SparkSession对象,构建StructuredStream-ingContext环境。定义输入源创建流式DataFrame。根据业务逻辑,操作流式DataFrame。启动流计算并输出结果。以单词统计程序为实例,讲解StructuredStreaming程序编写运行整个过程。StructuredStreaming编程步骤1.程序编写第1步,导入pyspark相关模块,创建SparkSession对象,构建StructuredStreamingContext环境。。导入pyspark及相关函数模块,创建一个SparkSession对象。因为程序需要用到拆分字符串和展开数组内的所有单词,故导入pyspark.sql.functions模块中的split、explode函数,其中split()方法用于将字符串切片并转换为列表,explode()方法用于将一行数据转化为多行形式。使用sparkContext.setLogLevel()方法用于设置日志显示级别,本次设置为“WARN”,只输出等级为警告以上的信息,从而排除INFO等日志级别的信息干扰,避免在运行过程中显示大量不必要的信息。StructuredStreaming编程步骤第2步,定义输入源创建流式DataFrame。本次使用SparkSession对象的readStreamAPI定义结构化流输入源,创建一个流式DataFram。其中format(‘socket’)用于定义输入源为Socket类型,它作为一个TCP协议客户端,将从由option()方法指明的“本机(localhost)的9999端口”Socket服务器端中持续接收文本数据,load()方法表示载入数据,结果保存在名称为lines的流式DataFrame内。需要说明的是,lines为一张包含流文本数据的无界表,这个表包含一个字符串列,列名为value,流文本数据中的每一行都是表中的一行。由于目前只是在设置所需的转换,还没有启动转换,因此目前还没有接收任何数据。StructuredStreaming编程步骤第3步,根据业务逻辑,操作流式DataFrame。对创建好的linesDataFrame,首先使用split()方法以空格作为分隔符,将每一行字符串拆分为一个个单词组成的列表,接着使用explode()方法将该列表展开为一列,列的每行只有一个单词,并使用alias()方法对这个列定义别名为“word”,再使用select()方法进行查询选取,此时linesDataFrame转化为wordsDataFrame。对wordsDataFrame使用g

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论