




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
实时计算:ApacheStorm:ApacheStorm的容错机制与状态管理1实时计算:ApacheStorm:容错机制与状态管理1.1ApacheStorm简介1.1.11ApacheStorm的基本概念ApacheStorm是一个开源的分布式实时计算系统,它能够保证每个消息都能被处理,并且能够水平扩展以处理大量数据。Storm的设计灵感来源于Twitter的Heron项目,但其提供了更丰富的特性和更广泛的社区支持。Storm的核心概念包括:Topology:Storm的计算任务被称为Topology,它是由多个Spout和Bolt组件组成的有向无环图(DAG)。Spout:Spout是数据源,它可以不断生成数据并将数据发送到Bolt进行处理。Bolt:Bolt是数据处理单元,它可以接收来自Spout或其他Bolt的数据,进行处理后,再将数据发送到其他Bolt或输出。Tuple:Tuple是Storm中数据的基本单位,它是一个不可变的记录,由Spout生成并传递给Bolt。Stream:Stream是Tuple的序列,它在Spout和Bolt之间流动,形成数据流。1.1.22ApacheStorm的架构与组件Storm的架构主要由以下几个组件构成:Nimbus:Nimbus是Storm的主节点,负责分配任务和监控集群状态。Supervisor:Supervisor运行在集群的每个工作节点上,负责接收Nimbus分配的任务,并在本地机器上启动和监控工作进程。Worker:Worker是由Supervisor启动的进程,每个Worker运行一个或多个任务(Task)。Task:Task是Bolt或Spout的实例,每个Task负责处理数据流中的一个部分。Storm的架构设计使得它能够处理大规模的实时数据流,同时保证数据处理的可靠性和容错性。1.1.33实时计算的场景与需求实时计算在多个场景中发挥着关键作用,包括:流数据分析:如实时监控网络流量、用户行为分析等。实时消息处理:如实时日志处理、实时交易系统等。实时机器学习:如实时推荐系统、实时异常检测等。实时计算的需求通常包括:低延迟:数据处理需要在极短的时间内完成。高吞吐量:系统需要能够处理大量的数据流。容错性:系统需要能够处理节点故障,保证数据处理的正确性和完整性。状态管理:系统需要能够保存和管理状态,以便进行复杂的数据处理和分析。1.2ApacheStorm的容错机制Storm的容错机制主要通过以下方式实现:TupleAcknowledgement:Storm通过TupleAcknowledgement机制保证每个Tuple都被正确处理。当一个Tuple被发送到Bolt时,Storm会等待Bolt发送一个Acknowledgement(确认)信号。如果在一定时间内没有收到确认信号,Storm会重新发送这个Tuple。TaskFailureRecovery:当一个Task失败时,Storm会自动重启这个Task,并重新处理它接收到的Tuple。Nimbus和Supervisor的高可用性:Nimbus和Supervisor通过心跳机制监控集群状态,当检测到节点故障时,Nimbus会重新分配任务,Supervisor会重启失败的Task。1.3ApacheStorm的状态管理Storm的状态管理主要通过以下方式实现:StatefulBolt:Storm支持状态化的Bolt,允许Bolt保存和管理状态。状态化的Bolt可以用于实现复杂的业务逻辑,如窗口计算、状态查询等。StatePersistence:Storm支持将状态持久化到外部存储系统,如HadoopHDFS、Cassandra、HBase等。这样可以保证状态的持久性和可靠性。StatefulSpout:Storm也支持状态化的Spout,允许Spout保存和管理状态。状态化的Spout可以用于实现数据重放、数据恢复等功能。1.3.1代码示例:状态化Bolt//定义一个状态化的Bolt
publicclassStatefulBoltextendsBaseRichBolt{
privateMap<String,Integer>counts=newHashMap<>();
privateOutputCollectorcollector;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.collector=collector;
}
@Override
publicvoidexecute(Tupletuple){
Stringword=tuple.getStringByField("word");
Integercount=counts.get(word);
if(count==null){
count=0;
}
counts.put(word,count+1);
collector.emit(newValues(word,count+1));
collector.ack(tuple);
}
@Override
publicvoidcleanup(){
//在Bolt关闭时,可以将状态保存到外部存储系统
//例如,可以将counts保存到HadoopHDFS
}
}在上述示例中,StatefulBolt保存了一个counts字典,用于记录每个单词出现的次数。当接收到一个Tuple时,它会从Tuple中读取单词,然后更新counts字典,并将更新后的计数发送到下一个Bolt。在Bolt关闭时,可以将状态保存到外部存储系统,以保证状态的持久性和可靠性。1.4ApacheStorm的实践与优化在实际应用中,为了提高Storm的性能和可靠性,通常需要进行以下优化:TupleAcknowledgement的优化:默认情况下,Storm会等待Bolt发送Acknowledgement信号,这会增加数据处理的延迟。可以通过调整messageTimeoutSecs参数,减少等待时间,从而降低延迟。Task的优化:可以通过调整task.parallelism_hint参数,控制每个Bolt或Spout的Task数量,从而优化数据处理的并行度。StatePersistence的优化:可以通过选择合适的外部存储系统,优化状态的持久化和恢复。例如,可以使用HadoopHDFS作为状态存储系统,以提高状态的可靠性和持久性。1.5总结ApacheStorm是一个强大的实时计算框架,它通过TupleAcknowledgement、TaskFailureRecovery和StatePersistence等机制,保证了数据处理的可靠性和容错性。同时,Storm也提供了丰富的状态管理功能,支持状态化的Bolt和Spout,可以用于实现复杂的业务逻辑。在实际应用中,可以通过优化TupleAcknowledgement、Task和StatePersistence,提高Storm的性能和可靠性。1.6ApacheStorm的容错机制1.6.11Storm的故障检测机制在ApacheStorm中,容错机制的核心在于其能够检测和响应系统中的故障。Storm通过以下几种方式来检测故障:心跳检测:Storm的主节点Nimbus会定期向工作节点Supervisor发送心跳请求,以检查Supervisor是否正常运行。如果Supervisor在一定时间内没有响应,Nimbus会认为Supervisor已故障,并重新分配其上的任务。任务执行检测:每个工作节点上的Executor会定期向Supervisor报告其执行状态。如果Executor在执行任务时遇到错误,它会向Supervisor发送错误报告。Supervisor会记录这些错误,并在必要时重启Executor。消息确认机制:在Storm的流处理中,消息确认机制是确保数据处理正确性的关键。当一个Tuple被发送到一个Bolt时,Storm会等待Bolt确认收到并处理了这个Tuple。如果在一定时间内没有收到确认,Storm会认为这个Tuple处理失败,并重新发送这个Tuple。1.6.22任务失败的处理策略Storm提供了多种策略来处理任务失败:Failover:当检测到故障时,Storm会尝试将任务重新分配到其他健康的Supervisor上执行。这是Storm的基本容错策略,确保了系统的高可用性。Tuple重发:如前所述,如果一个Tuple没有被正确处理,Storm会重新发送这个Tuple,确保数据的完整处理。自定义错误处理:开发人员可以在Spout或Bolt中实现自定义的错误处理逻辑。例如,可以捕获特定类型的异常,并根据异常类型决定是否重发Tuple,或者采取其他补救措施。1.6.33容错机制的实现原理Storm的容错机制主要依赖于其内部的拓扑结构和消息确认机制。以下是其实现原理的详细说明:拓扑结构的持久化:Storm将拓扑结构持久化在Zookeeper中,确保即使Nimbus或Supervisor发生故障,拓扑结构也能被恢复。Zookeeper是一个分布式协调服务,用于管理集群中的配置信息、命名服务、提供分布式锁等。消息确认机制:Storm使用消息确认机制来确保数据的可靠处理。当一个Spout发送一个Tuple时,它会等待所有下游Bolt确认收到并处理了这个Tuple。如果在一定时间内没有收到确认,Spout会重新发送这个Tuple。故障恢复:当检测到故障时,Storm会根据拓扑结构和当前的执行状态,重新分配任务。这可能涉及到重新启动Executor,或者将任务重新分配到其他Supervisor上。示例代码:实现自定义错误处理//Spout类中实现自定义错误处理
publicclassCustomSpoutextendsBaseRichSpout{
privateSpoutOutputCollector_collector;
privateMap<String,Long>_acked;
privateMap<String,Long>_failed;
privateRandom_rand;
privateint_sequence;
@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
_collector=collector;
_acked=newHashMap<String,Long>();
_failed=newHashMap<String,Long>();
_rand=newRandom();
_sequence=0;
}
@Override
publicvoidnextTuple(){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
Stringid="id-"+_sequence++;
_collector.emit(newValues(id),id);
if(_rand.nextInt(10)==0){
_collector.fail(id);
System.out.println("Tuple"+id+"failed");
}else{
_collector.ack(id);
System.out.println("Tuple"+id+"acked");
}
}
@Override
publicvoidack(ObjectmsgId){
Longtime=_acked.get(msgId);
if(time!=null){
longlatency=System.currentTimeMillis()-time;
System.out.println("Tuple"+msgId+"latency:"+latency);
_acked.remove(msgId);
}
}
@Override
publicvoidfail(ObjectmsgId){
Longtime=_failed.get(msgId);
if(time!=null){
longlatency=System.currentTimeMillis()-time;
System.out.println("Tuple"+msgId+"faillatency:"+latency);
_failed.remove(msgId);
}
}
}在上述代码中,CustomSpout类实现了自定义的错误处理逻辑。在nextTuple方法中,它会随机决定是否失败一个Tuple。如果失败,它会调用_collector.fail(id)方法,这将导致Storm重新发送这个Tuple。在ack和fail方法中,它记录了Tuple的处理时间和失败时间,这可以用于分析系统的处理延迟和故障恢复时间。数据样例在上述示例中,数据样例是一个简单的字符串,格式为"id-"+_sequence。例如,当_sequence为1时,数据样例为"id-1"。代码讲解在CustomSpout类中,nextTuple方法是Spout的主要执行逻辑。它首先生成一个数据样例,然后使用_collector.emit方法将数据样例发送到下游Bolt。在发送数据样例时,它使用了id作为消息ID,这使得Storm能够跟踪这个Tuple的处理状态。在nextTuple方法中,它还实现了一个简单的错误处理逻辑。如果随机数生成器生成的数字为0,它会调用_collector.fail(id)方法,这将导致Storm重新发送这个Tuple。在ack和fail方法中,它记录了Tuple的处理时间和失败时间。这可以用于分析系统的处理延迟和故障恢复时间。当一个Tuple被成功处理时,它会调用ack方法,并记录处理时间。当一个Tuple处理失败时,它会调用fail方法,并记录失败时间。通过上述代码,我们可以看到Storm的容错机制是如何工作的。它通过消息确认机制和自定义错误处理逻辑,确保了数据的可靠处理和系统的高可用性。2状态管理在实时计算中的重要性2.11状态管理的概念与作用状态管理在实时计算框架中扮演着至关重要的角色,尤其是在像ApacheStorm这样的分布式流处理系统中。状态管理主要涉及如何在处理数据流时保存和管理中间状态,以确保数据处理的准确性和一致性。在实时计算场景下,数据流是连续不断的,状态管理能够帮助系统记住之前处理的数据状态,这对于实现复杂的数据处理逻辑,如窗口计算、事件关联和状态查询等,是必不可少的。2.1.1代码示例:使用ApacheStorm的StateSpout保存状态//定义一个状态Spout,用于保存和读取状态
publicclassMyStateSpoutextendsBaseRichSpout{
privatetransientMapState<String,Integer>state;
privatetransientSpoutOutputCollectorcollector;
@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
this.collector=collector;
state=context.getState(newMemoryStateFactory());
}
@Override
publicvoidnextTuple(){
//读取状态
Integercount=state.get("count");
if(count==null){
count=0;
}
//更新状态
state.put("count",++count);
//发送数据
collector.emit(newValues(count));
}
}在上述代码中,MyStateSpout通过context.getState方法获取一个状态管理器MapState,用于保存和读取状态。当nextTuple方法被调用时,它会读取状态count,如果状态不存在,则初始化为0,然后更新状态并发送数据。2.22实时计算中状态管理的挑战实时计算中的状态管理面临着多重挑战,主要包括:状态一致性:在分布式环境中,确保所有节点上的状态一致是非常困难的,尤其是在网络延迟和节点故障的情况下。状态持久化:状态需要在节点故障时能够恢复,这就要求状态能够被持久化到可靠的存储系统中。状态更新的性能:频繁的状态更新会成为实时计算的瓶颈,因此需要高效的状态更新机制。状态查询的延迟:在处理流数据时,可能需要查询状态,如果状态查询的延迟过高,会影响实时计算的性能。2.33状态管理与容错机制的关系状态管理和容错机制在实时计算中是紧密相连的。容错机制确保在节点故障时,系统能够从故障中恢复并继续运行,而状态管理则是在容错机制的基础上,确保数据处理的连续性和一致性。在ApacheStorm中,状态管理是通过Spout和Bolt中的状态接口实现的,这些状态在故障恢复时会被重新加载,从而保证了数据处理的正确性。2.3.1代码示例:使用ApacheStorm的StatefulBolt进行状态恢复//定义一个状态Bolt,用于处理数据并保存状态
publicclassMyStatefulBoltextendsBaseRichBolt{
privatetransientMapState<String,Integer>state;
privatetransientBoltOutputCollectorcollector;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.collector=(BoltOutputCollector)collector;
state=context.getState(newMemoryStateFactory());
}
@Override
publicvoidexecute(Tupleinput){
//读取状态
Integercount=state.get(input.getStringByField("word"));
if(count==null){
count=0;
}
//更新状态
state.put(input.getStringByField("word"),++count);
//发送结果
collector.emit(newValues(input.getStringByField("word"),count));
}
}在上述代码中,MyStatefulBolt在prepare方法中初始化状态管理器MapState,并在execute方法中读取和更新状态。如果节点发生故障,状态会被持久化并能够在恢复时重新加载,从而保证了数据处理的连续性。通过上述示例和讨论,我们可以看到状态管理在实时计算中的重要性,以及它如何与容错机制协同工作,以确保数据处理的准确性和一致性。2.4ApacheStorm中的状态管理2.4.11StormTrident的状态管理在ApacheStorm中,Trident是一个用于处理大量数据流的高级层,它提供了更高级别的抽象和更强大的状态管理功能。Trident允许用户在Spouts和Bolts中保存状态,这在处理需要保持会话状态或需要进行复杂计算(如窗口操作)的流数据时非常有用。原理Trident的状态管理基于一个称为TridentState的接口。这个接口允许用户在Spouts和Bolts中保存和查询状态。状态可以是任何类型的数据,如计数器、数据库连接、或更复杂的数据结构。Trident使用一个StateFactory来创建状态存储,这个工厂可以是内存中的、基于磁盘的、或任何其他持久化存储。内容状态保存:在Trident中,状态保存是通过TridentState的updateState方法完成的。这个方法接收一个TridentTuple和一个State对象,然后可以更新状态。状态查询:状态查询是通过TridentState的get方法完成的。这个方法接收一个TridentTuple和一个State对象,然后返回查询的结果。示例//创建一个状态工厂,这里使用内存状态
StateFactorystateFactory=newMemoryMapStateFactory();
//定义一个TridentSpout,用于读取数据
TridentSpoutspout=newFieldsDeclarer()
.declareStream("wordStream",newValues("hello"))
.declareStream("wordStream",newValues("world"));
//创建一个TridentState,用于保存单词计数
TridentStatestate=topology.newTridentTopology()
.addState("wordCount",stateFactory);
//定义一个TridentBolt,用于更新状态
TridentBoltbolt=newFieldsDeclarer()
.declareStream("wordStream",newFunction(){
@Override
publicvoidexecute(TridentTupletuple,TridentCollectorcollector){
Stringword=tuple.getString(0);
//更新状态
state.updateState(newValues(word),newFields("word"),collector,newUpdateFunction(){
@Override
publicvoidupdate(Map<String,Integer>current,TridentTupletuple){
Stringword=tuple.getString(0);
current.put(word,current.get(word)==null?1:current.get(word)+1);
}
});
}
});
//连接Spout和Bolt
topology.newStream("wordStream",spout)
.each(newFields("word"),bolt,newFields("wordStream"))
.groupBy(newFields("word"))
.stateQuery(state,newFields("word"),newStateFunction(){
@Override
publicvoidexecute(TridentTupletuple,Map<String,Integer>state,TridentCollectorcollector){
collector.emit(newValues(tuple.getString(0),state.get(tuple.getString(0))));
}
},newFields("word","count"));2.4.22ApacheStateSpout的使用StateSpout是ApacheStorm中的一个组件,它允许从状态存储中读取数据,并将其作为流的一部分发送出去。这在需要将状态数据与实时流数据结合进行处理时非常有用。原理StateSpout通过实现IRichSpout接口来工作,它在初始化时接收一个State对象,然后在nextTuple方法中使用这个状态对象来读取和发送数据。内容初始化状态:在StateSpout的初始化方法中,可以使用State对象来加载或初始化状态。发送状态数据:在nextTuple方法中,StateSpout可以查询状态并发送数据。示例//创建一个StateSpout,这里使用一个简单的内存状态
StateSpoutstateSpout=newStateSpout(){
privateStatestate;
@Override
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){
//初始化状态
state=newMemoryMapState();
state.put("hello",1);
state.put("world",1);
}
@Override
publicvoidnextTuple(){
//发送状态数据
Stringword="hello";
collector.emit(newValues(word,state.get(word)));
word="world";
collector.emit(newValues(word,state.get(word)));
}
};
//创建一个Topology,添加StateSpout
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("stateSpout",stateSpout,1);2.4.33状态管理的最佳实践在ApacheStorm中进行状态管理时,遵循一些最佳实践可以提高系统的可靠性和性能。内容状态持久化:确保状态数据被持久化,以防止数据丢失。可以使用ApacheStorm的内置状态存储,如MemoryMapState、BackType.cassandra.CassandraState等,或自定义状态存储。状态更新的原子性:在更新状态时,确保操作是原子的,以避免数据不一致的问题。状态查询的优化:优化状态查询,减少查询延迟。例如,可以使用缓存来减少对状态存储的直接访问。状态的定期清理:定期清理过期或不再需要的状态数据,以避免状态存储的过度膨胀。示例//使用Cassandra作为状态存储
StateFactorystateFactory=newCassandraStateFactory()
.setKeyspace("my_keyspace")
.setColumnFamily("my_column_family");
//创建一个TridentState,用于保存用户会话状态
TridentStatestate=topology.newTridentTopology()
.addState("userSession",stateFactory);
//定义一个TridentBolt,用于更新和查询状态
TridentBoltbolt=newFieldsDeclarer()
.declareStream("userStream",newFunction(){
@Override
publicvoidexecute(TridentTupletuple,TridentCollectorcollector){
StringuserId=tuple.getString(0);
//更新状态
state.updateState(newValues(userId),newFields("userId"),collector,newUpdateFunction(){
@Override
publicvoidupdate(Map<String,Object>current,TridentTupletuple){
//更新用户会话状态
current.put("lastActivity",System.currentTimeMillis());
}
});
//查询状态
state.queryState(newValues(userId),newFields("userId"),newStateFunction(){
@Override
publicvoidexecute(TridentTupletuple,Map<String,Object>state,TridentCollectorcollector){
//发送用户会话状态
collector.emit(newValues(userId,state.get("lastActivity")));
}
});
}
});
//连接Spout和Bolt
topology.newStream("userStream",spout)
.each(newFields("userId"),bolt,newFields("userStream"));在上述示例中,我们使用了Cassandra作为状态存储,以实现状态的持久化。同时,状态更新和查询操作被设计为原子的,以确保数据的一致性。此外,通过定期更新lastActivity字段,我们可以实现状态的定期清理,避免状态存储的过度膨胀。2.5案例分析:ApacheStorm在容错与状态管理中的应用2.5.11实时数据处理的案例在实时数据处理领域,ApacheStorm因其强大的流处理能力而被广泛采用。例如,考虑一个社交媒体平台需要实时分析用户活动,以提供个性化的推荐和广告。数据源可能包括用户点击、评论、分享等行为,这些数据需要被实时处理并分析,以生成即时的用户兴趣模型。示例代码:ApacheStorm拓扑结构定义importorg.apache.storm.Config;
importorg.apache.storm.StormSubmitter;
importorg.apache.storm.topology.TopologyBuilder;
importorg.apache.storm.tuple.Fields;
publicclassSocialMediaAnalysisTopology{
publicstaticvoidmain(String[]args)throwsException{
TopologyBuilderbuilder=newTopologyBuilder();
//定义Spout,从数据源读取数据
builder.setSpout("user-activity-spout",newUserActivitySpout(),5);
//定义Bolt,处理数据并更新用户兴趣模型
builder.setBolt("interest-model-bolt",newInterestModelBolt(),8)
.shuffleGrouping("user-activity-spout");
//配置拓扑
Configconfig=newConfig();
config.setDebug(false);
//提交拓扑
StormSubmitter.submitTopology("social-media-analysis",config,builder.createTopology());
}
}在这个例子中,UserActivitySpout从数据源读取用户活动数据,InterestModelBolt则处理这些数据,更新用户兴趣模型。拓扑结构通过TopologyBuilder定义,使用shuffleGrouping确保数据均匀分布到Bolt中。2.5.22容错机制在案例中的体现ApacheStorm的容错机制确保了即使在节点故障的情况下,数据处理也能继续进行。Storm通过以下机制实现容错:TupleAcknowledgement:当一个Bolt处理完一个tuple后,它必须显式地调用ack方法。如果Bolt在处理tuple时失败,Storm会重新发送这个tuple给其他Bolt实例处理。TaskRebalancing:Storm允许在运行时重新平衡任务,这意味着如果某个节点失败,Storm可以将该节点的任务重新分配给集群中的其他节点。Nimbus和Supervisor的高可用性:Storm的Nimbus和Supervisor节点可以配置为高可用,确保即使主节点失败,也能有备用节点接管,保持集群的正常运行。示例代码:TupleAcknowledgementimportorg.apache.storm.task.OutputCollector;
importorg.apache.storm.task.TopologyContext;
importorg.apache.storm.topology.OutputFieldsDeclarer;
importorg.apache.storm.topology.base.BaseRichBolt;
importorg.apache.storm.tuple.Tuple;
importjava.util.Map;
publicclassInterestModelBoltextendsBaseRichBolt{
privateOutputCollectorcollector;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.collector=collector;
}
@Override
publicvoidexecute(Tupleinput){
//处理数据
StringuserId=input.getStringByField("user-id");
Stringactivity=input.getStringByField("activity");
//更新用户兴趣模型
updateInterestModel(userId,activity);
//确认tuple处理完成
collector.ack(input);
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
//定义输出字段
}
}在上述代码中,InterestModelBolt在处理完一个tuple后,通过调用collector.ack(input)来确认tuple的处理,这是Storm容错机制中的关键部分。2.5.33状态管理在案例中的作用状态管理是实时数据处理中不可或缺的一部分,尤其是在需要维护用户状态或历史数据的场景中。ApacheStorm通过以下方式支持状态管理:StatefulBolts:Storm允许Bolt维护状态,这意味着Bolt可以存储和检索数据,以支持更复杂的数据处理逻辑。StatePersistence:状态可以被持久化到外部存储系统,如HBase、Cassandra或Redis,以确保状态的持久性和一致性。StatefulSpouts:类似于Bolt,Spout也可以维护状态,这对于处理需要上下文或历史数据的流非常有用。示例代码:使用Redis进行状态管理importorg.apache.storm.task.OutputCollector;
importorg.apache.storm.task.TopologyContext;
importorg.apache.storm.topology.OutputFieldsDeclarer;
importorg.apache.storm.topology.base.BaseRichBolt;
importorg.apache.storm.tuple.Tuple;
importredis.clients.jedis.Jedis;
importjava.util.Map;
publicclassInterestModelBoltextendsBaseRichBolt{
privateOutputCollectorcollector;
privateJedisjedis;
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollectorcollector){
this.collector=collector;
this.jedis=newJedis("localhost");
}
@Override
publicvoidexecute(Tupleinput){
StringuserId=input.getStringByField("user-id");
Stringactivity=input.getStringByField("activity");
//从Redis读取用户兴趣模型
StringinterestModel=jedis.get(userId);
//更新用户兴趣模
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 安检员工服务规范
- 2024年监理工程师备考心得试题及答案
- 班级合作项目的有效模式计划
- 美术教师课堂管理技巧分享计划
- 建立学习型组织的步骤与总结计划
- 保险行业安全风险防控指南计划
- 多元文化教育实施计划
- 2024年特许另类投资考试优化复习试题及答案
- 利用大数据优化财务决策计划
- 适应行业变化的灵活策略计划
- 《少先队员采茶歌》课件
- 新外研版高一英语必修二unit6课文
- 气排球比赛积分表
- 20道瑞幸咖啡营运经理岗位常见面试问题含HR常问问题考察点及参考回答
- 教师调课申请表
- 急性心力衰竭中国急诊管理指南2022
- 《利用导数研究函数的零点问题》教学设计
- 茶室设计-课件
- 安全生产重大事故隐患排查报告表
- 管道系统吹洗(扫)记录(压缩空气)
- 建设单位甲方对监理单位考核管理办法
评论
0/150
提交评论