大数据知识培训完整版_第1页
大数据知识培训完整版_第2页
大数据知识培训完整版_第3页
大数据知识培训完整版_第4页
大数据知识培训完整版_第5页
已阅读5页,还剩138页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据培训大数据概念Hadoop介绍HDFSMAPREDUCE编程HIVEFLUMESTORMZOOKEEPER培训内容大数据定义大小超过常规的数据库工具获取、存储、管理和分析能力的数据集事务所涉及的资料量规模巨大到无法通过目前主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。四大特征:大量(Volume)存储大;计算量大;多样(Variety)来源多;格式多;快速(Velocity)

增长速度快处理速度要求快价值(Value)

浪里淘沙却又弥足珍贵大数据概念大数据趋势StructuredDatabaseSpreadsheetFileinrecordformatSemi-structuredXMLDocsLogsClick-streamEquipment/Device,RFIDtagUnstructuredWebPagesE-mailMultimediaInstantMessagesDocumentsBigDataPeopleDevicesSensors移动互联网Mobile

Internet物联网InternetofThings新量级、新处理模式、新企业智能大数据趋势对企业数据处理的挑战对企业数据处理的挑战每天几百GB、几TB的资料,且持续成长中储存Storing

在收数据的同时做必要的前置处理(pre-processing),并区分数据处理的优先等级(prioritizing)计算Processing如何有效的避免因硬件毁坏所导致的资料损毁管理Managing如何从中挖掘出所关注事件的pattern或behavior分析Analyzing5典型互联网大数据应用技术体系典型互联网大数据应用技术体系数据存储、计算规则、业务处理数据采集数据服务高效、智能的数据采集技术高效数据仓库技术(ETL)实时计算技术大数据存储技术大数据计算技术机器学习关键技术数据分析&可视化技术数据共享技术大数据生态圈组件用途说明MapReduce并行计算框架分布式数据处理模型和执行环境HDFS存储分布式文件系统HIVE数据仓库一个分布式、按列存储的数据仓库。Hive管理HDFS中存储的数据,并提供基于SQL的查询语言(由运行时引擎翻译成MapReduce作业)用以查询数据。sqoop数据抽取Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL,Oracle,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。Spark并行计算框架Spark是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速。使用的语言是Scala。hbase存储数据库

一个分布式、按列存储数据库。HBase使用HDFS作为底层存储,同时支持MapReduce的批量式计算和点查询(随机读取)Pig数据流语言一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫PigLatin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口。ZooKeeper分布式协调服务ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务Storm实时计算框架一个实时计算框架,与MapReduce离线计算框架互补,分别用于解决不同场景下的问题mahout机器学习库-提供一些可扩展的机器学习领域经典算法的实现AmbariHadoop集群监控ApacheAmbari是一个供应、管理和监视ApacheHadoop集群的开源框架,它提供一个直观的操作工具和一个健壮的HadoopAPI,可以隐藏复杂的Hadoop操作,使集群操作大大简化。Hadoop介绍Hadoop概况

Hadoop概况Apache开源项目–源于Lucene项目的一部分,2006.1成为子项目,现为Apache顶级项目之一–Google的三篇论文MapReduce,GFS,BigTable–Yahoo!是最主要的源代码贡献者,其他贡献者:Powerset,Facebook等–已知为接近150家的大型组织实际使用:Yahoo!,Amazon,EBay,AOL,Google,IBM,Facebook,Twitter,Baidu,Alibaba,Tencent,…(/hadoop/PoweredBy)Hadoop核心功能–高可靠性,高效率的分布式文件系统–一个海量数据处理的编程框架Hadoop特点–可扩展性:Petabytes(1015Bytes)级别的数据量,数千个节点–经济性:利用商品级(commodity)硬件完成海量数据存储和计-高效率:通过分发数据,可以在数据所在的节点上并行处理,使得处理效率变快–可靠性:在大规模集群上提供应用级别的可靠性HadoopVS.RDB关系数据库Hadoop资料量GB->TBTB->PB存取方式交互式与批次批次数据更新多次读写一次写,多次读数据结构固定schema无schema资料一致性高(ACID)低扩充性非线性线性Hadoop特性Hadoop特性传统并行计算架构并行计算

+

分布式存储运算储存传统储存架构计算与存储一体,计算向数据靠拢,高效专用存储模式为程序员屏蔽通性、并发、同步与一致性等问题任务之间无依赖(share-nothing),具有高系统延展性(scale-out)HDFSHadoop1.XHDFS架构图NameSpaceStateBlockMapSecondaryNameNodeClientDataNodeDataNodeDataNodeDataNodeNameNodeHeartbeat&Blockreport本地磁盘元数据HDFS设计原则文件以块(block)方式存储每个块带下远比多数文件系统来的大通过副本机制提高可靠度和读取吞吐量每个区块缺省分到三台DataNode上

master(NameNode)来协调存储元数据(metadata)客户端对文件没有缓存机制(Nodatacaching)NameNode(NN)NameNode主要功能提供名称查询服务NameNode保存metadate信息包括文件owership和permissions文件包含哪些块Block保存在哪个DataNode(由DataNode启动时上报)NameNode的metadate信息在启动后会加载到内存metadata存储到磁盘文件名为”fsimage” Block的位置信息不会保存到fsimageNameNode

块存储结构metadata物理存储结构DataNode(DN)保存Block启动DN线程的时候会向NN汇报block信息通过向NN发送心跳保持与其联系(3秒一次),如果NN10分钟没有收到DN的心跳,则认为其已经lost,并copy其上的block到其它DNBlock的副本放置策略第一个副本:放置在上传文件的DN;如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点第二个副本:放置在于第一个副本不同的机架的节点上第三个副本:第二个在同一个机架,随机放在不同的node中。更多副本:随机节点再说Block设置一个Block64MB,如果上传文件小于该值,仍然会占用一个Block的命名空间(NameNodemetadata),但是物理存储上不会占用64MB的空间Block大小和副本数由Client端上传文件到HDFS时设置,其中副本数可以变更,Block是不可以再上传后变更的数据损坏(corruption)处理当DN读取block的时候,它会计算checksum如果计算后的checksum,与block创建时值不一样,说明该block已经损坏。client读取其它DN上的block;NN标记该块已经损坏,然后复制block达到预期设置的文件备份数DN在其文件创建后三周验证其checksumHDFS文件权限21与Linux文件权限类似r:read;w:write;x:execute,权限x对于文件忽略,对于文件夹表示是否允许访问其内容如果Linux系统用户test使用hadoop命令创建一个文件,那么这个文件在HDFS中owner就是testHDFS的权限目的:阻止好人错错事,而不是阻止坏人做坏事。HDFS相信,你告诉我你是谁,我就认为你是谁ACL权限控制setfacl-mu:cheng:wrtest.txtyarm

rmadmin-refreshServiceAclhadoop

dfsadmin-refreshServiceAclHDFS文件写入HDFS文件读取MapReduceMap/Reduce

Map/Reduce•什么是Map/Reduce–一种高效,海量的分布式计算编程模型海量:相比于MPI,Map处理之间的独立性使得整个系统的可靠性大为提高.高效:用调度计算代替调度数据!分布式操作和容错机制由系统实现,应用级编程非常简单.•计算流程非常类似于简单的Unixpipe:–Pipe:catinput|grep|sort|uniq-c>output–M/R:Input|map|shuffle&sort|reduce|output•多样的编程接口:–Javanativemap/reduce–可以操作M/R各细节–Streaming–利用标准输入输出模拟以上pipeline–Pig–只关注数据逻辑,无须考虑M/R实现MapReduce编程接口1)迭代(iteration)。遍历输入数据,并将之解析成key/value对。2)将输入key/value对映射(map)成另外一些key/value对。3)依据key对中间数据进行分组(grouping)。4)以组为单位对数据进行归约(reduce)。5)迭代。将最终产生的key/value对保存到输出文件中。MapReduce编程模型用户定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将所有这些中间的有着相同key的values合并起来。Map/reduce计算流程

用Java进行Map/Reduce编程

MapReduce单一Reduce处理多个Reduce处理无Reduce处理MapReduceMapReduce物理上MapReduceMapReduce处理流程MapReduceMapReduceMapReduceMapReduceMapReduceMapReduceMapReduceMapReduceMapReduceMapReduceMapReduceJobTracker一直在等待JobClient提交作业TaskTracker每隔3秒向JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行这是一道pull过程:slave主动向master拉生意MapReduce特点:Fault-tolerant容错,很重要!M/RFailuresTaskfails Tryagain? Tryagainsomewhereelse? Reportfailure只有当map处理全部结束后,reduce过程才能够开始Hadoop开发注意点1.Hadoop将运行一次Map/Reduce作业叫做运行一个Job2.Hadoop需要计算的源数据都存储在HDFS中。3.Map阶段计算结果存储在本地文件系统中。4.Hadoop最终计算的结果也存储在HDFS中。5.Map/Reduce框架的运作完全基于<key,value>对,即数据的输入是一批<key,value>对,生成的结果也是一批<key,value>对。6.默认情况下,Key与value之间用\t分隔。WordCount:MapReduce版"HelloWorld",WordCount代码解析--MappublicstaticclassMapextendsMapReduceBaseimplements

Mapper<LongWritable,Text,Text,IntWritable>{

privatefinalstaticIntWritableone=newIntWritable(1);

privateTextword=newText();

//map函数publicvoidmap(LongWritablekey,Textvalue,

OutputCollector<Text,IntWritable>output,Reporterreporter)

throwsIOException{

Stringline=value.toString();

StringTokenizertokenizer=newStringTokenizer(line);

while(tokenizer.hasMoreTokens()){

word.set(tokenizer.nextToken());

output.collect(word,one);

}

}

}WordCount代码解析--ReducepublicstaticclassReduceextendsMapReduceBaseimplementsReducer<Text,IntWritable,Text,IntWritable>{

//reduce函数publicvoidreduce(Textkey,Iterator<IntWritable>values,OutputCollector<Text,IntWritable>output,Reporterreporter)throwsIOException{intsum=0;while(values.hasNext()){sum+=values.next().get();}output.collect(key,newIntWritable(sum));}}WordCount代码解析—Main主函数publicstaticvoidmain(String[]args)throwsException{JobConfconf=newJobConf(WordCount.class);conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(Map.class);conf.setCombinerClass(Reduce.class);conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(conf,newPath(args[0]));//hdfs文件路径FileOutputFormat.setOutputPath(conf,newPath(args[1]));//结果输出路径JobClient.runJob(conf);}Barrier(good,1)(good,1)(good,2)(good,1)PartitionerPartitionerPartitionerPartitioner(is,1)(is,1)(is,1)(has,1)(weather,1)(weather,1)(weather,1)(the,1)(today,1)(today,1)海量数据存储计算结果……数据划分Map初始kv键值对初始kv键值对初始kv键值对初始kv键值对MapMapMap中间结果(the,1)(weather,1)(is,1)(good,1)CombinerCombinerCombinerCombiner(the,1)(weather,1)(is,1)(good,1)(today,1)(is,1)(good,1)(good,1)(weather,1)(is,1)(good,1)(today,1)(has,1)(good,1)(weather,1)(today,1)(is,1)(good,1)(good,2)(weather,1)(is,1)(today,1)(has,1)(good,1)(weather,1)ReduceReduceReduce(good,5)(is,3)(has,1)(weather,3)(the,1)(today,2)Combiner和Partitioner基于Map和Reduce的并行计算模型InjectPointsInputjob.setInputFormatClass()SplitRecordReaderMapjob.setMapperClass()Combine*job.setCombinerClass()Shufflingjob.setPartitionerClass()Sortjob.setSortComparatorClass()Groupingjob.setGroupingComparatorClass()Reducejob.setReducerClass()Outputjob.setOutputFormatClass()

Combiners和Partitioner编程Combiners的作用:每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量,

1)combiner最基本是实现本地key的聚合,对map输出的key排序,value进行迭代。如下所示:

map:(K1,V1)→list(K2,V2)

combine:(K2,list(V2))→list(K2,V2)

reduce:(K2,list(V2))→list(K3,V3)2)combiner还具有类似本地的reduce功能.

例如hadoop自带的wordcount的例子和找出value的最大值的程序,combiner和reduce完全一致。如下所示:

map:(K1,V1)→list(K2,V2)

combine:(K2,list(V2))→list(K3,V3)

reduce:(K3,list(V3))→list(K4,V4)

3)如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

4)对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。注意:combiner使用的合适,可以在满足业务的情况下提升job的速度,如果不合适,则将导致输出的结果不正确。Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

Combiner

Combiners分析假设有两个map。第一个map的输出为:(1950,0)

(1950,20)(1950,10)第二个map输出为:(1950,25)

(1950,15)(1950,30)Reduce函数被调用是,输入如下:(1950,[0,20,10,25,15,30])因为30是最大的值,所以输出如下:(1950,30)如果我们使用combiner:那么reduce调用的时候传入的数据如下:(1950,[20,30])--(1950,30)用表达式表示为:Max(0,20,10,25,15,30)=max(max(0,20,10),max(25,15,30))=max(20,30)=30

刚才我们是计算最大值可以使用Combiners能提高效率。如果我们要是求平均值呢?Avg(0,20,10,25,15,30)=15如果使用Combiner会得到什么样的结果呢?第一个map输出为:

avg(0,20,10)=10第二个map输出为:Avg(25,15,30)=23输入到reduce出来的结果为:Avg(10,23)=17.517.5和15?所以:使用combiner一定要注意。

Partitioner

Partition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求:

1)均衡负载,尽量的将工作均匀的分配给不同的reduce。

2)效率,分配速度一定要快。

mapreduce提供的Patitioner

Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。2.HashPartitioner是mapreduce的默认partitioner。计算方法是whichreducer=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到当前的目的reducer。BinaryPatitioner继承于Partitioner<BinaryComparable,V>,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算whichreducer时仅对键值K的[rightOffset,leftOffset]这个区间取hash。Whichreducer=(hash&Integer.MAX_VALUE)%numReduceTasksKeyFieldBasedPartitioner也是基于hash的个partitioner。和BinaryPatitioner不同,它提供了多个区间用于计算hash。当区间数为0时KeyFieldBasedPartitioner退化成HashPartitioner。5.TotalOrderPartitioner这个类可以实现输出的全排序。不同于以上3个partitioner,这个类并不是基于hash的。在下一节里详细的介绍totalorderpartitioner。

mapreduce的类型与格式Hadoop0.20.x之前的APIHadoop的MapReduce中,map和reduce函数遵循如下常规格式:map:(K1,V1)→list(K2,V2)reduce:(K2,list(V2))→list(K3,V3)Mapper的接口:publicinterfaceMapper<K1,V1,K2,V2>extendsJobConfigurable,Closeable{voidmap(K1key,V1value,OutputCollector<K2,V2>output,Reporterreporter)throwsIOException;}Reduce的接口:publicinterfaceReducer<K2,V2,K3,V3>extendsJobConfigurable,Closeable{voidreduce(K2key,Iterator<V2>values,OutputCollector<K3,V3>output,Reporterreporter)throwsIOException;}//outputCollector是为了输出key/value对,//Reporter是用来更新计数和状态信息。Hadoop0.20.x之后的APIHadoop的MapReduce中,map和reduce函数遵循如下常规格式:

map:(K1,V1)→list(K2,V2)reduce:(K2,list(V2))→list(K3,V3)Mapper的接口:protectedvoidmap(KEYkey,VALUEvalue,Contextcontext)throwsIOException,InterruptedException{}Reduce的接口:protectedvoidreduce(KEYkey,Iterable<VALUE>values,Contextcontext)throwsIOException,InterruptedException{}//Context是上下文对象,这里Context等同于OutputCollector和Reporter两个函数的功能。

mapreduce的数据类型与java类型对应关系Java的基本类型Writable实现booleanBooleanWritablebyteByteWritableintIntWritablefloatFloatWritablelongLongWritabledoubleDoubleWritableStringTextWritable接口1.对Java中的int型进行封装那么就是hadoop中的IntWritable类 在写程序时可以把IntWritable可以看着是int类型,它实现了WritableComparable接口。WritableComparable又是Writable、parable接口的子接口。2.Writable类对所有的Java基本类型进行封装:

如:boolean->BooleanWritable;Byte->ByteWritable3.我们可以自定义Writable接口,来编写更复杂的结构的类。核心:hadoop有自己一套的I/O机制。I/O类都必须实现Writable接口。

mapreduce驱动默认的设置InputFormat(输入)TextInputFOrmatMapperClass(map类)IdentityMapperMapRunnerClass(map启动类)MapRunnerMapOutputKeyClassLongWritableMapOutputValueClassTextPartitionerClassHashPartitionerReduceClassIdentityReduceOutputKeyClassLongWritableOutputValueClassTextOutputFormatClassTextOutputFormatDistributedCache代码HIVEHive由Facebook开发架构于Hadoop之上,设计用来管理结构化数据的中间件以MapReduce为执行环境数据储存于HDFS上Metadata储存于RDMBS中Hive的设计原则采用类SQL语法扩充性–Types,Functions,Formats,Scripts性能与平水扩展能力兼具Hive–SQLlikeHadoopDatabaseDriver(compiler,optimizer,executor)metastoreDataNodeDataNodeDataNodeDataNodeHadoopClusterM/RM/RM/RM/RWebUICLIJDBCODBCCreateM/RJobHive中所有的数据都存储在HDFS中,Hive中包含以下数据模型:Table,ExternalTable,Partition,Bucket。Hive中的Table和数据库中的Table在概念上是类似的,每一个Table在Hive中都有一个相应的目录存储数据。例如,一个表xiaojun,它在HDFS中的路径为:/warehouse/xiaojun,其中,wh是在hive-site.xml中由${hive.metastore.warehouse.dir}指定的数据仓库的目录,所有的Table数据(不包括ExternalTable)都保存在这个目录中。Partition对应于数据库中的Partition列的密集索引,但是Hive中Partition的组织方式和数据库中的很不相同。在Hive中,表中的一个Partition对应于表下的一个目录,所有的Partition的数据都存储在对应的目录中。例如:xiaojun表中包含dt和city两个Partition,则对应于dt=20100801,ctry=US的HDFS子目录为:/warehouse/xiaojun/dt=20100801/ctry=US;对应于

dt=20100801,ctry=CA的HDFS子目录为;/warehouse/xiaojun/dt=20100801/ctry=CABuckets对指定列计算hash,根据hash值切分数据,目的是为了并行,每一个Bucket对应一个文件。将user列分散至32个bucket,首先对user列的值计算hash,对应hash值为0的HDFS目录为:/warehouse/xiaojun/dt=20100801/ctry=US/part-00000;hash值为20的HDFS目录为:/warehouse/xiaojun/dt=20100801/ctry=US/part-00020ExternalTable指向已经在HDFS中存在的数据,可以创建Partition。它和Table在元数据的组织上是相同的,而实际数据的存储则有较大的差异。Table的创建过程和数据加载过程(这两个过程可以在同一个语句中完成),在加载数据的过程中,实际数据会被移动到数据仓库目录中;之后对数据对访问将会直接在数据仓库目录中完成。删除表时,表中的数据和元数据将会被同时删除。ExternalTable只有一个过程,加载数据和创建表同时完成(CREATEEXTERNALTABLE……LOCATION),实际数据是存储在LOCATION后面指定的HDFS路径中,并不会移动到数据仓库目录中。当删除一个ExternalTable时,仅删除表结构CREATETABLE创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用IFNOTEXIST选项来忽略这个异常。EXTERNAL关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION),Hive创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。用户在建表的时候可以自定义SerDe或者使用自带的SerDe。如果没有指定ROWFORMAT或者ROWFORMATDELIMITED,将会使用自带的SerDe。在建表的时候,用户还需要为表指定列,用户在指定表的列的同时也会指定自定义的SerDe,Hive通过SerDe确定表的具体的列的数据。如果文件数据是纯文本,可以使用STOREDASTEXTFILE。如果数据需要压缩,使用STOREDASSEQUENCE。有分区的表可以在创建的时候使用PARTITIONEDBY语句。一个表可以拥有一个或者多个分区,每一个分区单独存在一个目录下。而且,表和分区都可以对某个列进行CLUSTEREDBY操作,将若干个列放入一个桶(bucket)中。也可以利用SORTBY对数据进行排序。这样可以为特定应用提高性能。表名和列名不区分大小写,SerDe和属性名区分大小写。表和列的注释是字符串。DropTable删除一个内部表的同时会同时删除表的元数据和数据。删除一个外部表,只删除元数据而保留数据。AlterTableAltertable语句允许用户改变现有表的结构。用户可以增加列/分区,改变serde,增加表和serde熟悉,表本身重命名。内嵌函数Hive只支持等值连接(equalityjoins)、外连接(outerjoins)和(leftsemijoins???)。Hive不支持所有非等值的连接,因为非等值连接非常难转化到map/reduce任务。另外,Hive支持多于2个表的连接。hive不支持顶层union,只能将union封装在子查询中;且必须为union的查询输出定义别名Hive语句示例Hive自定义函数UDFaddjar/root/yyy.jarcreatetemporaryfunctionxxxas'com.hdp.hive.example.udf.HiveUDFExample';selecttime,xxx(time)fromtmp_page_viewlimit100;DroptemporaryfunctionxxxUDF用法•UDF(User-Defined-Function)•UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容•编写UDF函数的时候需要注意一下几点•自定义UDF需要继承org.apache.hadoop.hive.ql.UDF•需要实现evaluate函数•evaluate函数支持重载•UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAFUDAF-用户自定义聚合函数UDAF•Hive查询数据时,有些聚类函数在HQL没有自带,需要用户自定义实现•用户自定义聚合函数:Sum,Average……n–1•UDAF(User-DefinedAggregationFuncation)

用法•两个必须的package:importorg.apache.hadoop.hive.ql.exec.UDAF和org.apache.hadoop.hive.ql.exec.UDAFEvaluator开发步骤•函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口•Evaluator需要实现init、iterate、terminatePartial、merge、terminate这几个函数a)init函数实现接口UDAFEvaluator的init函数;b)iterate接收传入的参数,并进行内部的轮转.其返回类型为boolean;c)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner;d)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean;e)terminate返回最终的聚集函数结果执行步骤•执行求平均数函数的步骤a)将java文件编译成Avg_test.jar。b)进入hive客户端添加jar包:hive>addjar/run/jar/Avg_test.jar。c)创建临时函数:hive>createtemporaryfunctionavg_test'hive.udaf.Avg';d)查询语句:hive>selectavg_test(scores.math)fromscores;e)销毁临时函数:hive>droptemporaryfunctionavg_test;Hive架构&执行流程FLUMEFlumeNG简介Flume是什么Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统收集、聚合事件流数据的分布式框架通常用于log数据收集明显优点如下:可靠的、可伸缩、可管理、可定制、高性能声明式配置,可以动态更新配置支持负载均衡和故障转移功能丰富完全的可扩展数据采集系统结构主要使用FlumeNG进行各类数据的采集工作。用Flume持久化到Hadoop或Hbase中去。FlumeNG架构图如下:数据采集系统结构数据采集系统结构FlumeNG简介核心概念:Agent一个Agent包含Sources,Channels,Sinks和其他组件,它利用这些组件将events从一个节点传输到另一个节点或最终目的。核心概念:SourceSource负责接收events或通过特殊机制产生events,并将events批量的放到一个或多个Channels。有event驱动和轮询2种模式的Source。Source分为多种类型,如:SpoolingDirectorySource,SyslogSources,HTTPSource,AvroSource等。FlumeNG简介日志收集系统-FlumeNG核心概念:ChannelChannel位于Source和Sink之间,用于缓存进来的events,当Sink成功的将events发送到下一跳的channel或最终目的,events从Channel移除。Channel分为多种类型,如:FlieChannel、MemoryChannel。核心概念:SinkSink负责将events传输到下一跳或最终目的,成功完成后将events从channel移除。Sink分为多种类型,如:HDFSSink,HBaseSink,FileRollSink,AvroSinkSTORMSTORM特点

编程模型简单可扩展高可靠性高容错性支持本地模式Storm集群结构

主节点工作节点作业StormNimbusSupervisortopologies(拓扑),死循环HadoopJobtrackerTasktrackerMapReduceJob,执行完自动结束Nimbus和Supervisors之间所有的协调工作是通过一个Zookeeper集群。Nimbus进程和Supervisors进程是无法直接连接和无状态的;

所有的状态维持在Zookeeper中或保存在本地磁盘上。这意味着你可以kill-9Nimbus或Supervisors进程,而不需要做备份。这种设计导致storm集群具有令人难以置信的稳定性,即无耦合。Storm

工作原理Nimbus负责在集群分发的代码,topo只能在nimbus机器上提交,将任务分配给其他机器,和故障监测。Supervisor,监听分配给它的节点,根据Nimbus的委派在必要时启动和关闭工作进程。每个工作进程执行topology的一个子集。一个运行中的topology由很多运行在很多机器上的工作进程组成。在Storm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组Storm

工作原理Storm认为每个stream都有一个源,也就是原始元组的源头,叫做Spout(管口)处理stream内的tuple,抽象为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。可以认为spout就是水龙头,并且每个水龙头里流出的水是不同的,我们想拿到哪种水就拧开哪个水龙头,然后使用管道将水龙头的水导向到一个水处理器(bolt),水处理器处理后再使用管道导向另一个处理器或者存入容器中。Storm

工作原理为了增大水处理效率,我们很自然就想到在同个水源处接上多个水龙头并使用多个水处理器,这样就可以提高效率。这是一张有向无环图,Storm将这个图抽象为Topology(拓扑),Topo就是storm的Job抽象概念,一个拓扑就是一个流转换图图中每个节点是一个spout或者bolt,每个spout或者bolt发送元组到下一级组件,广播方式。而Spout到单个Bolt有6种grouping方式,后续细讲。基本概念

Topologies

Streams

Spouts

Bolts

Streamgroupings

Tasks

WorkersTopologyStorm将流中元素抽象为tuple,一个tuple就是一个值列表valuelist,list中的每个value都有一个name,并且该value可以是任意可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。概念Streams:消息流消息流是一个没有边界的tuple序列,而这些tuples会被以一种分布式的方式并行创建和处理。每个tuple可以包含多列,字段类型可以是:integer,long,short,byte,string,double,float,boolean和bytearray。你还可以自定义类型—只要你实现对应的序列化器。Spouts:消息源Spouts是topology消息生产者。Spout从一个外部源(消息队列)读取数据向topology发出tuple。消息源Spouts可以是可靠的也可以是不可靠的。一个可靠的消息源可以重新发射一个处理失败的tuple,一个不可靠的消息源Spouts不会。Spout类的方法nextTuple不断发射tuple到topology,storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。类型得到数据处理StormSpoutBoltHadoopMap(读HDFS文件)ReduceMessageQueueProductorConsumer概念Bolts:消息处理者消息处理逻辑被封装在bolts里面,Bolts可以做很多事情:过滤,聚合,查询数据库等。Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多Bolts。第一级Bolt的输出可以作为下一级Bolt的输入。而Spout不能有一级。Bolts的主要方法是execute(死循环)连续处理传入的tuple,成功处理完每一个tuple调用OutputCollector的ack方法,以通知storm这个tuple被处理完成了。当处理失败时,可以调fail方法通知Spout端可以重新发送该tuple。流程是:Bolts处理一个输入tuple,然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。Bolts使用OutputCollector来发射tuple到下一级Blot。

API介绍——Component组件绿色是我们最常用的,红色是与事务相关的BaseComponent是Storm提供的“偷懒”的类,它及其子类都或多或少实现了其接口定义的部分方法。这样我们在用的时候,而不是自己每次都写所有的方法。SpoutSpout的最顶层抽象是ISpout接口Open()是初始化方法nextTuple()循环发射数据ack()成功处理tuple回调方法Fail()处理失败tuple回调方法activate和deactivate

:spout可以被暂时激活和关闭close方法在该spout关闭前执行,但是并不能得到保证其一定被执行,kill-9时不执行,Stormkill{topoName}时执行原则:通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。BoltIBolt定义了三个方法:prepare方法进行初始化,传入当前执行的上下文execute接受一个tuple进行处理,也可emit数据到下一级组件cleanup同ISpout的close方法,在关闭前调用,不保证其一定执行。IBolt继承了Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker(工作进程)上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文.execute接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果.还可以通过OutputCollector的emit方法把结果发射到下一级组件。IBasicBolt接口,实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。如果你确实要反馈失败,可以抛出FailedException。实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做了prepare方法和collector.emit.ack(inputTuple)。streamgrouping就是用来定义把stream应该如何分配给Bolts上面的多个Executors(多线程,并发度)storm里面有6种类型的streamgrouping。单线程下均等同于AllGroupingShuffleGrouping轮询,平均分配。随机派发stream里面的tuple,保证每个bolt接收到的tuple数目相同。2.NonGrouping:无分组,这种分组和Shufflegrouping是一样的效果,多线程下不平均分配。3.FieldsGrouping:按Field分组,比如按word来分组,具有同样word的tuple会被分到相同的Bolts,而不同的word则会被分配到不同的Bolts。作用:1、过滤,从源端(Spout或上一级Bolt)多输出Fields中选择某些Field2、相同的tuple会分发给同一个Executer或task处理典型场景:

去重操作、JoinStreamgrouping策略4.AllGrouping:广播发送,对于每一个tuple,所有的Bolts都会收到。5.GlobalGrouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。6.DirectGrouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者决定由消息接收者的哪个task处理这个消息。只有被声明为DirectStream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)Streamgrouping策略并发度一个Topology可以包含一个或多个worker(并行的跑在不同的machine上),所以workerprocess就是执行一个topology的子集,并且worker只能对应于一个topology一个worker可用包含一个或多个executor,每个component(spout或bolt)至少对应于一个executor,所以可以说executor执行一个compenent的子集,同时一个executor只能对应于一个componentTask就是具体的处理逻辑对象,一个executor线程可以执行一个或多个tasks但一般默认每个executor只执行一个task,所以我们往往认为task就是执行线程,其实不是。task代表最大并发度,一个component的task数是不会改变的,但是一个componet的executer数目是会发生变化的(stormrebalance命令),task数>=executor数,executor数代表实际并发数并发度例子Configconf=newCo

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论