版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
大数据处理框架:Storm:Storm的容错机制1Storm简介1.1Storm的基本概念Storm是一个开源的分布式实时计算系统,由NathanMarz和BackType开发,后来被Twitter收购。Storm被设计用于处理大量实时数据流,它能够保证每个消息都被处理,并且处理过程是容错的。Storm的核心概念包括:Topology:Storm的计算任务被称为Topology,它是由多个Spouts和Bolts组成的有向无环图(DAG),用于描述数据流的处理流程。Spout:Spout是数据流的源头,它可以不断生成数据并将数据发送到Bolt进行处理。Bolt:Bolt是数据流的处理单元,它可以接收来自Spout或其他Bolt的数据,进行处理后,再将数据发送到下一个Bolt或输出。Tuple:Tuple是Storm中数据的基本单位,它是一个不可变的记录,用于在Spout和Bolt之间传递数据。Stream:Stream是Tuple的序列,表示数据流。1.2Storm的架构与组件Storm的架构主要由以下几个组件构成:Nimbus:Nimbus是Storm的主节点,负责分配任务和监控集群状态。它类似于Hadoop中的JobTracker。Supervisor:Supervisor运行在Storm集群的每个工作节点上,负责接收Nimbus分配的任务,并在本地机器上启动和监控Worker进程。Worker:Worker是由Supervisor启动的进程,每个Worker运行Topology的一部分,即一个或多个Task。Task:Task是Spout或Bolt的实例,每个Task负责处理数据流中的一个或多个Tuple。Zookeeper:Storm使用Zookeeper来协调集群中的Nimbus和Supervisor,以及存储集群的元数据。1.2.1示例:创建一个简单的StormTopology下面是一个使用Java编写的简单StormTopology示例,该示例从一个Spout发送数据,然后通过一个Bolt进行处理。importbacktype.storm.Config;
importbacktype.storm.LocalCluster;
importbacktype.storm.StormSubmitter;
importbacktype.storm.topology.TopologyBuilder;
importbacktype.storm.tuple.Fields;
publicclassSimpleTopology{
publicstaticvoidmain(String[]args)throwsException{
TopologyBuilderbuilder=newTopologyBuilder();
//定义Spout
builder.setSpout("spout",newMySpout(),1);
//定义Bolt
builder.setBolt("bolt",newMyBolt(),1)
.shuffleGrouping("spout");
Configconf=newConfig();
conf.setDebug(true);
if(args!=null&&args.length>0){
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0],conf,builder.createTopology());
}else{
LocalClustercluster=newLocalCluster();
cluster.submitTopology("test",conf,builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
//MySpout类实现ISpout接口
classMySpoutextendsBaseRichSpout{
privatestaticfinallongserialVersionUID=1L;
privateSpoutOutputCollector_collector;
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
_collector=collector;
}
publicvoidnextTuple(){
_collector.emit(newValues("HelloStorm!"));
}
}
//MyBolt类实现IBolt接口
classMyBoltextendsBaseBasicBolt{
privatestaticfinallongserialVersionUID=1L;
publicvoidexecute(BasicInputinput,BasicOutputCollectorcollector){
Stringsentence=input.getFields().get(0).toString();
System.out.println(sentence);
collector.emit(newValues(sentence));
}
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
}
publicvoidcleanup(){
}
publicMap<String,Object>getComponentConfiguration(){
returnnull;
}
}在这个示例中,我们定义了一个简单的Topology,它包含一个Spout和一个Bolt。Spout生成一个字符串“HelloStorm!”,然后Bolt接收这个字符串并打印出来。这个示例展示了如何使用Storm的API来创建和提交一个Topology。1.2.2Storm的容错机制Storm的容错机制主要体现在以下几个方面:TupleAcknowledgement:Storm通过TupleAcknowledgement机制来确保每个Tuple都被正确处理。如果一个Tuple没有被正确处理,Storm会重新发送这个Tuple,直到它被正确处理。TaskFailureDetection:Storm通过心跳机制来检测Task是否失败。如果一个Task在一段时间内没有发送心跳,Storm会认为这个Task已经失败,并重新启动这个Task。WorkerFailureRecovery:如果一个Worker失败,Storm会重新启动这个Worker,并重新分配给它Topology的一部分。TopologyFailureRecovery:如果一个Topology失败,Storm会重新启动这个Topology,并重新分配给它集群的资源。Storm的容错机制使得它能够处理大量的实时数据流,即使在集群中出现故障,也能够保证数据流的正确处理。在实际应用中,开发人员需要根据自己的需求来设计和实现Topology,以充分利用Storm的容错机制。1.2.3结论Storm是一个强大的实时数据流处理框架,它通过其独特的架构和组件,以及强大的容错机制,能够处理大量的实时数据流。通过上述示例,我们可以看到如何使用Storm的API来创建和提交一个Topology,以及Storm的容错机制是如何工作的。在实际应用中,开发人员需要根据自己的需求来设计和实现Topology,以充分利用Storm的容错机制。2容错机制的重要性2.1大数据处理中的挑战在大数据处理领域,数据的规模、速度和复杂性带来了前所未有的挑战。数据集可能包含数PB的数据,每秒处理的数据量可能达到GB甚至TB级别。这种规模和速度要求处理系统必须具备高度的可靠性和容错能力。大数据处理系统如ApacheStorm,运行在分布式环境中,节点的故障是常态而非异常。因此,设计一个能够自动检测和恢复故障的系统是至关重要的。2.1.1数据的规模与速度数据规模:大数据集的处理需要在多个节点上并行执行,这增加了系统复杂性和故障的可能性。数据速度:实时数据流的处理要求系统能够快速响应,任何延迟都可能导致数据丢失或处理不及时。2.1.2分布式环境的挑战节点故障:在分布式系统中,节点的硬件故障、软件错误或网络问题随时可能发生。数据一致性:在多个节点上处理数据时,保持数据的一致性和完整性是一个重大挑战。任务调度与恢复:当故障发生时,系统需要能够自动重新调度任务,并从故障中恢复,以确保处理的连续性和效率。2.2容错机制的作用容错机制在大数据处理框架中扮演着关键角色,它确保了即使在部分组件或节点失败的情况下,系统仍然能够继续运行并完成任务。ApacheStorm通过以下机制实现了容错:2.2.1集群状态的持久化Storm使用Zookeeper来持久化集群的状态,包括拓扑结构、任务分配和执行状态。当节点发生故障时,Zookeeper能够检测到这一变化,并通知其他节点进行相应的调整。2.2.2任务的自动重新调度当检测到一个工作节点失败时,Storm会自动将该节点上的任务重新分配给集群中的其他可用节点。这一过程是透明的,用户无需干预。2.2.3数据流的可靠传输Storm通过使用可靠的Spout和Bolt来确保数据流的可靠传输。当数据被发送时,如果接收方没有确认收到数据,发送方会重新发送数据,直到确认为止。2.2.4拓扑的自动恢复Storm的拓扑结构在设计时就考虑了容错性。当拓扑中的某个组件失败时,Storm会自动重启该组件,并重新建立数据流的连接,确保数据处理的连续性。2.2.5数据的持久化存储Storm支持将处理结果持久化存储到外部系统,如HDFS或数据库中。这样即使Storm集群发生故障,处理结果也不会丢失。2.3示例:Storm中的容错机制下面是一个使用ApacheStorm进行容错处理的示例。我们将创建一个简单的拓扑,该拓扑从Twitter流中读取数据,然后进行词频统计。我们将展示如何配置Storm以确保数据的可靠传输和处理。//导入必要的库
importorg.apache.storm.Config;
importorg.apache.storm.LocalCluster;
importorg.apache.storm.StormSubmitter;
importorg.apache.storm.topology.TopologyBuilder;
importorg.apache.storm.tuple.Fields;
importorg.apache.storm.tuple.Values;
importorg.apache.storm.spout.SchemeAsMultiScheme;
importorg.apache.storm.spout.TwitterSpout;
importorg.apache.storm.task.OutputCollector;
importorg.apache.storm.task.TopologyContext;
importorg.apache.storm.topology.OutputFieldsDeclarer;
importorg.apache.storm.topology.base.BaseRichBolt;
importorg.apache.storm.topology.base.BaseRichSpout;
importorg.apache.storm.utils.Utils;
//定义Spout,从Twitter流中读取数据
publicclassTwitterSpoutextendsBaseRichSpout{
privateOutputCollector_collector;
privateTwitterSpout_twitterSpout;
@Override
publicvoidopen(Mapconf,TopologyContextcontext,OutputCollectorcollector){
_collector=collector;
_twitterSpout=newTwitterSpout(newSchemeAsMultiScheme(newStringScheme()));
_twitterSpout.open(conf,context,collector);
}
@Override
publicvoidnextTuple(){
_twitterSpout.nextTuple();
}
@Override
publicvoidack(Objectid){
_twitterSpout.ack(id);
}
@Override
publicvoidfail(Objectid){
_twitterSpout.fail(id);
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
declarer.declare(newFields("tweet"));
}
}
//定义Bolt,进行词频统计
publicclassWordCountBoltextendsBaseRichBolt{
privateMap<String,Integer>_counts;
privateOutputCollector_collector;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
_collector=collector;
_counts=newHashMap<>();
}
@Override
publicvoidexecute(Tupleinput){
Stringword=input.getStringByField("tweet");
_counts.put(word,_counts.getOrDefault(word,0)+1);
_collector.ack(input);
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
declarer.declare(newFields("word","count"));
}
}
//创建拓扑并配置容错机制
publicclassWordCountTopology{
publicstaticvoidmain(String[]args)throwsException{
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("twitter-spout",newTwitterSpout(),1);
builder.setBolt("word-count-bolt",newWordCountBolt(),2)
.shuffleGrouping("twitter-spout");
Configconfig=newConfig();
config.setDebug(true);
config.setNumWorkers(3);
//设置容错机制
config.setMessageTimeoutSecs(30);//设置消息超时时间
config.setTopologyMaxSpoutPending(10);//设置Spout待处理的消息数量上限
if(args!=null&&args.length>0){
config.setNumWorkers(3);
StormSubmitter.submitTopology(args[0],config,builder.createTopology());
}else{
LocalClustercluster=newLocalCluster();
cluster.submitTopology("word-count",config,builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("word-count");
cluster.shutdown();
}
}
}在这个示例中,我们配置了Storm的容错机制,包括设置消息超时时间和Spout待处理的消息数量上限。这些配置确保了数据流的可靠传输,即使在节点故障的情况下,Storm也能自动恢复并继续处理数据。通过上述机制和配置,ApacheStorm能够有效地处理大数据流中的故障,确保数据处理的连续性和结果的准确性。这对于构建可靠、高效的大数据处理系统至关重要。3Storm的容错机制3.1任务失败检测在Storm中,任务失败检测是通过心跳机制和消息确认机制来实现的。Storm的主节点Nimbus和工作节点Supervisor之间通过心跳机制保持通信,以检测节点的健康状态。如果Supervisor在一定时间内没有接收到Nimbus的心跳,它会认为Nimbus已经失败,并采取相应的故障恢复措施。3.1.1心跳机制Nimbus和Supervisor之间的心跳机制确保了集群的健康状态。Nimbus定期向Supervisor发送心跳请求,Supervisor接收到请求后会响应,表明其当前状态。如果Supervisor在预设的时间内没有收到Nimbus的心跳,它会认为Nimbus失败,并开始故障恢复流程。3.1.2消息确认机制Storm的另一个关键容错特性是消息确认机制。在Storm的拓扑中,每个Tuple(数据元组)都有一个ID,当Tuple被发送时,它会被标记为未确认状态。下游Bolt在处理完Tuple后,会将其标记为确认状态。如果Spout在一定时间内没有收到所有Tuple的确认,它会重新发送这些Tuple,确保数据的处理不会因为某个组件的失败而丢失。3.2故障恢复策略Storm提供了多种故障恢复策略,包括自动重新分配任务、重新启动失败的组件和数据重放。3.2.1自动重新分配任务当Storm检测到一个工作节点或组件失败时,它会自动将失败的任务重新分配到集群中的其他健康节点上。这一过程是透明的,用户无需干预,Storm会自动处理。3.2.2重新启动失败的组件Storm能够检测到组件(如Spout或Bolt)的失败,并自动重新启动这些组件。重新启动的组件会从失败点开始继续处理数据,确保数据处理的连续性和完整性。3.2.3数据重放在某些情况下,如消息确认机制检测到数据丢失,Storm会触发数据重放。这意味着Storm会从数据源重新读取数据,并重新发送到拓扑中,确保所有数据都被正确处理。3.2.4示例:消息确认机制假设我们有一个简单的Storm拓扑,包含一个Spout和一个Bolt。Spout生成数据,Bolt处理数据。为了演示消息确认机制,我们将在Bolt中故意引入失败,以触发Spout重新发送数据。//Spout类,用于生成数据
publicclassMySpoutextendsBaseRichSpout{
privateSpoutOutputCollector_collector;
privateint_sequenceId=0;
@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
this._collector=collector;
}
@Override
publicvoidnextTuple(){
Stringword="word"+_sequenceId;
_collector.emit(newValues(word),_sequenceId);
_sequenceId++;
}
@Override
publicvoidack(ObjectmsgId){
System.out.println("Tuple"+msgId+"hasbeenfullyprocessed.");
}
@Override
publicvoidfail(ObjectmsgId){
System.out.println("Tuple"+msgId+"processinghasfailed.");
}
}
//Bolt类,用于处理数据
publicclassMyBoltextendsBaseBasicBolt{
@Override
publicvoidexecute(BasicInputinput){
Stringword=input.getStringByField("word");
System.out.println("Processingword:"+word);
//故意引入失败
if(word.equals("word10")){
thrownewRuntimeException("Boltfailedtoprocessword10");
}
input.ack();
}
}在这个示例中,当Bolt处理到“word10”时,会抛出一个运行时异常,导致处理失败。Spout会检测到这个失败,并重新发送ID为10的Tuple,直到它被成功处理。通过上述机制和策略,Storm能够有效地处理大数据流中的故障,确保数据处理的可靠性和连续性。4Storm的故障恢复策略详解4.1拓扑重启在Storm中,拓扑重启是处理故障的一种基本策略。当Storm检测到某个任务或工作节点失败时,它会自动重启该任务或整个拓扑,以恢复数据处理流程。这一机制确保了即使在部分组件失败的情况下,数据处理也能继续进行,从而提高了系统的整体可靠性。4.1.1原理Storm的拓扑重启机制基于以下原理:状态检查点:Storm允许用户实现状态检查点,即在处理过程中定期保存任务的状态。这样,在故障发生后,可以从最近的检查点恢复状态,减少数据处理的丢失。故障检测:Storm通过心跳机制监控所有运行中的任务。如果某个任务在预定时间内没有发送心跳,Storm会认为该任务已失败,并触发重启流程。自动重启:一旦检测到故障,Storm会自动重启失败的任务。如果任务连续失败,Storm可能会重启整个拓扑,以避免局部故障影响整体性能。4.1.2代码示例在Storm中实现状态检查点,可以使用StateSpout和StatefulBolt。下面是一个简单的示例,展示如何在Bolt中保存和恢复状态:importorg.apache.storm.task.OutputCollector;
importorg.apache.storm.task.TopologyContext;
importorg.apache.storm.topology.OutputFieldsDeclarer;
importorg.apache.storm.topology.base.BaseRichBolt;
importorg.apache.storm.tuple.Fields;
importorg.apache.storm.tuple.Tuple;
importorg.apache.storm.tuple.Values;
importorg.apache.storm.state.State;
importorg.apache.storm.state.StateFactory;
importorg.apache.storm.state.MapState;
importjava.util.Map;
publicclassWordCountBoltextendsBaseRichBolt{
privateOutputCollectorcollector;
privateMapState<String,Integer>state;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.collector=collector;
StateFactoryfactory=context.getStateFactory();
state=factory.createMapState("word-count");
}
@Override
publicvoidexecute(Tupleinput){
Stringword=input.getStringByField("word");
Integercount=state.get(word);
if(count==null){
count=0;
}
state.put(word,++count);
collector.emit(newValues(word,count));
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
declarer.declare(newFields("word","count"));
}
}在这个示例中,WordCountBolt使用MapState来保存每个单词的计数。当拓扑重启时,MapState中的数据会被自动恢复,从而保持了单词计数的连续性。4.2任务状态持久化除了拓扑重启,任务状态持久化是Storm中另一种重要的容错机制。它确保了即使在拓扑重启后,任务的状态也能被持久保存,从而避免了从头开始处理数据。4.2.1原理任务状态持久化依赖于以下原理:状态存储:Storm支持多种状态存储后端,如Redis、ZooKeeper、HBase等。用户可以配置使用哪种后端来存储状态。状态更新:在处理数据的过程中,Bolt可以定期更新其状态到持久化存储中。这样,即使发生故障,状态也不会丢失。状态恢复:当拓扑重启时,Bolt可以从持久化存储中恢复其状态,继续从上次中断的地方开始处理数据。4.2.2代码示例下面是一个使用ZooKeeper作为状态存储后端的示例:importorg.apache.storm.task.OutputCollector;
importorg.apache.storm.task.TopologyContext;
importorg.apache.storm.topology.OutputFieldsDeclarer;
importorg.apache.storm.topology.base.BaseRichBolt;
importorg.apache.storm.tuple.Fields;
importorg.apache.storm.tuple.Tuple;
importorg.apache.storm.tuple.Values;
importorg.apache.storm.zookeeper.ZooKeeperState;
importorg.apache.storm.zookeeper.ZooKeeperStateFactory;
importjava.util.Map;
publicclassPersistentWordCountBoltextendsBaseRichBolt{
privateOutputCollectorcollector;
privateZooKeeperState<String,Integer>state;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.collector=collector;
ZooKeeperStateFactoryfactory=newZooKeeperStateFactory();
state=factory.createZooKeeperState("word-count");
}
@Override
publicvoidexecute(Tupleinput){
Stringword=input.getStringByField("word");
Integercount=state.get(word);
if(count==null){
count=0;
}
state.put(word,++count);
collector.emit(newValues(word,count));
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
declarer.declare(newFields("word","count"));
}
}在这个示例中,PersistentWordCountBolt使用ZooKeeperState来持久化单词计数状态。每次处理完一个单词,状态都会被更新到ZooKeeper中,确保了状态的持久性。通过上述机制,Storm能够有效地处理大数据流中的故障,保证数据处理的连续性和准确性,从而在高并发、大数据量的场景下提供稳定的服务。5容错机制在Storm实践中的应用5.1案例分析:数据流处理故障恢复在大数据处理中,数据流的连续性和实时性是关键特性。Storm作为一款分布式实时计算系统,其容错机制确保了在节点故障、网络中断等情况下,数据处理流程能够自动恢复,保证数据的准确处理和系统的持续运行。5.1.1故障场景与恢复机制5.1.1.1场景一:Worker节点故障故障描述:在Storm集群中,Worker节点负责执行Topology中的Task。当某个Worker节点因硬件故障或软件错误而宕机时,数据处理将受到影响。恢复机制:Storm通过检测Worker节点的心跳来判断其是否正常运行。一旦检测到Worker节点故障,Storm将自动重新分配该节点上的Task到其他健康的Worker节点上,确保Topology的完整性和数据处理的连续性。5.1.1.2场景二:网络分区故障描述:网络分区是指网络故障导致集群中的部分节点无法与其他节点通信。这将影响数据的传输和处理。恢复机制:Storm通过其消息确认机制(Acking)来处理网络分区。每个Tuple在被处理后,必须被确认(Ack)。如果Tuple在一定时间内未被确认,Storm将重新发送该Tuple,确保数据的完整处理。5.1.2代码示例:实现Acking机制//定义Bolt,实现IBasicBolt接口,以支持Acking机制
publicclassMyBoltimplementsIBasicBolt{
@Override
publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){
//处理数据
Stringdata=tuple.getStringByField("data");
//假设数据处理成功
collector.emit(newValues(data));
collector.ack(tuple);//确认Tuple已被成功处理
}
@Override
publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){
//初始化Bolt
}
@Override
publicvoidcleanup(){
//清理资源
}
}
//在Topology中配置Acking机制
TopologyBuilderbuilder=newTopologyBuilder();
builder.setBolt("myBolt",newMyBolt(),10)
.whereState(State.ACKING)
.shuffleGrouping("spout");5.1.3数据样例与解释假设我们正在处理一个实时日志流,每个日志Tuple包含以下字段:id:日志的唯一标识符。timestamp:日志的时间戳。data:日志的具体内容。//创建一个示例Tuple
Tupletuple=newValues("log123",System.currentTimeMillis(),"Error:Diskspacelow");在上述代码示例中,我们定义了一个Bolt,它接收日志Tuple,处理数据,并在处理成功后通过调用collector.ack(tuple)来确认Tuple。如果处理失败,Storm将自动重新发送该Tuple,直到被成功处理或达到重试次数上限。5.2最佳实践:优化Storm容错性能5.2.1配置消息确认超时时间Storm默认的消息确认超时时间可能不适用于所有场景。对于处理速度较快的Topology,可以适当降低超时时间,以更快地检测到故障并进行恢复。//配置Topology的Acking超时时间
Configconf=newConfig();
conf.setTopologyAckers(2);//设置Ackers的数量
conf.setTopologyMessageTimeoutSecs(10);//设置消息确认超时时间为10秒5.2.2使用Stateful组件在处理需要状态维护的任务时,使用Stateful组件可以提高容错能力。Stateful组件在故障恢复时能够从上次保存的状态继续执行,避免了数据处理的重复和遗漏。//定义StatefulBolt
publicclassMyStatefulBoltimplementsIRichBolt,IStatefulBolt{
privateMapState<String,String>state;
@Override
publicvoidprepare(Mapmap,TopologyContexttopologyContext,OutputCollectoroutputCollector){
state=(MapState<String,String>)topologyContext.getState();
}
@Override
publicvoidexecute(Tupletuple){
Stringdata=tuple.getStringByField("data");
StringprocessedData=state.get(data);
if(processedData==null){
//处理数据
processedData=processData(data);
state.put(data,processedData);
outputCollector.emit(newValues(processedData));
outputCollector.ack(tuple);
}else{
outputCollector.ack(tuple);//如果数据已处理,直接确认Tuple
}
}
@Override
publicvoidcleanup(){
//清理资源
}
@Override
publicMap<String,Object>getComponentConfiguration(){
returnnull;
}
@Override
publicvoidinitState(Statestate){
//初始化状态
}
@Override
publicvoidcloneState(Statestate){
//复制状态,用于故障恢复
}
}5.2.3优化Task分配策略合理配置Task的分配策略,可以提高系统的容错能力和处理效率。例如,使用fieldsGrouping或shuffleGrouping策略,可以确保数据的均匀分布,减少单点故障的影响。//使用fieldsGrouping策略分配Task
TopologyBuilderbuilder=newTopologyBuilder();
builder.setBolt("myBolt",newMyBolt(),10)
.fieldsGrouping("spout",newFields("id"));通过上述实践,可以显著提高Storm在大数据流处理中的容错性能,确保数据的准确处理和系统的稳定运行。6Storm容错机制的总结与未来发展方向与挑战6.1Storm容错机制的总结6.1.1Spout的容错机制Storm中的Spout组件负责接收数据并将其发送到拓扑中的其他组件。为了确保数据的可靠处理,Storm提供了几种容错机制:Ack机制:当一个Tuple被所有订阅它的Bolt完全处理并调用了ack方法后,Spout才会认为这个Tuple已经被成功处理。如果Bolt在处理过程中失败,它可以通过调用fail方法来通知Spout,Spout会重新发送这个Tuple。消息ID:Spout可以为每个发出的Tuple分配一个唯一的ID,这样在Tuple需要重新发送时,Spout可以确保发送的是同一个Tuple,而不是一个新的Tuple。6.1.1.1代码示例publicclassMySpoutextendsBaseRichSpout{
privateSpoutOutputCollector_collector;
privateMap<String,Boolean>_ackedTuples;
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
_collector=collect
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 授信额度借款合同样本模板3篇
- 数码摄像器材购销合同书3篇
- 工程安防合同3篇
- 揭秘技术服务合同范本模板的内容3篇
- 安装委托书签订流程3篇
- 安徽证券行业劳动合同样本3篇
- 教育培训劳务合同3篇
- 政府建议书写作心得3篇
- 新版个人隐私的保密协议3篇
- 安居房建设施工合同3篇
- WY9故障录波器说明书
- 环卫市场化运营方案PPT
- 设计素描构图
- 菜市场摊贩与管理方合作经营协议书
- 三年级下册英语说课稿-《Lesson 11 What Do They Eat》|冀教版(三起)
- 2023春国家开放大学-02154数据库应用技术-期末考试题带答案
- 山西省灵丘县山西省刁泉银铜矿业有限公司银、铜矿资源开发利用、地质环境保护与土地复垦方案
- 国家开放大学《市场营销学》形考1-4答案
- 沥青路面设计说明
- 《胆囊结石的护理》PPT
- 药品质量受权人管理规程
评论
0/150
提交评论