版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
目录
简介.............................................................................4
什么是ApacheStorm?.................................................................................................................4
ApacheStormvsHadoop...............................................................................................................4
使用ApacheStorm的例子.....................................................5
ApacheStorm优势............................................................5
Storm的核心组件.............................................................5
Topology...........................................................................................................................................8
Storm集群安装..............................................................9
StormHelloWorld...........................................................................................................................9
环境准备................................................................9
具体流程................................................................10
核心概念........................................................................35
拓扑........................................................................36
任务........................................................................37
进程........................................................................37
流分组......................................................................37
随机分组...............................................................37
字段分组...............................................................38
全局分组...............................................................39
所有分组...............................................................40
集群架构........................................................................40
工作流程........................................................................42
分布式消息系统.................................................................44
什么是分布式消息系统?.....................................................44
Thrift协议...................................................................45
安装............................................................................46
步骤1-验证Java安装.......................................................46
步骤1.1-下载JDK.............................................................................................................46
步骤1.2-解压文件......................................................47
步骤1.3-移动到opt文件夹.............................................47
步骤1.4-设置路径......................................................47
步骤1.5-Java替代项....................................................48
步骤1.6-Java安装验证..................................................48
第2步-ZooKeeper框架安装.................................................48
步骤2.1-下载ZooKeeper.................................................................................................48
步骤2.2-解压tar文件...................................................48
步骤2.3-创建配置文件..................................................49
步骤2.4-启动ZooKeeper服务器..........................................49
步骤2.5・启动CLI..............................................................................................................49
步骤2.6-停止ZooKeeper服务器..........................................50
第3步-ApacheStorm框架安装...............................................51
步骤3.1-下载Storm.........................................................................................................51
步骤3.2••解压tar文件...................................................51
步骤3.3-打开配置文件..................................................51
步骤3.4-启动Nimbus.......................................................................................................52
步骤3.5-启动Supervisor..................................................................................................52
步骤3.6-启动UI...............................................................................................................52
工作实例........................................................................53
场景-移动呼叫日志分析器..................................................53
Spout创建..................................................................53
open.......................................................................................................................................54
nextTuple...............................................................................................................................54
close.......................................................................................................................................55
declareOutputFields.............................................................................................................55
ack..........................................................................................................................................55
fail...........................................................................................................................................55
FakeCallLogReaderSpout......................................................................................................56
编码-FakeCallLogReaderSpout.java.................................................................................56
Bolt创建....................................................................60
Prepare...................................................................................................................................60
execute...................................................................................................................................60
cleanup...................................................................................................................................61
declareOutputFields.............................................................................................................61
呼叫日志创建者bolt...................................................................................................................61
编码-CallLogCreatorBoltjava............................................................................................62
呼叫日志计数器Bolt...................................................................................................................64
编码-CallLogCounterBolt.java..........................................................................................64
创建拓扑....................................................................66
本地集群....................................................................67
编码-LogAnalyserStorm.java.............................................................................................67
构建和运行应用程序.........................................................69
输出....................................................................69
非JVM语言.................................................................70
Python绑定.............................................................70
Trident....................................................................................................................................................71
Trident拓扑.................................................................72
TridentTuples................................................................................................................................72
TridentSpout.................................................................................................................................73
Trident操作.................................................................73
过滤....................................................................74
函数....................................................................乃
聚合....................................................................76
分组....................................................................78
合并和连接.............................................................79
状态维护....................................................................79
分布式RPC...................................................................................................................................80
什么时候使用Trident?.............................................................................................................80
Trident的工作实例...........................................................80
格式化呼叫信息.........................................................81
编码:FormatCall.java.........................................................................................................81
CSVSplit.................................................................................................................................81
编码:CSVSplit.java.............................................................................................................82
日志分析器.............................................................82
编码:LogAnalyserTrident.java..........................................................................................83
构建和运行应用程序.........................................................86
输出....................................................................87
在Twitter上的应用..............................................................88
Twitter............................................................................................................................................88
Spout创建..............................................................88
编码:TwitterSampleSpout.java........................................................................................89
Hashtag阅读器spout..................................................................................................................94
编码:HashtagReaderBok.java..........................................................................................94
Hashtag计数器spout.................................................................................................................96
编码:HashtagCounterBolt.java.........................................................................................96
提交拓扑....................................................................99
编码:TwitterHashtagStorm.java.......................................................................................99
构建和运行应用程序........................................................101
输出...................................................................101
在雅虎财经上的应用............................................................102
Spout创建.................................................................103
编码:YahooFinanceSpout.java........................................................................................103
Bolt仓ij建...................................................................106
编码:PriceCutOffBolt.java...............................................................................................106
提交拓扑...................................................................109
编码:YahooFinanceStorm.java........................................................................................109
构建和运行应用程序........................................................110
输出...................................................................111
应用程序.......................................................................111
Klout.............................................................................................................................................Ill
天气频道...................................................................111
电信业.....................................................................112
简介
什么是ApacheStorm?
ApacheStorm是Twitter开源的分布式实时大数据处理框架,最早开源于
github,从0.9.1版本之后,归于Apache社区,被业界称为实时版Hadoop。
Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框
架,具有最高的摄取率。虽然Storm是无状态的,它通过ApacheZooKeeper管
理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操
作。
随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统
计、推荐系统、预警系统、金融系统(高频交易、股票)等等,大数据实时处理
解决方案(流计算)的应用E趋广泛,目前已是分布式技术领域最新爆发点,
而Storm更是流计算技术中的佼佼者和主流。
ApacheStorm继续成为实时数据分析的领导者。Storm易于设置和操作,并且
它保证每个消息将通过拓扑至少处理一次。
ApacheStormvsHadoop
基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不
同。ApacheSiorm执行除持久性之外的所有操作,而Hadoop在所有方面都很
好,但滞后于实时计算。下表比较了Stoi*m和Hadoop的属在。
StormHadoop
实时流处理批量处理
无状态有状态
主/从架构与基于ZooKeeper的协调。主节点称为具有/不具有基于ZooKeeper的协调
nimbus,从属节点是主管。的主-从结构。主节点是作业跟踪
器,从节点是任务跟踪器,
Storm流过程在集群上每秒可以访问数万条消息。Hadoop分布式文件系统(HDFS)使
用MapReduce框架来处理大量的数
据,需要几分钟或几小时。
Storm拓扑运行直到用户关闭或意外的不可恢复故MapReduce作业按顺序执行并最终
障。完成。
两者都是分布式和容错的
如果nimbus/supervisor死机,重新启动使它从它停如果JobTracker死机,所有正在运行
止的地方继续,因此没有什么受到影响。的作业都会丢失。
使用ApacheStorm的例子
ApacheStorm对于实时大数据流处理非常有名。因此,大多数公司都将Storm
用作其系统的一个组成部分。一些值得注意的例子如下・
Twitter-Twitter正在使用ApacheStorm作为其“发布商分析产品”。“发布
商分析产品”处理Twiiter平台中的每个tweets和点击。ApacheStorm与
Twitter基础架构深度集成。
NaviSite-NaviSite正在使用Storm进行事件日志监控/审计系统。系统中生成
的每个日志都将通过Slorm。Storm将根据配置的正则表达式集检查消息,如果
存在匹配,那么该特定消息将保存到数据库。
Wego-Wego是位于新加坡的旅行元搜索引擎。旅行相关数据来自世界各地的
许多来源,时间不同。Storm帮助Wego搜索实时数据,解决并发问题,并为最
终用户找到最佳匹配。
ApacheStorm优势
下面是ApacheStorm提供的好处列表:
•Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
•Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
•允许实时流处理。
•Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
•Slorm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高
度可扩展的。
•Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具
有非常低的延迟。
•Storm有操作智能。
•Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。
Storm的核心组件
•Nimbus:即Storm的Master,负责资源分配和任务调度。一个Storm集群只有一
个
Nimbuso
•Supervisor:即Storm的Slave,负责接收Nimbus分配的任务,管理所有Worke
r,一个Supervisor节点中包含多个Worker进程。
•Worker:工作进程,每个工作进程中都有多个Task.
■Task:任务,在Storm集群中每个Spout和Bolt都由若干个任务(tasks)来执
行。每个任务都与一个执行线程相对应。
•Topology:计算拓扑,Storm的拓扑是对实时计算应用逻辑的封装,它的作用与
MapReduce的任务(Job)很相似,区别在于MapReduce的一个Job在得到结
果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理
解成由一系列通过数据流(StreamGrouping)相互关联的Spout和Bolt组成的
的拓扑结构。
•Stream:数据流(Streams)是Storm中最核心的抽象概念。一个数据流指的是在
分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种
能够表述数据流中元组的域(fields)的模式来定义。
•Spout:数据源(Spout)是拓扑中数据流的来源。一般Spout会从一个外部的数
据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout既可以定义为可靠
的数据源,也可以定义为不可靠的数据源。一个可靠的Spout能够在它发送的元组
处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不
可靠的Spout就不会在元组发送之后对元组进行任何其他的处理。一个Spout可
以发送多个数据流。
•Bolt:拓扑中所有的数据处理均是由Bolt完成的。通过数据过滤(filtering\函数
处理(functions\聚合(aggregations\联结(joins\数据库交互等功能,Bolt
几乎能够完成任]可一种数据处理需求。一个Bolt可以实现简凿的数据流转换,而更
复杂的数据流变换通常需要使用多个Bolt并通过多个步骤完成。
•Streamgrouping:为拓扑中的每个Bolt的确定输入数据流是定义一个拓扑的重
要环节。数据流分组定义了在Bolt的不同任务(tasks)中划分数据流的方式。在S
torm中有八种内置的数据流分组方式。
•Reliability:可靠性。Storm可以通过拓扑来确保每个发送的元组都能得到正确处
理。通过跟踪由Spout发出的每个元组构成的元组树可以确定元组是否已经完成处
理。每个拓扑都有一个"消息延时"参数,如果Storm在延时时间内没有检测到元
组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。
Storm程序再Storm集群中运行的示例图如下:
Topology
为什么把Topology单独提出来呢,因为Topology是我们开发程序主要的用的组件。
Topology和MapReduce很相像。
M叩Reduce是M叩进行获取数据,Reduce进行处理数据。
而Topology则是使用Spout获取数据,Bolt来进行计算。
总的来说就是一个Topology由一个或者多个的Spout和Bolt组成
具体流程是怎么走,可以通过查看下面这张图来进行了解。
示例图:
Stream
^Stream2
.----Stream!
Stream"
0一-一
'、~Stream------Stream
Stream
g
stream—
注:图片来源/api/tutorials/storm/52o
图片有三种模式,解释如下:
第一种比较简单,就是由一个Spout获取物g,然后交给一个Bolt进行处理;
第二种稍微复杂点,由一个Spout获取数据,然后交给一个Bolt进行处理一部分,然后
在交给下一个Bolt进行处理其他部分。
第三种则上瞰复杂,一个Spout可以同时发送数据到多个Bolt,而一个Bolt也可以接受
多个Spout或多个Bolt,最终形成多个数据流。但是这种数据流必须是有方向的,有起点
和终点,不然会造成死循环,数据永远也处理不完。就是Spout发给Boltl,Boltl发给
Bolt2,Bolt2又发给了Boltl,最终形成了一环状。
Storm集群安装
之前已经写过了,这里就不在说明了。
博客itetlhhttp:〃/2018/01/26/pancm70/
StormHelloWorld
前面讲了一些Storm概念,可能在理解上不太清楚,那么这里我们就用一个HelloWorld
代码示例来体验下Storm运作的流程吧。
环境准备
在进行代码开发之前,首先得做好相关的准备。
本项目是使用Maven构建的,使用Storm的版本为1.1.1。
Maven的相关依赖如下:
<!一-storm相关jar—>
<dependency>
<groupld>org.apache.storm</groupld>
<artifactId>storm-core</artifactId>
<version>l.1.l</version>
<scope>provided</scope>
</dependency>
具体流程
在写代码的时候,我们先来明确要用Storm做什么。
那么第一个程序,就简单的输出下信息。
具体步骤如下:
1.启动topology,设置好Spout和Bolt.
2.将Spout获取的数据传递给Bolt,
3.Bolt接受Spout的数据进行打印。
Spout
那么首先开始编写Spout类。一股是实现IRichSpout或继承BaseRichSpout该类,然
后实现该方法。
这里我们继承BaseRichSpout这个类,该类需要实现这几个主要的方法:
一、open
open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。
有三个参数,它们的作用分别是:
1.Storm配置的Map;
2.topology中组件的信息;
3.发射tuple的方法;
代码示例:
©Override
publicvoidopen(Mapmap,TopologyContextargl,
SpoutOutputCollectorcollector){
System,out.printlnC^open:imap.get("test"));
this,collector=collector;
)
二、nextTuple
nextTuple。方法是Spout实现的核心。
也就是主要执行方法,用于输出信息,通过collector,emit方法发射。
这里我们的数据信息已经写死了,所以这里我们就直接将数据进行发送。
这里设置只发送两次。
代码示例:
@Override
publicvoidnextTuple(){
if(count<=2){
System.out.printin(〃第〃+count+”次开始发送数
据••・〃);
this,collector,emit(newValues(message));
count++;
三、declareOutputFields
declareOutputFields是在IComponent接口中定义,用于声明数据格式。
即输出的一个Tuple中,包含几个字段。
因为这里我们只发射一个,所以就指定一个。如果是多个,则用逗号隔开。
代码示例:
©Override
publicvoiddeclareOutputFields(OutputFieldsDeclarer
declarer){
System.out.printin("定义格式...;
declarer,declare(newFields(field));
)
四、ack
ack是在ISpout接口中定义,用于表示Tuple处理成功。
代码示例:
©Override
publicvoidack(Objectobj){
System.out.println("ack:"+obj);
五、fail
fail是在ISpout接口中定义,用于表示Tuple处理失败。
代码示例:
@Override
publicvoidfail(Objectobj){
System,out.printin("失败:〃+obj);
)
六、close
dose是在ISpout接口中定义,用于表示Topology停止。
代码示例:
©Override
publicvoidclose(){
System,out.printin("关闭...;
)
至于还有其他的,这里就不在一列举了。
Bolt
Bolt是用于处理数据的组件,主要是由execute方法来进行实现。一般来说需要实
现IRichBolt或继承BaseRichBolt该类,然后实现其方法。
需要实现方法如下:
一、prepare
在Bolt启动前执行,提供Bolt启动环境配置的入口。
参数基本和Sqout-Wo
一般对于不可序列化的对象进行实例化。
这里的我们就简单的打印下
©Override
publicvoidprepare(Mapmap,TopologyContextargl,
OutputCollectorcollector){
Systpm.out.print1n(^prpparp:zz+map.gpt(,,tpst,/));
this,collector=collector;
)
注:如果是可以序列化的对象,那么最好是使用构造函数。
二、execute
execute。方法是Bolt实现的核心。
也就是执行方法,每次Bolt从流凌收f丁阅的tuple,都会调用这个方法。
从tuple中获取消息可以使用tuple.getStringO和tuple.getStringByFieldO;这
两个方法。个人推荐第二种,可以通过field来指定接收的消息。
注:如果继承的是IRichBolt,则需要手动ack。这里就不用了,BaseRichBolt会自动帮我
们应答。
代码示例:
©Override
publicvoidexecute(Tupletuple){
//Stringmsg=tuple.getString(O);
Stringmsg=tuple.getStringByField("test");
〃这里我们就不做消息的处理,只打印
System.out.printin(,zBolt第〃+count+”接受的消息:"+msg);
count++;
/**
*
*没次调用处理一个输入的tuple,所有的tuple都必须在一定时间
内应答。
*可以是ack或者fail。否则,spout就会重发tuple。
*/
//collector,ack(tuple);
)
三、declareOutputFields
和Spout的一样。
因为到了这里就不再输出了,所以就什么都没写。
©Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerargO)
(
)
cleanup
cleanup是IBolt接口中定义,用于释放bolt占用的资源。
Storm在终止一个bolt之前会调用这个方法。
因为这里没有什么资源需要释放,所以就简单的打印一句就行了。
©Override
publicvoidcleanup(){
System.out.printin("资源释放〃);
)
Topology
这里我们就是用main方法进行提交topology。
不过在提交topology之前,需要进行相应的设置。
这里我就不一细说了,代码的注释已经很详细了。
代码示例:
importorg.apache.storm.Config;
importorg.apache,storm.LocalCluster;
importorg.apache,storm.StormSubmitter;
importorg.apache,storm,topology.TopologyBuilder;
/**
*
*Title:App
*Description:
*storm测试
*Version:1.0.0
*@authorpancm
*@date2018年3月6日
*/
publicclassApp{
privatestaticfinalStringstrl="testl”;
privatestaticfinalStringstr2="test2〃;
publicstaticvoidmain(String[]args)(
//TODOAuto-generatedmethodstub
〃定义一个拓扑
TopologyBuilderbuilder=newTopologyBuilder();
〃设置一个Executeor(线程),默认一个
builder.setSpout(strl,newTestSpout());
〃设置一个Executeor(线程),和一个task
builder.setBolt(str2,new
TestBolt(),1).setNumTasks(l).shuffleGrouping(strl);
Configconf=newConfigO;
conf,put("test","test");
try(
〃运行拓扑
if(args!=null&&args.length>0){//有参数时,表示向集群提交
作业,并把第一个参数当做topology名称
System.out.printin(〃远程模式〃);
StormSubmitter.submitTopology(args[0],conf,
builder.createTopology());
}else{〃没有参数时,本地提交
〃启动本地模式
System,out.printIn(〃本地模式〃);
LocalClustercluster=newLocalCluster();
cluster,submitTopology11177,conf,
builder.createTopology());
Thread,sleep(10000);
//关闭本地集群
cluster,shutdown();
)
}catch(Exceptione)(
e.printStackTraceO;
)
)
}
运行该方法,输出结果如下:
本地模式
定义格式.•・
open:test
第1次开始发送数据...
第2次开始发送数据...
prepare:test
Bolt第1接受的消息:这是个测试消息!
Bolt第2接受的消息:这是个测试消息!
资源释放
关闭...
到这里,是不是基本上对Storm的运作有些了解了呢。
这个demo达到了上述的三种模式图中的第一种,一个Spout传输数据,一个Bolt处理
雌。
那么如果我们想达到第二种模式呢,那又该如何做呢?
假如我们想统计下在一段文本中的单词出现频率的话,我们只需执行一下步骤就可以了。
.首先将中的消息进行更改为数组,并依次将消息发送到
1SpoutmessageTestBolto
2.然后TestBolt将获取的数据进行分割,将分割的数据发送到TestBolt2。
3,TestBolt2对数据进行统计,在程序关闭的时候进行打印。
4.Topology成功配置并且启动之后,等待20秒左右,关闭程序,然后得到输出的结果。
代码示例如下:
Spout
用于发送消息。
importjava.util.Map;
importorg.apache,storm,spout.SpoutOutputCollector;
importorg.apache,storm,task.TopologyContext;
importorg.apache,storm,topology.OutputFie1dsDec1arer;
importorg.apache,storm,topology,base.BaseRichSpout;
importorg.apache,storm,tuple.Fields;
importorg.apache,storm,tuple.Values;
/**
*
*Title:TestSpout
*Description:
*发送信息
*Version:1.0.0
*©authorpancm
*©date2018年3月6日
*/
publicclassTestSpoutextendsBaseRichSpout{
privatestaticfinallongserialVersionUID=
225243592780939490L;
privateSpoutOutputCollectorcollector;
privatestaticfinalStringfield二〃word”;
privateintcount=l;
privateStringf]message={
“Mynicknameisxuwujing",
“Myblogaddressishttp://www.panchengming.com//7,
“Myinterestisplayinggames”
};
/**
*open。方法中是在ISpout接口中定义,在Spout组件初始化时被调
用。
*有三个参数:
*1.Storm配置的Map;
*2.topology中组件的信息;
*3.发射tuple的方法;
*/
©Override
publicvoidopen(Mapmap,TopologyContextargl,
SpoutOutputCollectorcollector){
System,out.printIn(^open:z,+map.get("test"));
this,collector=collector;
)
/**
*ncxtTupleO方法是Spout实现的核心。
*也就是主要执行方法,用于输出信息,通过collector.emit方法发射。
*/
©Override
publicvoidnextTupleO{
if(count<=message.length){
System,out.printin("第〃+count+”次开始发送数
据…〃);
this,collector,emit(newValues(message[count-
1]));
count++;
/**
*declarcOutputFields是在[Component接口中定义,用于声明数据格
式。
*即输出的一个Tuple中,包含几个字段。
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年特色农业发展合同 agricultural development
- 2025年度新能源汽车充电站车位租赁及维护服务合同3篇
- 2024深圳二手住宅买卖合同
- 2024年皮具礼品箱包定制及品牌推广服务合同3篇
- 2024年城市绿化植物墙设计与施工一体化合同范本3篇
- 2025年度智能软件检测与优化服务合同2篇
- 2024某科技公司与某大型企业关于企业信息化建设的合同
- 2024承包搬运合同范本
- 2024医疗设备投放与医疗机构远程医疗技术支持合同3篇
- 2024年物联网技术研发与运用许可合同
- 四川省义务教育艺术课程设置方案
- 2024年我国人口老龄化问题与对策
- 2024年江西省公务员考试《行测》真题及答案解析
- 家用除湿机产业规划专项研究报告
- 雇人放牛合同模板
- 节能降耗知识培训
- 人教版(2024秋)数学一年级上册 期末综合测试卷课件
- 牛顿迭代的并行化算法
- 2024秋期国家开放大学本科《国际私法》一平台在线形考(形考任务1至5)试题及答案
- 2023-2024学年安徽省淮北市烈山区八年级(上)期末物理试卷
- 建筑垃圾清理运输服务方案
评论
0/150
提交评论