




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、Spark4.sparkStreaming 和 storm 的区别1. spark 运行的 job 在哪里可以看到Spark 优化flume:日志收集系统,主要用于系统日志的收集kafka:消息队列,进行消息的缓存和系统的解耦storm:实时计算框架,进行流式的计算。Spark 应用转换流程1、spark 应用提交后,经历了一系列的转换,最后成为task 在每个节点上执行2、RDD 的 Action 算子触发 Job 的提交,生成 RDD DAG3、由 DAGScheduler 将 RDD DAG 转化为 Stage DAG,每个 Stage 中产生相应的 Task 集合4、TaskSched
2、uler 将任务分发到 Executor 执行5、每个任务对应相应的一个数据块,只用用户定义的函数处理数据块Driver 运行在Worker 上通过.apache.spark.deploy.C nt 类执行作业,作业运行命令如下:作业执行流程描述:1、客户端提交作业给Master2、Master 让一个Worker 启动Driver,即 SchedulerBackend。Worker 创建一个DriverRunner 线程,DriverRunner启动SchedulerBackend 进程。3、另外Master 还会让其余Worker 启动Exeuctor,即 ExecutorBackend。
3、Worker 创建一个ExecutorRunner线程,ExecutorRunner 会启动ExecutorBackend 进程。4、ExecutorBackend 启动后会向Driver 的SchedulerBackend 。SchedulerBackend 进程中包含DAGScheduler,它会根据用户程序,生成执行计划,并调度执行。对于每个stage 的task,都会被存放到TaskScheduler 中, ExecutorBackend 向SchedulerBackend 汇报的时候把TaskScheduler 中的task 调度到ExecutorBackend 执行。5、所有sta
4、ge 都完成后作业结束。Driver 运行在客户端# spark 面试问题收集# spark 面试问题# 1、spark 中的 RDD 是什么,有哪些特性*RDD(Resint Distributed Dataset)叫做分布式数据集,是 Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。*Dataset:就是一个集合,用于存放数据的*Distributed:分布式,可以并行在集群计算*Resint:表示弹性的作业执行流程描述:1、客户端启动后直接运行用户程序,启动Driver 相关的工作:DAGScheduler 和BlockManagerMaster 等。
5、2、客户端的Driver 向Master。3、Master 还会让Worker 启动Exeuctor。Worker 创建一个ExecutorRunner 线程,ExecutorRunner 会启动ExecutorBackend 进程。4、ExecutorBackend 启动后会向Driver 的SchedulerBackend。Driver 的DAGScheduler作业并生成相应的Stage,每个Stage 包含的Task 通过TaskScheduler 分配给Executor 执行。5、所有stage 都完成后作业结束。* 弹性表示* 1、RDD 中的数据可以在内存或者是磁盘* 2、RDD
6、 中的分区是可以改变的*五大特性:*Alist of partitions一个分区列表,RDD 中的数据都存在一个分区列表里面*Afunction for computing each split作用在每一个分区中的函数*Alist of dependencies on other RDDs一个 RDD 依赖于其他多个 RDD,这个点很重要,RDD 的容错机制就是依据这个特性而来的* Optionally, a Partitioner for key-value RDDs (e.g. to saythe RDD is hash-partitioned)t可选的,针对于 kv 类型的 RDD 才
7、具有这个特性,作用是决定了数据的来源以及数据处理后的去向* Optionally, a list of preferred locations to compute each spliton (e.g. block locations for an HDFS file)可选项,数据本地性,数据位置最优# 2、概述一下 spark 中的常用算子区别(map、mapPartitions、foreach、foreachPartition)* map:用于遍历 RDD,将函数 f 应用于每一个元素,返回新的 RDD(transformation算子)。* foreach:用于遍历 RDD,将函数 f
8、应用于每一个元素,无返回值(action 算子)。* mapPartitions:用于遍历操作 RDD 中的每一个分区,返回生成一个新的 RDD(transformation 算子)。* foreachPartition: 用于遍历操作RDD 中的每一个分区。无返回值(action 算子)。* 总结:一般使用 mapPartitions 或者 foreachPartition 算子比 map 和 foreach更加高效,使用。# 3、谈谈 spark 中的宽窄依赖* RDD 和它依赖的父 RDD(s)的关系有两种不同的类型,即窄依赖(narrowdependency)和宽依赖(wide dep
9、endency)。* 宽依赖:指的是多个子 RDD 的 Partition 会依赖同一个父 RDD 的 Partition* 窄依赖:指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition使用。# 4、spark 中如何划分 stage* 1.Spark Application 中可以因为不同的 Action 触发众多的 job,一个Application 中可以有很多的 job,每个 job 是由一个或者多个 Stage的,后面的 Stage 依赖于前面的 Stage,也就是说只有前面依赖的 Stage 计算完毕后,后面的 Stage 才会运行。* 2.
10、Stage 划分的依据就是宽依赖,何时产生宽依赖,例如reduceByKey,groupByKey 的算子,会导致宽依赖的产生。* 3.由 Action(例如 collect)导致了 SparkContext.runJob 的执行,最终导致了 DAGScheduler 中的 submitJob 的执行,其是通过发送一个 case classJobSubmitted 对象给 eventProsLoop。eventProsLoop 是 DAGSchedulerEventProsLoop 的具体实例,而DAGSchedulerEventProsLoop 是 eventLoop 的子类,具体实现 Ev
11、entLoop 的onReceive 方法,onReceive 方法转过来回调doOnReceive* 4.在 doOnReceive 中通过模式匹配的方法把执行路由到* 5.在 handleJobSubmitted 中首先创建 finalStage,创建 finalStage 时候会建立父 Stage 的依赖链条* 总结:以来是从代码的逻辑层面上来展开说的,可以简单点说:写介绍什么是 RDD 中的宽窄依赖,然后在根据 DAG 有向无环图进行划分,从当前 job 的最后一个算子往前推,遇到宽依赖,那么当前在这个批次中的所有算子操作都划分成一个 stage,然后继续按照这种方式在继续往前推,如在
12、遇到宽依赖,又划分成一个 stage,一直到最前面的一个算子。最后整个 job 会被划分成多个 stage,而stage 之间又存在依赖关系,后面的 stage 依赖于前面的 stage。# 5、spark-submit 的时候如何引入外部 jar 包* 在通过 spark-submit 提交任务时,可以通过添加配置参数来指定* -driver-class-path 外部 jar 包* -jars 外部 jar 包# 6、spark 如何防止内存溢出* driver 端的内存溢出* 可以增大 driver 的内存参数:spark.driver.memory (default 1g)* 这个参数
13、用来设置 Driver 的内存。在 Spark 程序中,SparkContext, DAGScheduler 都是运行在 Driver 端的。对应 rdd 的 Stage 切分也是在 Driver 端运行,如果用户自己写的程序有过多的步骤,切分出过多的 Stage,这部分信息消耗的是 Driver 的内存,这个时候就需要调大 Driver 的内存。* map 过程产生大量对象导致内存溢出* 这种溢出的原因是在单个 map 中产生了大量的对象导致的,例如: rdd.map(x=for(i for(i (k,1).reduceBykey(_+_).map(k=(k._2,k._1).sortByK
14、ey(false).take(10)* 如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。* 经过分析,倾斜的数据主要有以下三种情况:* 1、null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。* 2、无效数据,大量重复的测试数据或是对结果影响不大的有效数据。* 3、有效数据,业务导致的正常数据分布。* 解决办法* 第 1,2 种情况,直接对数据进行过滤即可(因为该数据对当前业务不会产生影响)。* 第 3 种情况则需要进行一些特殊操作,常见的有以下几种做法* (1)执行,将异常的 key 过滤出来单独处理,最后与正常数据的处理结果进行
15、union 操作。* (2) 对 key 先添加随机值,进行操作后,去掉随机值,再进行一次操作。* (3) 使用 reduceByKey 代替 groupByKey(reduceByKey 用于对每个 key 对应的多个 value 进行 merge 操作,最重要的是它能够在本地先进行 merge 操作,并且 merge 操作可以通过函数自定义.)* (4) 使用 map join。* 案例* 如果使用 reduceByKey 因为数据倾斜造成运行失败的问题。具体操作流程如下:*(1)将原始的 key 转化为 key + 随机值(例如 Random.next)*(2)对数据进行 reduceB
16、yKey(func)*(3)将 key + 随机值 转成 key*(4)再对数据进行 reduceByKey(func)* 案例操作流程分析:* 假设说有倾斜的 Key,给所有的 Key 加上一个随机数,然后进行 reduceByKey 操作;此时同一个 Key 会有不同的随机数前缀,在进行reduceByKey 操作的时候原来的一个非常大的倾斜的Key 就分而治之变成若干个更小的 Key,不过此时结果和原来不一样,怎么破?进行 map 操作,目的是把随机数前缀去掉,然后再次进行 reduceByKey 操作。(当然,如果你很无聊,可以再次做随机数前缀),这样就可以把原本倾斜的 Key 通过分
17、而治之方案分散开来,最后又进行了全局聚合* 注意 1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行 union 即可。* 注意 2: 单独处理异常数据时,可以配合使用 MapJoin 解决。* 2、spark 使用不当造成的数据倾斜* 提高 shuffle 并行度* dataFrame 和 sparkSql 可以设置spark.sql.shuffle.partitions 参数控制 shuffle 的并发度,默认为 200。* rdd 操作可以设置 spark.default.parallelism 控制并发度,默认参数由不同的 Cluster Mana
18、ger 控制。* 局限性: 只是让每个 task 执行更少的不同的 key。无法解决个别 key 特别大的情况造成的倾斜,如果某些 key 的大小非常大,即使一个 task 单独执行它,也会受到数据倾斜的困扰。* 使用 map join 代替 reduce join* 在小表不是特别大(取决于你的 executor 大小)的情况下使用,可以使程序避免 shuffle 的过程,自然也就没有数据倾斜的困扰了.(详细见、)* 局限性: 因为是先将小数据发送到每个 executor上,所以数据量不能太大。# 11、flume 整合 sparkStreaming 问题* (1)、如何实现 sparkSt
19、reamingflume 中的数据* 可以这样说:* 前期经过技术调研,查看官网相关资料,发现 sparkStreaming 整合 flume 有 2 种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这 2 种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。* 推模式:Flume 将数据 Push 推给 Spark Streaming* 拉模式:Spark Streaming 从 flume 中 Poll 拉取数据* (2)、在实际开发的时候是如何保证数据不丢失的* 可以这样说:* flume 那边采用的 channel 是将数据落地到磁盘中,保证数
20、据源端安全性(可以在补充一下,flume 在这里的 channel 可以设置为 memory内存中,提高数据接收处理的效率,但是由于数据在内存中,安全机制保证不了,故选择 channel 为磁盘。整个流程运行有一点的延迟性)* sparkStreaming 通过拉模式整合的时候,使用了 FlumeUtils这样一个类,该类是需要依赖一个额外的 jar 包(spark-streaming-flume_2.10)* 要想保证数据不丢失,数据的准确性,可以在构建StreamingConext 的时候,利用 StreamingContext.getOrCreate(checkpo,creatingFu
21、nc: () = StreamingContext)来创建一个 StreamingContext,使用StreamingContext.getOrCreate 来创建 StreamingContext 对象,传入的第一个参数是 checkpo的存放目录,第二参数是生成 StreamingContext 对象的用户自定义函数。如果 checkpo的存放目录存在,则从这个目录中生成StreamingContext 对象;如果不存在,才会调用第二个函数来生成新的StreamingContext 对象。在 creatingFunc 函数中,除了生成一个新的StreamingContext 操作,还需要
22、完成,然后调用ssc.checkpo(checkpoDirectory)来初始化 checkpo功能,最后再返回StreamingContext 对象。这样,在 StreamingContext.getOrCreate 之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在 start()调用前执行。* 流失计算中使用 checkpo的作用:*保存元数据,包括流式应用的配置、流式没之前定义的、未完成所有操作的 batch。元数据被到失败的存储系统上,如 HDFS。这种 ckeckpo主要针对 driver 失败后的修复。
23、*上,如 HDFS。这种 ckeckpo保存流式数据,也是到失败的系统主要针对 window operation、有状态的操作。无论是 driver 失败了,还是 worker 失败了,这种 checkpo都够快速恢复,而不需要将很长的历史数据都重新计算一遍(以便得到当前的状态)。* 设置流式数据 checkpo的周期* 对于一个需要做 checkpo的 DStream 结构,可以通过调用 DStream.checkpo(checkpoerval)来设置 ckeckpo的周期,经验上一般将这个 checkpo周期设置成 batch 周期的 5 至 10 倍。* 使用 write ahead l
24、ogs 功能* 这是一个可选功能,建议加上。这个功能将使得输入数据写入之前配置的 checkpo目录。这样有状态的数据可以从上一个checkpo开始计算。开启的方法是把spark.streaming.receiver.writeAheadLogs.enable 这个 property 设置为 true。另外,由于输入 RDD 的默认 StorageLevel 是 MEMORY_AND_DISK_2,即数据会在两台 worker 上做 replication。实际上,Spark Streaming 模式下,任何从网络输入数据的 Receiver(如 kafka、flume、socket)都会在两
25、台机器上做数据备份。如果开启了 write ahead logs 的功能,建议把 StorageLevel 改成MEMORY_AND_DISK_SER。修改的方法是,在创建 RDD 时由参数传入。* 使用以上的 checkpo机制,确实可以保证数据 0 丢失。但是一个前提条件是,数据发送端必须要有缓存功能,这样才能保证在 spark应用重启期间,数据发送端不会因为 spark streaming 服务不可用而把数据丢弃。而flume 具备这种特性,同样 kafka 也具备。* (3)Spark Streaming 的数据可靠性* 有了 checkpo机制、write ahead log 机制、
26、Receiver 缓存机器、可靠的 Receiver(即数据接收并备份成功后会发送 ack),可以保证无论是 worker失效还是 driver 失效,都是数据 0 丢失。原因是:如果没有 Receiver 服务的 worker 失效了,RDD 数据可以依赖血统来重新计算;如果 Receiver 所在 worker失败了,由于 Reciever 是可靠的,并有 write ahead log 机制,则收到的数据可以保证不丢;如果 driver 失败了,可以从checkpo中恢复数据重新构建。# 12、kafka 整合 sparkStreaming 问题* (1)、如何实现 sparkStrea
27、mingkafka 中的数据* 可以这样说:在 kafka0.10 版本之前有二种方式与 sparkStreaming整合,一种是基于 receiver,一种是 direct,然后分别阐述这 2 种方式分别是什么* receiver:是采用了 kafka 高级 api,利用 receiver来接受 kafka topic 中的数据,从 kafka 接收来的数据会在 spark 的executor 中,之后 spark streaming 提交的 job 会处理这些数据,kafka 中 topic的偏移量是保存在 zk 中的。* 基本使用: val kafkaStream =KafkaUtils
28、.createStream(streamingContext,ZK quorum, consumer groupartitions to consume), per-topic number of Kafka* 还有几个需要注意的点:* 在 Receiver 的方式中,Spark 中的partition 和 kafka 中的 partition 并不是相关的,所以如果加大每个 topic的 partition 数量,仅仅是增加线程来处理由单一 Receiver 消费的。但是这并没有增加 Spark 在处理数据上的并行度.* 对于不同的 Group 和 topic可以使用多个 Receiver
29、创建不同的 Dstream 来并行接收数据,之后可以利用 union 来统一成一个 Dstream。* 在默认配置下,这种方式可能会因为底层而丢失数据. 因为 receiver 一直在接收数据,在其已经通知 zookeeper数据接收完成但是还没有处理的时候,executor 突然挂掉(或是 driver 挂掉通知executor 关闭),缓存在其中的数据就会丢失. 如果希望做到高可靠, 让数据零丢失,如果启用了 Write AheadLogs(spark.streaming.receiver.writeAheadLog.enable=true)该机制会同步地将接收到的 Kafka 数据写入分
30、布式文件系统(比如 HDFS)上的预写日志中. 所以, 即使底层节点出现了失败, 也可以使用预写日志中的数据进行恢复.到文件系统如 HDFS,那么 storage level 需要设置成StorageLevel.MEMORY_AND_DISK_SER,也就是 KafkaUtils.createStream(., StorageLevel.MEMORY_AND_DISK_SER)* direct:在 spark1.3 之后,引入了 Direct 方式。不同于Receiver 的方式,Direct 方式没有 receiver 这一层,其会周期性的获取 Kafka中每个 topic 的每个 part
31、ition 中的最新 offsets,之后根据设定的maxRatePartition 来处理每个 batch。(设置spark.streaming.kafaxRatePartition=10000。限制每秒钟从 topic 的每个 partition 最多消费的消息条数)。* (2) 对比这 2 中方式的优缺点:* 采用 receiver 方式:这种方式可以保证数据不丢失,但是无法保证数据只被处理一次,WAL 实现的是east-once 语义(至少被处理一次),如果在写入到外部的数据还没有将offset 更新到zookeeper 就挂掉,这些数据将会被反复消费. 同时,降低了程序的吞吐量。*
32、采用 direct 方式:相比 Receiver 模式而言能够确保机制更加健壮.区别于使用 Receiver 来接收数据, Direct 模式会周期性动查询 Kafka,来获得每个 topic+partition 的最新的 offset, 从而定义每个 batch 的 offset的范围. 当处理数据的 job 启动时, 就会使用 Kafka 的简单 consumer api 来获取 Kafka 指定 offset 范围的数据。* 优点:* 1、简化并行* 如果要多个 partition, 不需要创建多个输入 DStream 然后对它们进行 union 操作. Spark 会创建跟 Kafka
33、 partition一样多的 RDD partition, 并且会并行从 Kafka 中partition 和 RDD partition 之间, 有一个一对一的数据. 所以在 Kafka关系.* 2、高性能* 如果要保证零数据丢失, 在基于 receiver的方式中, 需要开启 WAL 机制. 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka 自己本身就有高可靠的机制, 会对数据一份, 而这里又会一份到 WAL 中. 而基于 direct 的方式, 不依赖 Receiver, 不需要开启 WAL机制, 只要 Kafka 中作了数据的, 那么就可以通过 Kafka 的副本进行恢
34、复.* 3、一次且仅一次的事务机制* 基于 receiver 的方式, 是使用 Kafka 的高阶 API 来在 ZooKeeper 中保存消费过的 offset 的. 这是消费 Kafka 数据的传统方式. 这种方式配合着 WAL 机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一次, 可能会处理两次. 因为Spark 和ZooKeeper 之间可能是不同步的. 基于 direct 的方式, 使用kafka 的简单 api, SparkStreaming 自己就负责追踪消费的 offset, 并保存在 checkpo中. Spark 自己一定是同步的, 因此可以保证数据是
35、消费一次且仅消费一次。不过需要自己完成将 offset 写入zk 的过程,在文档中都有相应介绍.*简单代码实例:*messages.foreachRDD(rdd=val message =rdd.map(_._2)/对数据进行一些操作message.map(method)/更新 zk 上的 offset (自己实现)updateZKOffsets(rdd)* sparkStreaming 程序自己消费完成后,自己主动去更新 zk 上面的偏移量。也可以将 zk 中的偏移量保存在或者 redis 数据库中,下次重启的时候,直接或者 redis中的偏移量,获取到上次消费的偏移量,接着数据。# 13、利用 scala 语言实现排序* (1)冒泡排序:*package cn.sort*/冒泡排序*class BubbleSort *def main(args: ArrayString): Unit = *val list = List(3, 12, 43, 23, 7, 1, 2, 0)*prln(sort(list)*/定义一
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024-2025管理人员安全培训考试试题(B卷)
- 2024-2025项目安全培训考试试题附答案【基础题】
- 25年公司级员工安全培训考试试题附答案【黄金题型】
- 2024-2025公司安全管理员安全培训考试试题答案黄金题型
- Unit 6 第4课时 Section 3 Writing【基础深耕】七年级英语下册高效课堂(沪教版2024)
- 企业培训正能量
- 关于《社会保障法》的学习心得体会
- 小学2025年春季少先队文艺汇演计划
- 四年级劳动教育兴趣小组计划
- 气候变化与生态伦理-全面剖析
- 2024年中国分析仪器市场调查研究报告
- “龙岗青年”微信公众号代运营方案
- DB11-T 478-2022 古树名木评价规范
- 施工现场扬尘控制专项方案
- 年度固定污染源排污许可证质量审核、执行报告审核技术支持服务 投标方案(技术标 )
- 五年级科学上册(冀人版)第17课 彩虹的形成(教学设计)
- 科学与文化的足迹学习通超星期末考试答案章节答案2024年
- 医院培训课件:《病区药品安全管理与使用》
- 光电融合器件工艺
- 国家义务教育质量监测八年级学生心理健康模拟测试
- 服装导购销售流程及技巧
评论
0/150
提交评论