分布式计算、云计算与大数据 第2版 课件 第9章 大数据技术与编程_第1页
分布式计算、云计算与大数据 第2版 课件 第9章 大数据技术与编程_第2页
分布式计算、云计算与大数据 第2版 课件 第9章 大数据技术与编程_第3页
分布式计算、云计算与大数据 第2版 课件 第9章 大数据技术与编程_第4页
分布式计算、云计算与大数据 第2版 课件 第9章 大数据技术与编程_第5页
已阅读5页,还剩94页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第9章大数据技术与编程提纲5.1 大数据背景与概述5.2 大数据处理关键技术5.3 大数据计算模式9.4基于Hadoop的大数据编程实践9.5基于Spark的大数据编程实践大数据产生的背景

1980年,未来学家阿尔文·托夫勒在《第三次浪潮》中推崇大数据,但当时数据领域尚处于初期。首个数据中心和关系数据库便出现在这个时代。随着物联网的兴起,越来越多设备连接互联网,收集客户使用模式和产品性能数据,而机器学习也推动了数据增长。然而,尽管已经存在一段时间,大数据的潜力仍未完全释放。今天,云计算提供了弹性和可扩展性,使开发人员能够轻松测试数据子集,从而引发了全球技术变革,大数据技术得以崭露头角。

2005年左右,人们开始意识到在线服务如Facebook和YouTube产生了大量数据。同时,Hadoop和NoSQL等工具的出现降低了数据存储成本,推动了大数据的发展。大数据量在接下来几年急速增长,现在全球“用户”--不仅有人类,还有机器不断产生数据。大数据的定义

数据量大!

数据类型复杂!大数据的“5V”特征

容量(Volume)

速率(Velocity)

多样性(Variety)

真实性(Veracity)

价值(Value)大数据的5V特征大数据的5V特征

容量(Volume)这是指大规模的数据量,并且数据量呈持续增长趋势。通常,它们的规模超过10T,但随着技术进步,定义中的数据大小也可能会改变。大规模的数据对象构成的集合,即称为“数据集”。这些数据集由不同类型的数据组成,具有不同的特征,包括维度、稀疏性和分辨率。数据集可以分为记录数据集(存储在数据库中的记录集合)、基于图形的数据集(包含数据对象之间的联系,并用图形表示)和有序数据集(包括时间和空间信息,用于存储时间序列和空间数据等)。大数据的5V特征

速率(Velocity)即数据生成、流动速率快。数据流动速率指指对数据采集、存储以及分析具有价值信息的速度。因此也意味着数据的采集和分析等过程必须迅速及时。

真实性(Veracity)指数据的质量和保真性。大数据环境下的数据最好具有较高的信噪比。信噪比与数据源和数据类型无关。大数据的5V特征

多样性(Variety)指是大数据包括多种不同格式和不同类型的数据。数据来源包括人与系统交互时与机器自动生成,来源的多样性导致数据类型的多样性。根据数据是否具有一定的模式、结构和关系,数据可分为三种基本类型:结构化数据、非结构化数据、半结构化数据。结构化数据:指遵循一个标准的模式和结构,以二维表格的形式存储在关系型数据库里的行数据。非结构化数据:指不遵循统一的数据结构或模型的数据(如文本、图像、视频、音频等),不方便用二维逻辑表来表现。半结构化数据:指有一定的结构性,但本质上不具有关系性,介于完全结构化数据和完全非结构化数据之间的数据。大数据的5V特征

价值(Value)指大数据的低价值密度。随着数据量的增长,数据中有意义的信息却没有成相应比例增长。而价值同时与数据的真实性和数据处理时间相关,如图所示。大数据发展趋势大数据发展趋势大数据发展趋势提纲9.1 大数据背景与概述9.2 大数据处理关键技术9.3 大数据计算模式9.4基于Hadoop的大数据编程实践9.5基于Spark的大数据编程实践大数据处理关键技术大数据采集大数据预处理大数据存储及管理大数据分析及挖掘大数据展现及应用大数据采集

数据采集(DAQ),又称数据获取,是指从传感器和其它待测设备等模拟和数字被测单元中自动采集信息的过程。根据数据量大小、数据复杂度以及采用数据库不同,数据采集分“传统的数据采集”和“大数据的数据采集”两类(区别如下)。采集方法

大数据技术在数据采集方面采用了以下方法:系统日志采集方法许多互联网企业都使用分布式架构的工具,如Hadoop的Chukwa、Apache的Flume、Cloudera的Flume、Facebook的Scribe等,用于采集系统日志,能够满足大量日志数据每秒数百MB的采集和传输需求。网络数据采集方法网络数据采集是通过爬虫或网站API等手段从网站上提取数据,将非结构化数据存储为本地结构化文件,支持采集图片、音频、视频等文件或附件采集。同时,网络流量采集可以用带宽管理技术如DPI或DFI进行处理。其它数据采集方法对于企业生产经营数据或学科研究数据等保密性要求较高的数据,可以通过与企业或研究机构合作,使用特定系统接口等相关方式采集数据。采集平台

以下是几款应用广泛的大数据采集平台,供参考。大数据采集平台简介1ApacheFlumeFlume是Apache的开源数据采集系统,具备高可靠性、可扩展性和易管理性,支持客户端扩展。它基于JRuby构建,需要Java运行环境。2FluentdFluentd是一个开源的数据收集框架,使用C/Ruby编写,采用JSON格式统一日志数据。它支持多种数据源和输出格式,具有高可靠性和良好的扩展性,由TreasureData,Inc提供支持和维护。3LogstashLogstash是ELK数据栈中的L,是一个开源数据采集系统,使用JRuby开发,运行时依赖JVM。4SplunkForwarderSplunk是分布式机器数据平台,包括三个角色:SearchHead负责搜索和信息抽取,Indexer负责存储和索引,Forwarder负责数据的采集、清洗和发送给Indexer。大数据预处理

缺失数据处理缺失数据是常见情况,当收集数据时,有些样本数据可能因各种原因缺失。如何有效处理这些缺失数据以用于算法训练是需要解决的关键问题。缺失数据处理方法介绍1删除法简单删除因特殊异常原因导致的数据缺失,只需删除极少数的样本数据,且未来模型应用中缺失维度的情况很少发生。2填充法通常使用默认值或均值等填充缺失维度信息的方法很常见,因其易操作和易解释等优势。但有时填入相同的数值可能会降低该维度的区分度。3映射到高维空间完美的保留的缺失值这个信息,不会对原始信息加入人为的先验知识,带来的问题就是数据维度的增加,算法的计算量也随之变大。大数据预处理

数据数值化在收集到的各维度信息中,有些是字符串,如性别和学历水平等。这些信息无法直接用于算法计算,通常需要将它们转换成数值形式以便后续算法计算。数据数值化方法介绍1离散编码对于可穷举的字符串通常根据出现的频率进行编码即可,例如男出现100次,女出现80次,将男编码为0,女编码为1。2语义编码对于无法通过穷举法完全表示的信息,如文本分类中的自然语言信息,通常采用词嵌入(wordembedding)方法,其中基于Google的word2vec方法是一个较好的选择。在同一语料库训练下,这些词嵌入可以携带一些语义信息。大数据预处理

大数据存储及管理

背景存储规模大种类和来源多样化,存储管理复杂对数据的种类和水平要求高大数据存储及管理

有效存储和管理大数据的三种方式:不断加密企业对各种数据的安全至关重要,通常视为私有和受控的。然而,黑客攻击频繁发生,网络攻击报道屡见不鲜,这让很多公司感到担忧,尤其是行业领袖常常成为攻击目标。为了保护资产,加密技术是一种有效对抗网络威胁的方法,它将数据转化为代码并使用加密信息,只有收件人能够解码。通过加密来保护数据传输,提高数字传输的准确性和安全性。仓库存储大数据似乎难以管理,就像一个无尽的数据漩涡。将数据集中到一个公司位置似乎明智,类似于一个仓库,但一些报告提出反对意见,认为即使在最大的存储中心,也无法应对大数据指数级增长。备份服务当然,大数据管理和存储正迅速摆脱物理机器,进入数字领域。随着技术不断进步,大数据增长迅猛,已经到达无法完全容纳在所有机器和仓库中的程度。大数据存储及管理

数据存储管理这块分为两个部分,一部分是底层的文件系统,还有一部分就是之上的数据库或数据仓库。常用工具如下:存储管理工具介绍1文件系统大数据文件系统其实是大数据平台架构最为基础的组件,其他的组件或多或少都会依赖这个基础组件,目前应用最为广泛的大数据存储文件系统非Hadoop的HDFS莫属,除此之外,还有发展势头不错的Ceph。2数据库或数据仓库针对大数据的数据库大部分是NOSQL数据库,这里顺便澄清一下,NOSQL的真正意义是“notonlysql”,并非NOSQL是RMDB的对立面。常用的数据库或数据仓库有HBase、MongoDB、Cassandra、Neo4j等。大数据分析及挖掘概述数据分析是对大量数据进行详细研究、提取有用信息和形成结论的过程,支持质量管理体系,帮助做出适当决策。它结合了数学和计算机科学,早在20世纪初就确立了数学基础,但直到计算机出现才得以广泛应用。数据挖掘是跨学科的计算机科学分支,利用人工智能、机器学习、统计学和数据库等方法,在大型数据集中发现模式的计算过程。其总体目标是从数据集中提取信息并转化为可理解的结构,以便进一步应用。数据挖掘包括数据预处理、模型构建、兴趣度度量、复杂性考虑、可视化等步骤,属于机器学习的一部分,也是数据库知识发现(KDD)的一部分。区别:数据挖掘是通过人工智能、机器学习、统计学和数据库方法在大型数据集中发现知识规则的计算过程。数据分析包括检查、清理、转换和建模等过程,是人的智能活动的结果,旨在发现有用信息、提出建设性结论并辅助决策。在实际应用中,两者应该互相结合,根据具体业务需求选择适当的思路和算法,最终综合考虑效果和资源匹配等因素来确定最佳解决方案。大数据分析及挖掘常用方法常用大数据分析挖掘方法简介1神经网络方法神经网络因其鲁棒性、自适应性、并行处理、分布存储和高容错性等特点,成为解决数据挖掘问题的理想选择,近年来备受关注。2遗传算法遗传算法是一种仿生全局优化方法,基于自然选择和遗传机理,具有隐含并行性和易于与其他模型结合的特性,因此在数据挖掘中广泛应用。3决策树方法决策树是一种常用于预测模型的算法,通过有目的地分类数据来提取有价值的信息。它简单易懂,处理大规模数据速度快。4粗集方法

粗集理论是处理不确定知识的数学工具。它具有几个优点:不需额外信息、简化信息表达、操作简单。粗集通常用于处理类似二维关系表的数据。5覆盖正例排斥反例方法利用覆盖所有正例、排斥所有反例的思想来找规则。首先选择一个正例种子,逐个与反例比较,与字段取值相符则保留,否则舍去。如此循环所有正例种子,得到正例的规则(选择子的合取式)。大数据分析及挖掘常用工具常用大数据分析挖掘工具简介1HadoopHadoop是一个分布式处理大数据的软件框架,以可靠、高效、可伸缩为特点。它可靠,因为它考虑了计算和存储的故障,保持多个数据副本以应对失败。它高效,通过并行处理提高速度。它可伸缩,适用于PB级数据。Hadoop基于社区服务器,成本较低,对所有人都开放。2SparkSpark是Apache基金会的开源项目,由加州大学伯克利分校实验室开发,是一种重要的分布式计算系统。与Hadoop不同,Spark使用内存存储数据,因此速度可达Hadoop的100倍以上。但由于内存数据会丢失,不适用于长期保存的数据处理。Spark已经将大部分数据挖掘算法从单机迁移到分布式,提供了方便的数据分析可视化界面。3StormStorm是Twitter推广的分布式计算系统,由BackType团队开发,是Apache基金会的项目。它在Hadoop基础上提供了实时计算能力,可实时处理大数据流。与Hadoop和Spark不同,Storm不收集或存储数据,而是直接通过网络实时接收、处理和传输数据及结果。大数据展现及应用应用商业智能政府决策公共服务大数据重点应用三大领域大数据展现及应用大数据检索大数据检索是大数据展现及应用中的重要一环,因为数据集很大很复杂,所以它们需要特别涉及的硬件和软件工具。大数据检索工具简介1ApacheDrillDrill是一个低延迟的分布式数据查询引擎,支持大规模数据(包括结构化、半结构化和嵌套数据)。它使用ANSISQL语法,可以连接本地文件、HDFS、HBase、MongoDB等后端存储,同时支持多种数据格式,如Parquet、JSON、CSV等。2PrestoFaceBook在2013年11月开源了Presto,一个专注于高速、实时数据分析的分布式SQL查询引擎。它支持标准的ANSISQL,包括复杂查询、聚合、连接和窗口函数。Presto设计了一个简单的数据存储抽象层,使得可以在不同的数据存储系统上(包括HBase、HDFS、Scribe等)使用SQL进行查询。3ApacheKylin这是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBayInc.开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。大数据展现及应用大数据可视化大数据可视化是根据数据的特性,如时间和空间信息等,采用图表、图形和地图等可视化方式,将数据直观呈现,帮助人们理解数据并发现其中的规律和信息。Jupyter:大数据可视化的一站式商店GoogleChart:Google支持的免费而强大的整合功能D3.js:以任何你需要的方式直观的显示大数据大数据展现及应用大数据应用我国大数据市场产值图(单位:亿元)医疗大数据生物大数据金融大数据零售大数据电商大数据农牧大数据交通大数据政府调控和财政支出大数据展现及应用大数据安全大数据安全体系个人隐私保护数据安全大数据平台安全提纲9.1 大数据背景与概述9.2 大数据处理关键技术9.3 大数据计算模式9.4基于Hadoop的大数据编程实践9.5基于Spark的大数据编程实践MapReduce介绍MapReduce的运行模型MapReduce是一种面向大数据的批处理计算模式,由Google提出,适用于处理大规模数据集。它借用了函数式编程理念中的"Map"和"Reduce"概念。在该模型中,用户需要定义一个"Map"函数,将一组键值对映射为新的键值对,同时指定一个并发的"Reduce"函数,用于归纳所有映射结果中共享相同键的数据。MapReduce实现原理MapReduce执行流程MapReduce实现原理1、用户程序中的MapReduce库首先将输入文件分割成M个片段,每个片段通常大小在16到64MB之间(可由用户通过可选参数控制),然后在集群中启动大规模的数据拷贝操作。MapReduce实现原理2、这些程序拷贝中的一个是master,其他的都是由master分配任务的worker。有M个map任务和R个reduce任务将被分配。master分配一个map任务或reduce任务给一个空闲的worker。MapReduce实现原理3、一个被分配了map任务的worker读取相关输入split的内容。它从输入数据中分析出key/value对,然后把key/value对传递给用户自定义的map函数。由map函数产生的中间key/value对被缓存在内存中。MapReduce实现原理4、缓存在内存中的key/value对被周期性的写入到本地磁盘上,通过分割函数把它们写入R个区域。在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduceworker。MapReduce实现原理5、reduceworker收到master位置通知后,通过远程过程调用从mapworker的磁盘读取缓存数据。读取所有中间数据后,它会对具有相同key的数据进行排序和聚合,因为多个不同的key可能映射到同一个reduce任务,所以排序是必要的。如果中间数据超过内存大小,将需要进行外部排序。MapReduce实现原理6、reduceworker迭代排过序的中间数据,对于遇到的每一个唯一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数。reduce函数的输出被添加到这个reduce分割的最终的输出文件中。MapReduce优势劣势1、移动计算而不是移动数据,避免了额外的网络负载;2、任务相互独立,易于处理局部故障,单个节点故障只需重启节点任务。防止故障扩散到整个集群,允许处理同步中的错误。备份任务可提升拖延任务的执行速度;3、MapReduce模型是可线性扩展的;4、MapReduce模型结构简单,用户只需要至少编写Map和Reduce函数;5、相对于其他分布式模型,MapReduce的一大特点是其平坦的集群扩展代价曲线。在大规模集群时,MapReduce表现非常好。1、MapReduce模型缺乏一个中心用于同步各个任务;2、由于MapReduce模型是没有索引结构,用MapReduce模型来实现常见的数据库连接操作非常麻烦且效率低下;3、MapReduce集群管理比较麻烦,在集群中调试、部署以及日志收集工作都很困难;4、单个Master节点有单点故障的可能性且可能会限制集群的扩展性;5、当中间结果必须给保留的时候,作业的管理并不简单;6、对于集群的参数配置的最优解并非显然,许多参数需要有丰富的应用经验才能确定。Spark介绍Spark由加州大学伯克利分校AMP实验室开发,可用来构建大型的、低延迟的数据分析应用程序,是一种面向大数据处理的分布式内存计算模式或框架。Spark生态环境Spark总体架构Spark总体架构DriverProgram:运行main函数并且新建SparkContext的程序;SparkContext:Spark程序的入口,负责调度各个运算资源,协调各个WorkerNode上的Executor;Application:基于Spark的用户程序,包含了driver程序和集群上的executor;ClusterManager:集群的资源管理器(例如:Standalone,Mesos,Yarn);Spark总体架构Spark总体架构WorkerNode:集群中任何可以运行应用代码的节点;Executor:是在一个WorkerNode上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的Executors;Task:被送到某个Executor上的工作单元Spark总体架构Spark运行状态Q:一个用户程序是如何从提交到最终到集群上执行的呢?Spark弹性分布数据集RDDRDD(ResilientDistributedDataset),作为Spark的核心数据结构,是一个基于分区的、只读的数据记录集抽象。它是逻辑集中的实体,但在集群中的多台机器上进行了分区。通过对多台机器上不同RDD联合分区的控制,可以减少机器之间数据混合(DataShuffling)。RDD可缓存在RAM中,提供更快的数据访问。目前只支持整个RDD级别的缓存,当集群内存不足时可以根据LRU算法替换RDD。Spark弹性分布数据集RDDRDD提供抽象数据架构,隐藏底层分布性,应用逻辑通过Transformation和Action来表达一系列转换处理,前者在RDD之间指定处理的相互依赖关系有向无环图DAG,后者指定输出的形式。调度程序通过拓扑排序确定DAG执行顺序,追踪最源头的节点或者代表缓存RDD的节点。用户通过Transformation和Action操作控制RDD转换和输出。Transformation延迟执行,根据Action生成各代RDD,最终生成输出。用户通过选择Transformation的类型并定义Transformation中的函数来控制RDD之间的转换关系。当用户调用不同类型的Action操作来把任务以自己需要的形式输出。Transformation在定义时并没有立刻被执行,而是等到第一个Action操作到来时,在根据Transformation生成各代RDD.最后由RDD生成最后的输出。SparkRDD依赖的类型在RDD依赖关系有向无环图中,RDD之间的关系由Transformation来确定,根据Transformation的类型,生成的依赖关系有两种形式:宽依赖与窄依赖。窄依赖和宽依赖SparkRDD依赖的类型–窄依赖窄依赖是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。窄依赖的RDD可以通过相同的键进行联合分区,整个操作都可以在一台机器上进行,不会造成网络之间的数据混合。窄依赖SparkRDD依赖的类型–宽依赖宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖的RDD就会涉及到数据混合。调度程序会检查依赖性的类型,将窄依赖的RDD划到一组处理当中,即stage。宽依赖在一个执行中会跨越连续的stage,同时需要显式指定多个子RDD的分区。宽依赖SparkSpark的任务生成模式Spark任务生成模式RDD分区RDD之间关系1、用户提交的计算任务是由RDD构成的DAG;2、若RDD转换是宽依赖,这个宽依赖转换就会将这个DAG分为了不同阶段的stage,不同的stage不可以进行并行计算;3、运行时,Spark会把任务集合提交给任务调度器处理;4、RDD之间是窄依赖的,都归到一个stage里。Spark分析:Spark迭代性能远超Hadoop的原因是什么?Spark与Hadoop迭代过程比较流式计算流式数据流式大数据是随着时间而无限增加的数据序列,简称为流数据。流式大数据数据量大时效性高数据源不单一有序处理流式大数据特征流式计算流式计算系统流式计算是对流式数据进行实时分析计算的一种技术。它能很好地满足流数据处理的实时性和可靠性的要求。目前,具体代表性的大数据流式计算系统架构主要有两类,一类是有中心的主从式架构,一类是去中心化的对等式架构。主从式架构对等式架构金融银行业互联网物联网应用场景流式计算典型流式计算系统–SparkStreamingSparkStreaming处理流程SparkStreaming是在Spark基础上扩展的实时计算框架,能够实现高吞吐量的、容错处理的流式数据处理。其中,SparkStreaming中将流数据分为许多微批数据的引擎为SparkCore,它将流数据分为许多段微小的数据,再将这些数据转换成RDD,利用Spark系统的SparkEngine对RDD进行Transformation处理,将结果保存在内存中。SparkRDD容错性微批数据SparkEngine实时性流式计算典型流式计算系统–Storm系统Strom系统是由Twitter支持开发的一个分布式、实时的高容错开源流式计算系统,侧重于低延迟。与微批处理不同,Storm系统直接采用原生数据处理,成本较大。Storm系统拓扑Storm系统计算的作业逻辑单元是一个叫作Thrift的拓扑结构,由以下组件构成:Spout组件拓扑的起始单元,从外部读取原生数据流;Bolt组件拓扑的处理单元,对接收来的Tuples元组进行过滤、聚合、连接等处理,以流形式输出。流式计算典型流式计算系统–Storm系统Storm系统架构主从式架构设计由一个主节点nimbus、多个从节点supervisor和Zookeeper集群组成主节点和从节点由Zookeeper进行协调流式计算典型流式计算系统–Storm系统Storm系统数据交互将原生数据流处理成拓扑,提交给主节点Nimbus主节点nimbus从zookeeper集群中获得心跳信息,根据系统情况分配资源和任务给从节点Supervisor执行从节点监听到任务后启动或关闭Worker进程执行任务;Worker执行任务,把相关信息发送给Zookeeper集群存储。优势:不足:单数据流处理,延时极低单数据流丢失难以维护,不适合逻辑复杂、容错性要求高的工作流式计算典型流式计算系统–S4系统S4系统(SimpleScalableStreamingSystem)是雅虎用Java语言开发的通用、分布式、低延时、可扩展、可拔插的大数据流式计算系统,它采用的也是原生流数据处理。S4系统任务拓扑S4系统的基本计算单元:函数表示PE的功能与配置;事件类型表示PE接收的事件类型;主键键值定义每个PE只处理事件类型、主键、键值都匹配的事件。不匹配则创建新的处理单元。(K,A)流式计算典型流式计算系统–S4系统S4系统架构客户端驱动TCP/IP协议栈对等式架构用户服务请求流式计算典型流式计算系统–Kafka系统Kafka系统架构Kafka系统是由Linkedin支持开发的分布式、高吞吐量、开源的发布订阅消息系统,能够有效处理活跃的流式数据,侧重于系统吞吐量。消息发布者Producer缓存代理Broker订阅者Consumermessage状态管理、负载均衡流式计算典型流式计算系统–Kafka系统根据消息源的类型将其分为不同的主题topic,每个topic包含一个或多个partition消息发布者按照指定的partition方法,给每个消息绑定一个键值,保证将消息推送到相应的topic的partition中,每个partition代表一个有序的消息队列缓存代理将消息持久化到磁盘,设置消息的保留时间,系统仅存储未读消息。订阅者订阅了某一个主题topic,则从缓存代理中拉取该主题的所有具有相同键值的消息。Kafka系统消息处理流程优势:不足:可扩展性低延时性可快速处理大量流数据,适合吞吐量高的工作负载仅支持部分容错代理缓存没有副本节点流式计算典型流式计算系统--总结性能指标SparkStreamingStorm系统S4系统Kafka系统系统架构主从式架构主从式架构对等式架构主从式架构开发语言JavaClojure,JavaJavaScala数据传输方式拉取拉取推送推送拉取容错机制作业级容错作业级容错部分容错部分容错负载均衡支持不支持不支持部分支持资源利用率高高低低状态持久化支持不支持支持不支持编程模型纯编程纯编程编程+XML纯编程提纲9.1 大数据背景与概述9.2 大数据处理关键技术9.3 大数据计算模式9.4基于Hadoop的大数据编程实践9.5基于Spark的大数据编程实践Hadoop环境的搭建单机伪分布环境搭建环境要求:Linux操作系统Centos7发行版,Java环境(1.8版本的JDK)。第一步:下载Hadoop压缩包并解压到任意目录,由于权限问题建议解压到当前用户的主目录(home)。(下载地址:/apache/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz)。Hadoop环境的搭建单机伪分布环境搭建环境要求:Linux操作系统Centos7发行版,Java环境(1.8版本的JDK)。第二步:修改Hadoop的配置文件:etc/hadoop/hadoop-env.sh、etc/hadoop/hdfs-site.xml、etc/hadoop/core-site.xml。(如果只是部署HDFS环境只需要修改这三个文件,如需配置MapReduce环境请参考相关文档)#conf/core-site.xml修改如下:<configuration><property><name></name><value>HDFS://localhost:9000</value></property></configuration>#conf/HDFS-site.xml修改如下(这里只设置了副本数为1):<configuration><property><name>dfs.replication</name><value>1</value></property></configuration>#etc/hadoop/Hadoop-env.sh中修改了JAVA_HOME的值exportJAVA_HOME=/home/Hadoop/jdkHadoop环境的搭建单机伪分布环境搭建环境要求:Linux操作系统Centos7发行版,Java环境(1.8版本的JDK)。第三步:配置ssh自动免密码登录。1、运行ssh-keygen命令并一路回车使用默认设置,产生一对ssh密钥。2、执行ssh-copy-id-i~/.ssh/id_rsa.publocalhost把刚刚产生的公钥加入到当前主机的信任密钥中,这样当前使用的用户就可以使用ssh无密码登录到当前主机。第四步:第一次启动HDFS集群时需要格式化HDFS,在master主机上执行hadoop

namenode-format进行格式化。如果格式化成功后,则在Hadoop所在的目录执行sbin/start-dfs.sh开启HDFS服务。查看HFDS是否正确运行可以执行jps命令进行查询。Hadoop环境的搭建多节点全分布搭建本实例中,每个节点都需要使用固定IP并保持相同的Hadoop配置文件,每个节点Hadoop和jdk所在路径都相同、存在相同的用户且配置好免密码登录。第一步:与单机伪分布模式相同,下载Hadoop的二进制包,并解压备用。1、etc/hadoop/hadoop-env.sh中修改变了JAVA_HOME的值为JDK所在路径。2、etc/hadoop/core-site.xml修改如下:第二步:修改Hadoop配置文件,与伪分布有些许不同。Hadoop环境的搭建多节点全分布搭建3、etc/hadoop/hdfs-site.xml修改如下:(其中.dir

和dfs.data.dir可以任意指定,注意权限问题)第二步:修改Hadoop配置文件,与伪分布有些许不同。Hadoop环境的搭建多节点全分布搭建4、先执行mvmapred-site.xml.templatemapred-site.xml,然后修改mapred-site.xml如下第二步:修改Hadoop配置文件,与伪分布有些许不同。Hadoop环境的搭建多节点全分布搭建第二步:修改Hadoop配置文件,与伪分布有些许不同。5、etc/hadoop/yarn-site.xml修改如下:6、etc/hadoop/masters(需要自己新建文件)里添加secondarynamenode

主机名,比如任意slave的主机名。7、etc/hadoop/slaves里面添加各个slave的主机名,每行一个主机名。Hadoop环境的搭建多节点全分布搭建第三步:配置hosts文件或做好dns解析。以修改hosts文件为例子,在/etc/hosts里面添加所有主机的IP以及主机名。每个节点都使用相同的hosts文件。第四步:配置ssh自动登录,确保master主机能够使用当前用户免密码登录到各个slave主机上。在master上执行ssh-keygen命令,并使用以下命令将master的公钥添加到全部节点的信任列表上。ssh-copy-id-i~/.ssh/id_rsa.pubmasterssh-copy-id-i~/.ssh/id_rsa.pubslave1ssh-copy-id-i~/.ssh/id_rsa.pubslave2Hadoop环境的搭建多节点全分布搭建第五步:第一次启动HDFS集群时需要格式化HDFS,在master主机上执行hadoopnamenode-format,这一操作和伪分布相同。启动HDFS集群,在master主机上的Hadoop所在目录运行sbin/start-dfs.sh启动dfs。运行sbin/start-yarn.sh启动yarn。运行jps可检查各个节点是否顺利启动,具体显示如下所示。基于MAPREDUCE程序实例(HDFS)本例基于IntelliJIDEA2019.1.3x64和Hadoop2.10.0组成的环境。在idea中新建maven工程,命名为bigdata。基于MAPREDUCE程序实例(HDFS)本例基于IntelliJIDEA2019.1.3x64和Hadoop2.10.0组成的环境。2.添加maven依赖,添加如下dependency<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-hdfs</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-mapreduce-client-core</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-mapreduce-client-jobclient</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version></dependency>续基于MAPREDUCE程序实例(HDFS)本例基于IntelliJIDEA2019.1.3x64和Hadoop2.10.0组成的环境。3.编写wordcount程序在/src/main/java下新建Mapreduce包,包内新建WordCount类,在WordCount.java下编写源代码。4.将maven工程打包在右侧maven工具栏中选择Lifecycle/package,点击Runmavenbuild:打包完成后,在项目的target文件夹中找到打包好的bigdata-1.0-SNAPSHOT.jar,将其重命名为WordCount.jar基于MAPREDUCE程序实例(HDFS)基于新API的WordCount分析基于MAPREDUCE程序实例(HDFS)基于新API的WordCount分析1.源代码程序public

class

WordCount{

public

static

class

TokenizerMapper

extends

Mapper<Object,

Text,

Text,

IntWritable>{

private

final

static

IntWritable

one

=

new

IntWritable(1);

private

Text

word

=

new

Text();

public

void

map(Object

key,

Text

value,

Context

context)

throws

IOException,

InterruptedException{

StringTokenizer

itr

=

new

StringTokenizer(value.toString());

while(itr.hasMoreTokens()){

this.word.set(itr.nextToken());

context.write(this.word,one);

}

}

}

public

static

class

IntSumReducer

extends

Reducer<Text,IntWritable,Text,IntWritable>{

private

IntWritable

result

=

new

IntWritable();

public

void

reduce(Text

key,

Iterable<IntWritable>values,Context

context)

throws

IOException,

InterruptedException{

int

sum

=

0;

for(Iterator

i

=

values.iterator();i.hasNext();sum+=

val.get()){

val=(IntWritable)i.next();

}

this.result.set(sum);

context.write(key,

this.result);

}

}基于MAPREDUCE程序实例(HDFS)基于新API的WordCount分析1.源代码程序

public

static

void

main(String[]args)throws

IOException,

ClassNotFoundException,

InterruptedException{

Configuration

conf

=

new

Configuration();

String[]otherArgs

=

new

GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length

!=

2){

System.err.println("Usage:wordcount<in><out>");

System.exit(2);

}

Job

job

=

Job.getInstance(conf,

"wordcount");

job.setJarByClass(WordCount.class);

job.setMapperClass(WordCount.TokenizerMapper.class);

job.setCombinerClass(WordCount.IntSumReducer.class);

job.setReducerClass(WordCount.IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,

new

Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,

new

Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?

0

:

1);

}

}基于MAPREDUCE程序实例(HDFS)基于新API的WordCount分析1.Map过程public

static

class

TokenizerMapperextends

Mapper<Object,

Text,

Text,

IntWritable>{

private

final

static

IntWritable

one

=

new

IntWritable(1);

private

Text

word

=

new

Text();

public

void

map(Object

key,

Text

value,

Context

context)

throws

IOException,

InterruptedException{

StringTokenizer

itr

=

new

StringTokenizer(value.toString());

while(itr.hasMoreTokens()){

this.word.set(itr.nextToken());

context.write(this.word,one);

}

}}基于MAPREDUCE程序实例(HDFS)基于新API的WordCount分析2.Reduce过程public

static

class

IntSumReducerextends

Reducer<Text,IntWritable,Text,IntWritable>{

private

IntWritable

result

=

new

IntWritable();

public

void

reduce(Text

key,

Iterable<IntWritable>values,Context

context)

throws

IOException,

InterruptedException{

int

sum

=

0;

for(Iterator

i

=

values.iterator();i.hasNext();sum+=

val.get()){

val=(IntWritable)i.next();

}

this.result.set(sum);

context.write(key,

this.result);

}}基于MAPREDUCE程序实例(HDFS)基于新API的WordCount分析3.执行MapReduce任务public

static

void

main(String[]args)throwsException{

Configuration

conf

=

new

Configuration();

String[]otherArgs

=

new

GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length

!=

2){

System.err.println("Usage:wordcount<in><out>");

System.exit(2);

}

Job

job

=

new

Job(conf,

"wordcount");

job.setJarByClass(WordCount.class);

job.setMapperClass(WordCount.TokenizerMapper.class);

job.setCombinerClass(WordCount.IntSumReducer.class);

job.setReducerClass(WordCount.IntSumReduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,

new

Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,

new

Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?

0

:

1);}基于MAPREDUCE程序实例(HBase)1.添加Maven依赖新增dependency依赖如下:<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-shaded-client</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-common</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-client</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-mapreduce</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-server</artifactId>

<version>2.2.4</version></dependency>

<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-endpoint</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-metrics-api</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-thrift</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-rest</artifactId>

<version>2.2.4</version></dependency>注意:HBase的lib目录下的Hadoop-core文件版本需要与Hadoop的版本对应,不然会出现无法连接的情况。基于MAPREDUCE程序实例(HBase)1.基于Hbase的WordCount实例程序1public

static

class

IntSumReducer

extendsTableReducer

<Text,IntWritable,ImmutableBytesWritable>{

private

IntWritable

result

=

new

IntWritable();

public

void

reduce(Text

key,

Iterable<IntWritable>values,

Context

context)throws

IOException,

InterruptedException{

int

sum

=

0;

for(IntWritable

val

:values){

sum+=

val.get();

}

result.set(sum);

Put

put

=

new

Put(key.getBytes());//put实例化,每一个词存一行

//列族为content,列修饰符为count,列值为数目

put.addColumn(Bytes.toBytes("content"),Bytes.toBytes("count"),

Bytes.toBytes(String.valueOf(sum)));

context.write(new

ImmutableBytesWritable(key.getBytes()),put);

}}基于MAPREDUCE程序实例(HBase)1.基于Hbase的WordCount实例程序1public

static

void

main(String[]args)throwsException{

TableName

tablename

=

TableName.valueOf("wordcount");

//实例化Configuration,注意不能用newHBaseConfiguration()了。

Configuration

conf

=

HBaseConfiguration.create();

Connection

conn

=

ConnectionFactory.createConnection(conf);

Admin

admin

=

conn.getAdmin();

if(admin.tableExists(tablename)){

System.out.println("tableexists!recreating...");

admin.disableTable(tablename);

admin.deleteTable(tablename);

}

TableDescriptorBuilder

tdb

=

TableDescriptorBuilder.newBuilder(tablename);

HTableDescriptor

htd

=

new

HTableDescriptor(tablename);

HColumnDescriptor

hcd

=

new

HColumnDescriptor("content");

tdb.addFamily(hcd);

//创建列族

admin.createTable(tdb.build());//创建表

String[]otherArgs

=

new

GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length

!=

1){

System.err.println("Usage:wordcount<in><out>"+otherArgs.length);

System.exit(2);

}基于MAPREDUCE程序实例(HBase)1.基于Hbase的WordCount实例程序1

Job

job

=

Job.getInstance(conf,

"wordcount");

job.setJarByClass(WordCountHBase.class);

job.setMapperClass(TokenizerMapper.class);

//job.setCombinerClass(IntSumReducer.class);

FileInputFormat.addInputPath(job,

new

Path(otherArgs[0]));

//此处的TableMapReduceUtil注意要用Hadoop.HBase.MapReduce包中的,而不是Hadoop.HBase.mapred包中的

TableMapReduceUtil.initTableReducerJob(tablename,

IntSumReducer.class,job);

//key和value到类型设定最好放在initTableReducerJob函数后面,否则会报错

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true)?

0

:

1);}注意:此处的TableMapReduceUtil是Hadoop.HBase.MapReduce包中的,而不是Hadoop.HBase.mapred包中的,否则会报错。基于MAPREDUCE程序实例(HBase)2.基于Hbase的WordCount实例程序2public

static

class

TokenizerMapper

extends

TableMapper<Text,

Text>{

public

void

map(ImmutableBytesWritable

row,

Result

values,

Context

context)throws

IOException,

InterruptedException{

StringBuffer

sb

=

new

StringBuffer("");

for(java.util.Map.Entry<byte[],byte[]>value

:

values.getFamilyMap("content".getBytes()).entrySet()){

//将字节数组转换成String类型,需要newString();

String

str

=

new

String(value.getValue());

if(str!=

null){

sb.append(new

String(value.getKey()));

sb.append(":");

sb.append(str);

}

context.write(new

Text(row.get()),

new

Text(new

String(sb)));

}

}}map函数继承到TableMapper接口,从result中读取查询结果。基于MAPREDUCE程序实例(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论