大数据处理框架:Storm:Storm基本组件理解_第1页
大数据处理框架:Storm:Storm基本组件理解_第2页
大数据处理框架:Storm:Storm基本组件理解_第3页
大数据处理框架:Storm:Storm基本组件理解_第4页
大数据处理框架:Storm:Storm基本组件理解_第5页
已阅读5页,还剩13页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据处理框架:Storm:Storm基本组件理解1大数据处理框架:Storm:Storm基本组件理解1.1Storm简介1.1.11Storm的发展历史Storm项目最初由NathanMarz和BackType团队在2010年开发,旨在处理实时数据流。2011年,BackType被Twitter收购,Storm也随之成为Twitter的一部分。同年,Storm以开源形式发布,迅速吸引了大数据处理领域的关注。2014年,Storm被Apache软件基金会接受,成为其顶级项目,标志着Storm在实时数据处理框架中的成熟和广泛应用。1.1.22Storm的工作原理Storm是一个分布式实时计算系统,它将数据处理任务分解为一系列的组件,这些组件通过拓扑结构(Topology)连接在一起。Storm的核心组件包括:Spout:数据源,负责从外部系统读取数据并将其发送到Storm的处理流程中。Bolt:数据处理器,负责接收Spout或其他Bolt发送的数据,进行处理后,可以将结果发送到另一个Bolt或直接输出。Worker:执行器,每个Worker运行在集群中的一个节点上,负责执行一个或多个任务(Task)。Task:最小的处理单元,由Bolt或Spout实例化,每个Task执行特定的处理逻辑。Executor:线程管理器,负责在Worker进程中创建和管理Task的线程。Nimbus:主节点,负责分配任务和监控集群状态。Supervisor:从节点,负责管理Worker进程。Zookeeper:协调服务,用于集群协调和状态管理。Storm的数据处理流程是通过定义拓扑结构来实现的。一个拓扑是一个有向无环图(DAG),其中节点是Spout或Bolt,边表示数据流的方向。当一个拓扑被提交到Storm集群时,Nimbus会将拓扑分解为多个任务,并将这些任务分配给集群中的Worker进程。每个Worker进程中的Executor会创建并管理Task的线程,从而执行数据处理任务。1.1.33Storm的应用场景Storm的实时数据处理能力使其在多个领域得到广泛应用,包括:实时分析:如实时监控网站流量、用户行为分析等。在线机器学习:实时更新模型,以反映最新的数据变化。持续计算:处理连续的数据流,如实时计算股票价格的移动平均值。分布式RPC:提供远程过程调用服务,用于分布式系统中的数据交换。数据流处理:处理来自传感器、社交媒体、日志等的实时数据流。1.2示例:使用Storm进行实时数据处理下面是一个使用Storm进行实时数据处理的简单示例。我们将创建一个拓扑,该拓扑从Twitter流中读取数据,然后计算并输出包含特定关键词的推文数量。1.2.1代码示例importorg.apache.storm.Config;

importorg.apache.storm.LocalCluster;

importorg.apache.storm.StormSubmitter;

importorg.apache.storm.topology.TopologyBuilder;

importorg.apache.storm.tuple.Fields;

importorg.apache.storm.tuple.Values;

importorg.apache.storm.spout.SchemeAsMultiScheme;

importorg.apache.storm.spout.TwitterSpout;

importorg.apache.storm.task.OutputCollector;

importorg.apache.storm.task.TopologyContext;

importorg.apache.storm.topology.OutputFieldsDeclarer;

importorg.apache.storm.topology.base.BaseRichBolt;

importorg.apache.storm.tuple.Tuple;

importjava.util.Map;

publicclassTwitterWordCountTopology{

publicstaticclassTweetWordCounterextendsBaseRichBolt{

OutputCollectorcollector;

@Override

publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){

this.collector=collector;

}

@Override

publicvoidexecute(Tupleinput){

Stringsentence=input.getStringByField("tweet");

String[]words=sentence.split("");

for(Stringword:words){

collector.emit(newValues(word,1));

}

collector.ack(input);

}

@Override

publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){

declarer.declare(newFields("word","count"));

}

}

publicstaticvoidmain(String[]args)throwsException{

TopologyBuilderbuilder=newTopologyBuilder();

//配置TwitterSpout

TwitterSpoutspout=newTwitterSpout(newSchemeAsMultiScheme(newStringScheme()));

spout.setAuth("consumerKey","consumerSecret","accessToken","accessTokenSecret");

//添加Spout和Bolt到拓扑

builder.setSpout("twitter-spout",spout);

builder.setBolt("word-counter",newTweetWordCounter(),2).shuffleGrouping("twitter-spout");

Configconfig=newConfig();

config.setDebug(true);

if(args!=null&&args.length>0){

config.setNumWorkers(3);

StormSubmitter.submitTopology(args[0],config,builder.createTopology());

}else{

LocalClustercluster=newLocalCluster();

cluster.submitTopology("word-count",config,builder.createTopology());

Thread.sleep(10000);

cluster.shutdown();

}

}

}1.2.2示例解释在上述示例中,我们定义了一个名为TweetWordCounter的Bolt,它接收来自TwitterSpout的推文,并将推文分割成单词,然后为每个单词发射一个包含单词和计数1的Tuple。我们使用TopologyBuilder来构建拓扑,将TwitterSpout作为数据源,TweetWordCounter作为数据处理器。通过shuffleGrouping方法,我们确保从Spout接收到的每个Tuple都会被随机发送到Bolt的一个实例中。在main方法中,我们根据传入的参数决定是在本地集群还是远程集群上运行拓扑。如果参数存在,我们使用StormSubmitter将拓扑提交到远程集群;如果参数不存在,我们使用LocalCluster在本地集群上运行拓扑。1.3结论Storm作为一个强大的实时数据处理框架,提供了灵活的数据流处理模型和丰富的组件,使得开发者能够构建复杂的数据处理管道。通过理解Storm的基本组件和工作原理,我们可以有效地利用Storm来解决实时数据处理中的各种挑战。2Storm架构解析2.11Storm的主节点NimbusNimbus是ApacheStorm中的核心组件,扮演着集群中的主节点角色。它负责分配任务(Topology)到集群中的各个节点,监控集群状态,以及管理集群的配置信息。Nimbus与Zookeeper协同工作,确保即使Nimbus出现故障,也能通过Zookeeper选举出新的Nimbus,从而保证集群的高可用性。2.1.1Nimbus的功能任务分配:Nimbus接收用户提交的Topology,并将其分配给集群中的Supervisor节点执行。状态监控:Nimbus持续监控集群中所有Topology的执行状态,确保任务的正常运行。配置管理:Nimbus管理Storm集群的配置信息,包括Nimbus自身的配置和集群的全局配置。2.1.2Nimbus与Zookeeper的交互Nimbus通过Zookeeper来存储和获取集群的状态信息,包括Topology的分配情况、Supervisor节点的状态等。Zookeeper的高可用性保证了即使Nimbus节点发生故障,也能快速恢复集群的正常运行。2.22Storm的工作者节点SupervisorSupervisor是Storm集群中的工作者节点,负责在本地机器上启动和管理Worker进程。每个Supervisor节点可以运行多个Worker进程,每个Worker进程负责执行一个或多个任务(Topology)的实例。2.2.1Supervisor的功能Worker管理:Supervisor根据Nimbus的分配策略,在本地机器上启动和管理Worker进程。资源分配:Supervisor根据本地机器的资源情况,为Worker进程分配必要的资源,如CPU、内存等。状态上报:Supervisor定期向Nimbus报告本地Worker进程的运行状态,以便Nimbus监控整个集群的健康状况。2.2.2Supervisor与Nimbus的交互Supervisor从Nimbus获取分配给它的Topology信息,然后在本地启动Worker进程来执行这些Topology。同时,Supervisor会将Worker的运行状态反馈给Nimbus,以便Nimbus进行状态监控和故障恢复。2.33Storm的执行单元TopologyTopology是Storm中的执行单元,它由一组Spout和Bolt组成,定义了数据流的处理逻辑。用户通过定义Topology来描述数据处理的流程,然后提交到Storm集群中执行。2.3.1Topology的组成Spout:数据源,负责产生数据流。Bolt:数据处理器,负责接收Spout或其它Bolt发送的数据,进行处理后,再发送给下一个Bolt或输出结果。2.3.2Topology的定义与提交用户通过定义Spout和Bolt,以及它们之间的数据流连接,来构建Topology。以下是一个简单的Topology定义示例://定义Spout

publicclassMySpoutextendsBaseRichSpout{

privateSpoutOutputCollector_collector;

privateint_sequence;

publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){

_collector=collector;

_sequence=0;

}

publicvoidnextTuple(){

_collector.emit(newValues("HelloStorm"),_sequence);

_sequence++;

}

}

//定义Bolt

publicclassMyBoltextendsBaseBasicBolt{

publicvoidexecute(BasicInputinput){

Stringsentence=input.get(0).toString();

LOG.info("Received:"+sentence);

}

}

//构建Topology

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout("spout",newMySpout(),5);

builder.setBolt("bolt",newMyBolt(),8).shuffleGrouping("spout");

//提交Topology

Configconf=newConfig();

conf.setDebug(true);

LocalClustercluster=newLocalCluster();

cluster.submitTopology("my-topology",conf,builder.createTopology());在这个示例中,MySpout作为数据源,产生一系列的字符串数据;MyBolt作为数据处理器,接收这些数据并打印出来。通过TopologyBuilder,用户定义了Spout和Bolt之间的连接方式,然后通过LocalCluster提交Topology到Storm集群中执行。2.3.3Topology的生命周期Topology在Storm集群中的生命周期包括提交、分配、执行和关闭四个阶段。一旦Topology被提交到集群中,它将一直运行,直到用户显式地关闭它。Storm提供了机制来保证Topology的容错性和高可用性,即使集群中的节点发生故障,Topology也能继续运行。3Storm核心组件深入3.11Spout:数据源组件Spout在ApacheStorm中扮演着数据源的角色,它负责从外部系统(如Kafka、RabbitMQ、数据库等)读取数据,并将这些数据以流的形式发送到Storm的处理层。Spout可以是任何数据源,只要它能够持续不断地提供数据流即可。3.1.1Spout的实现原理Spout通过实现ISpout接口来定义其行为。这个接口有两个主要的方法:nextTuple()和ack()。nextTuple()方法用于生成并发送数据元组到流中,而ack()方法则用于确认数据元组是否已经被成功处理。此外,Spout还可以实现fail()方法来处理数据处理失败的情况。3.1.2示例代码下面是一个简单的Spout实现示例,它模拟从一个列表中读取数据,并将其发送到流中:importorg.apache.storm.spout.SpoutOutputCollector;

importorg.apache.storm.task.TopologyContext;

importorg.apache.storm.topology.OutputFieldsDeclarer;

importorg.apache.storm.topology.base.BaseRichSpout;

importorg.apache.storm.tuple.Fields;

importorg.apache.storm.tuple.Values;

importjava.util.Map;

importjava.util.Random;

publicclassSimpleSpoutextendsBaseRichSpout{

privateSpoutOutputCollectorcollector;

privateString[]sentences={"thecowjumpedoverthemoon","anappleadaykeepsthedoctoraway","fourscoreandsevenyearsago","snowwhiteandthesevendwarfs","iamattwowithnature"};

privateRandomrand;

@Override

publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){

this.collector=collector;

this.rand=newRandom();

}

@Override

publicvoidnextTuple(){

try{

Thread.sleep(1000);

}catch(InterruptedExceptione){

e.printStackTrace();

}

Stringsentence=sentences[rand.nextInt(sentences.length)];

collector.emit(newValues(sentence));

}

@Override

publicvoidack(ObjectmsgId){

System.out.println("TupleacknowledgedwithmessageID:"+msgId);

}

@Override

publicvoidfail(ObjectmsgId){

System.out.println("TuplefailedwithmessageID:"+msgId);

}

@Override

publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){

declarer.declare(newFields("sentence"));

}

}在这个例子中,SimpleSpout从一个预定义的句子列表中随机选择一个句子,并将其作为数据元组发送到流中。nextTuple()方法中的Thread.sleep(1000)用于模拟数据读取的延迟。3.22Bolt:数据处理组件Bolt是Storm中的数据处理单元,它接收来自Spout或其他Bolt的数据元组,进行处理,然后可以将结果发送到其他Bolt或直接输出。Bolt通过实现IBolt接口来定义其行为。3.2.1Bolt的实现原理Bolt通过实现prepare()、execute()和cleanup()方法来定义其生命周期。prepare()方法在Bolt初始化时调用,execute()方法用于处理数据元组,而cleanup()方法在Bolt关闭时调用。3.2.2示例代码下面是一个简单的Bolt实现示例,它接收来自Spout的数据元组,将句子分割成单词,并将每个单词作为新的数据元组发送到流中:importorg.apache.storm.task.OutputCollector;

importorg.apache.storm.task.TopologyContext;

importorg.apache.storm.topology.OutputFieldsDeclarer;

importorg.apache.storm.topology.base.BaseRichBolt;

importorg.apache.storm.tuple.Fields;

importorg.apache.storm.tuple.Tuple;

importorg.apache.storm.tuple.Values;

importjava.util.Map;

publicclassSplitSentenceBoltextendsBaseRichBolt{

privateOutputCollectorcollector;

@Override

publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){

this.collector=collector;

}

@Override

publicvoidexecute(Tupleinput){

Stringsentence=input.getStringByField("sentence");

String[]words=sentence.split("");

for(Stringword:words){

collector.emit(newValues(word));

}

collector.ack(input);

}

@Override

publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){

declarer.declare(newFields("word"));

}

}在这个例子中,SplitSentenceBolt接收包含句子的数据元组,使用split()方法将句子分割成单词,然后将每个单词作为新的数据元组发送到流中。3.33Stream:数据流传输机制在Storm中,数据流(Stream)是数据传输的基本机制。数据流是由一系列数据元组组成的,这些元组从Spout开始,经过一系列Bolt的处理,最终可能被输出到外部系统或另一个Bolt。3.3.1Stream的实现原理数据流在Storm中通过emit()方法从Spout或Bolt中发送,然后通过IBolt接口的execute()方法在接收Bolt中处理。数据流的传输是基于消息的,每个数据元组都有一个唯一的消息ID,用于追踪和确认数据处理的状态。3.3.2Stream的分组策略Storm提供了多种分组策略来控制数据流如何从Spout或Bolt发送到接收Bolt。这些策略包括:Shufflegrouping:随机将数据元组发送到接收Bolt。Fieldsgrouping:根据数据元组中的特定字段将数据元组发送到接收Bolt。Allgrouping:将所有数据元组复制并发送到所有接收Bolt。Directgrouping:直接将数据元组发送到指定的接收Bolt。3.3.3示例代码下面是一个使用Fieldsgrouping策略的示例,它将单词数据元组根据单词本身发送到不同的Bolt进行处理:importorg.apache.storm.task.OutputCollector;

importorg.apache.storm.task.TopologyContext;

importorg.apache.storm.topology.OutputFieldsDeclarer;

importorg.apache.storm.topology.base.BaseRichBolt;

importorg.apache.storm.tuple.Fields;

importorg.apache.storm.tuple.Tuple;

importorg.apache.storm.tuple.Values;

importjava.util.Map;

publicclassWordCounterBoltextendsBaseRichBolt{

privateOutputCollectorcollector;

privateMap<String,Integer>counts;

@Override

publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){

this.collector=collector;

this.counts=newHashMap<>();

}

@Override

publicvoidexecute(Tupleinput){

Stringword=input.getStringByField("word");

Integercount=counts.get(word);

if(count==null){

count=0;

}

counts.put(word,count+1);

collector.emit(newValues(word,count+1));

collector.ack(input);

}

@Override

publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){

declarer.declare(newFields("word","count"));

}

}在这个例子中,WordCounterBolt接收包含单词的数据元组,使用HashMap来统计每个单词的出现次数,然后将单词和更新后的计数作为新的数据元组发送到流中。通过使用Fieldsgrouping策略,可以确保所有包含相同单词的数据元组都被发送到同一个WordCounterBolt实例,从而实现单词计数的正确性。通过以上三个核心组件的深入理解,我们可以构建出复杂的大数据处理流水线,从数据源读取数据,经过一系列的数据处理,最终输出处理结果。Storm的这种基于流的处理模型,使得它能够实时处理大规模数据流,满足实时数据分析的需求。3.4Storm配置与优化3.4.11Storm配置参数详解Storm的配置参数是其核心组件之一,用于控制拓扑的运行环境和性能。理解这些参数对于优化Storm应用程序至关重要。以下是一些关键的Storm配置参数:topology.workers-**描述**:指定每个Supervisor上运行的Worker进程数量。

-**影响**:更多的Worker进程可以提高并行处理能力,但会增加资源消耗。topology.executors-**描述**:指定每个Worker进程中运行的Executor线程数量。

-**影响**:Executor线程数量影响了并行度,更多的线程可以处理更多的任务,但可能增加线程切换的开销。topology.tuple.ackers-**描述**:指定拓扑中用于确认Tuple的Spout的数量。

-**影响**:提高可靠性,确保每个Tuple都被正确处理,但会增加处理延迟。topology.message.timeout.secs-**描述**:设置Tuple的超时时间,单位为秒。

-**影响**:较短的超时时间可以更快地检测到失败的Tuple,但可能会导致更多的重发。topology.max.spout.pending-**描述**:指定Spout可以同时发出但未被确认的Tuple的最大数量。

-**影响**:较大的值可以提高吞吐量,但会增加内存使用和处理延迟。3.4.22性能调优策略Storm的性能调优涉及多个方面,包括但不限于配置参数的调整、数据序列化方式的选择、以及资源分配的优化。以下是一些调优策略:调整并行度-**策略**:根据任务的计算复杂度和数据吞吐量,合理设置`topology.workers`和`topology.executors`。

-**示例**:如果数据处理较为简单,可以减少Executor的数量,以减少线程切换的开销。优化数据序列化-**策略**:使用更高效的序列化库,如Kryo或Avro,替代默认的Java序列化。

-**示例**:将默认的序列化方式更改为Kryo。

```java

//在Storm配置中设置Kryo序列化

conf.registerSerialization(KryoSerializer.class);

conf.put(Config.TOPOLOGY_SERIALIZATION_REGISTER,Arrays.asList(Integer.class,String.class,MyCustomClass.class));资源分配-**策略**:根据拓扑的实际需求,动态调整CPU、内存和网络资源。

-**示例**:增加Supervisor的内存分配,以支持更多的Worker进程。

```markdown

#在Storm配置文件中设置Supervisor的内存分配

supervisor.memory.mb=81923.4.33故障恢复机制Storm提供了强大的故障恢复机制,确保在组件失败时,拓扑能够自动恢复并继续运行。Tuple确认机制-**描述**:Storm使用Tuple确认机制来检测和恢复失败的处理。

-**示例**:在Spout中实现`nextTuple`和`ack`方法,以确认Tuple的处理。

```java

publicclassMySpoutextendsBaseRichSpout{

privateSpoutOutputCollector_collector;

@Override

publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){

this._collector=collector;

}

@Override

publicvoidnextTuple(){

//发送Tuple

Stringtuple="data";

_collector.emit(newValues(tuple),tuple);

}

@Override

publicvoidack(Objectid){

//确认Tuple

System.out.println("Tuple"+id+"hasbeenfullyprocessed.");

}

@Override

publicvoidfail(Objectid){

//处理失败的Tuple

System.out.println("Tuple"+id+"failed,willbere-emitted.");

_collector.emit(newValues(id),id);

}

}Zookeeper集群-**描述**:Storm使用Zookeeper来管理集群状态,包括拓扑的运行状态和故障恢复。

-**影响**:Zookeeper的高可用性确保了Storm拓扑在故障时能够快速恢复。Nimbus和Supervisor的冗余-**描述**:Storm的Nimbus和Supervisor组件应该在集群中冗余部署,以提高系统的容错性。

-**影响**:即使部分Nimbus或Supervisor失败,Storm仍然能够继续运行和管理拓扑。通过上述配置参数的调整、性能调优策略的实施,以及故障恢复机制的利用,可以显著提高Storm拓扑的性能和可靠性。在实际应用中,应根据具体场景和需求,灵活调整这些参数和策略,以达到最佳的运行效果。3.5Storm实践案例分析3.5.11实时数据分析流程实时数据分析是大数据处理中的关键环节,尤其在需要即时响应的场景下,如金融交易、网络安全监控、社交媒体趋势分析等。Storm,作为一款分布式实时计算系统,能够高效处理大量实时数据流,其流程通常包括数据收集、数据处理和数据存储三个主要阶段。数据收集数据收集是实时分析的第一步,通常涉及从各种数据源(如传感器、日志文件、社交媒体API等)中获取数据。Storm通过Spouts组件来实现这一功能,Spouts可以理解为数据流的源头,负责将数据源源不断地推送到Storm集群中。示例代码:from__future__importabsolute_import,print_function

importsys

fromrandomimportchoice

fromstormimportSpout

fromstorm.taskimportemit

classRandomSentenceSpout(Spout):

_sentences=[

"thecowjumpedoverthemoon",

"anappleadaykeepsthedoctoraway",

"fourscoreandsevenyearsago",

"snowwhiteandthesevendwarfs",

"iamattwowithnature"

]

defnext_tuple(self):

sentence=choice(self._sentences)

emit([sentence])

print("Emittingsentence:%s"%sentence)

sys.stdout.flush()

defack(self,tup_id):

print("Acked:%s"%tup_id)

deffail(self,tup_id):

print("Failed:%s"%tup_id)在这个示例中,RandomSentenceSpout是一个简单的Spout,它从预定义的句子列表中随机选择一个句子并将其推送到数据流中。next_tuple方法用于生成数据,ack和fail方法则用于处理数据传输的确认和失败情况。数据处理数据处理阶段是Storm的核心,通过Bolts组件实现。Bolts可以接收来自Spouts或其他Bolts的数据,执行计算、过滤、聚合等操作,并将处理后的数据发送到下一个Bolt或直接输出。示例代码:from__future__importabsolute_import,print_function

importsys

fromcollectionsimportdefaultdict

fromstormimportBolt

fromstorm.taskimportexecute

classSplitSentenceBolt(Bolt):

definitialize(self,conf,ctx):

self._collector=ctx.collector

defprocess(self,tup):

sentence=tup.values[0]

words=sentence.split("")

forwordinwords:

self._collector.emit([word])

print("Splittingsentence:%s"%sentence)

sys.stdout.flush()SplitSentenceBolt是一个Bolt,它接收来自Spout的句子,将其分割成单词,并将每个单词作为单独的元组发送到下一个组件。这个过程展示了Storm如何通过Bolts进行数据的细粒度处理。数据存储处理后的数据通常需要存储到持久化存储系统中,如数据库、HDFS或消息队列,以便后续分析或应用。Storm可以通过Bolt组件直接与这些系统集成,实现数据的实时存储。示例代码:from__future__importabsolute_import,print_function

importsys

fromstormimportBolt

fromstorm.taskimportexecute

classPrintBolt(Bolt):

definitialize(self,conf,ctx):

self._collector=ctx.collector

defprocess(self,tup):

word=tup.values[0]

self._collector.emit([word])

print("Receivedword:%s"%word)

sys.stdout.flush()在这个简单的示例中,PrintBolt用于接收处理后的单词并打印出来,这可以看作是数据存储的一种简化形式。在实际应用中,Bolt可能会将数据写入数据库或文件系统。3.5.22Storm在社交媒体分析中的应用社交媒体分析是实时数据处理的典型应用场景之一,Storm可以实时监控和分析来自Twitter、Facebook等平台的数据流,帮助识别趋势、情感分析或异常检测。示例代码:from__future__importabsolute_import,print_function

importsys

fromstormimportSpout

fromstorm.taskimportemit

importtweepy

classTwitterSpout(Spout):

def__init__(self):

super(TwitterSpout,self).__init__()

self._auth=tweepy.OAuthHandler("consumer_key","consumer_secret")

self._auth.set_access_token("access_token","access_token_secret")

self._api=tweepy.API(self._auth)

defnext_tuple(self):

forstatusintweepy.Cursor(self._api.search,q="storm",lang="en").items():

emit([status.text])

print("Emittingtweet:%s"%status.text)

sys.stdout.flush()TwitterSpout是一个Spout,它使用Tweepy库从TwitterAPI中获取包含关键词“storm”的英文推文,并将推文文本推送到Storm集群中进行实时分析。3.5.33Storm与Hadoop的集成

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论