老师hadoop八天完全攻克视频教程-weekend110-第7天_第1页
老师hadoop八天完全攻克视频教程-weekend110-第7天_第2页
老师hadoop八天完全攻克视频教程-weekend110-第7天_第3页
老师hadoop八天完全攻克视频教程-weekend110-第7天_第4页
老师hadoop八天完全攻克视频教程-weekend110-第7天_第5页
已阅读5页,还剩18页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

,Storm0.7.0实现了一个新特性——事务性拓扑,这一特性使消息在语义上确保你可以安全,NOTE:Stormspoutbolt之上的抽象概念。omputolt(oltoltNT:su——处理阶段提交阶段om事务。NOTE:Storm使用zookeeper事务元数据,默认情况下就是拓扑使用的那个zookeeper。你可以修改以下两个配置参数键指定其它的zookeeper——transactional.zookeeper.serverstransactional.zookeeper.port。下面要创建一个分析工具来了解事务的工作方式。从一个Redis数据库读tweetsbolt处理它们,最后把结果保存在另一个Redis数据库的列表中。处8-1。8-1正如你看到的,TweetsTransactionalSpouttweet数据库并向拓扑分发批次。UserSplitterBoltHashTagSplitterBoltboltspoutUserSplitterBolttweets并查找用户——以@开头的单词——users的自定义数据流组。HashtagSplitterBolttweet查找#开头的单词,并把它们分发到名为hashtagsbolt,UserHashtagJoinBolt,接收前面提到的两个tweet内的话题数量。为了计数并分发计算结果,这是个最后一个bolt—— 计数,并在对一个批次完成处理时,把所有结果保存到redis。这是一种特殊的bolt,叫做提交者,在本章后面做讲解。TransactionalTopologyBuildernewTransactionalTopologyBuilder("test","spout",newbuilder.setBolt("users-splitter",newUserSplitterBolt(), buildeer.setBolt("hashtag-splitter",newHashtagSplitterBolt(), builder.setBolt("users-hashtag-manager",newUserHashtagJoinBolt(), ("users-splitter","users",new ("hashtag-splitter","hashtags",new miter", ("users-splitter", ("hashtag-splitter", ("user-hashtag-publicclassTweetsTransactionalSpout正如你在这个类定义中看到的,TweetsTransactionalSpout继承了带范型的publicclassTransactionMetadataimplementsSerializable{privatestaticfinallongserialVersionUID=1L;longfrom; publicTransactionMetadata(longfrom,int ty){this.from=from; ty }}publicITransactionalSpout.Coordinator<TransactionMetadata>Mapconf,TopologyContextcontext)returnnew}etadata>getEmitter(Mapconf,TopologyContextcontest){returnnew}publicvoiddeclareOutputFields(OuputFieldsDeclarerdeclarer){declarer.declare(newFields("txid","tweet_id","tweet"));}getCoordinator方法,告诉Storm用来协调生成批次的类。getEmitter,负责批次并RQpublicclassRQpublicstaticfinalStringNEXT_READ="NEXT_READ";publicstaticfinalStringNEXT_WRITE="NEXT_WRITE";JedispublicRQ()jedis=new}publiclonggetavailableToRead(longcurrent){returngetNextWrite()-current;}publiclonggetNextRead()StringsNextRead=jedis.get(NEXT_READ); extRead==null){return}return }publiclonggetNextWrite()return}publicvoidclose(){}publicvoidsetNextRead(longnextRead){jedis.set(NEXT_READ,""+nextRead);}publicList<String>getMessages(longfrom,int ty){String[]keys=newString[ for(inti=0;i< ty;i++){keys[i]=""+(i+from);}return}}publicstaticclassTweetsTransactionalSpoutCoordinatorITransactionalSpout.Coordinator<TransactionMetadata>{TransactionMetadatalastTransactionMetadata;RQrq=newRQ();longnextRead=publicTweetsTransactionalSpoutCoordinator(){nextRead=rq.getNextRead();}publicTransactionMetadatainitializeTransaction(BigIntegertxid,TransactionMetadataprevMetadata){longty=ty=ty>MAX_TRANSACTION_SIZEMAX_TRANSACTION_SIZE:TransactionMetadataret=newTransactionMetadata(nextRead,nextRead+=return}publicbooleanisReady()returnrq.getAvailableToRead(nextRead)>}publicvoidclose(){}}值得一提的是,在整个拓扑中只会有一个提交者实例。创建提交者实例时,它会从redis读取一个从1开始的序列号,这个序列号标识要的tweet下一条。此方法应当相应的返回true或false。在此例中,tweets数量并与已读数量比较。它tweets0tweets未读。tweets可读。只要确认了这一点,就创建一个取的tweets数量(译者注:对象属性ty)。元数据对象一经返回,Stormtxidzookeeper。这样就确保了一旦发生故障,Storm可以利用分发器(译者注:Emitter,见下文)重新发送批次。publicstaticclassimplementsITransactionalSpout.Emitter<TransactionMetadata> RQrq=new publicTweetsTransactionalSpoutEmitter() publicvoidemitBatch(TransactionAttempttx,TransactionMetadatacoordinatorMeta,BatchOutputCollectorcollector){List<String>messages=rq.getMessages(coordinatorMeta.from,<spanstyle="font-family: ia,'TimesNewRoman','BitstreamCharter',Times,serif;font-size:13px;line-height:19px;">coordinatorMeta. eetId=coordinatorMeta.from;for(Stringmessage:messages){collector.emit(newValues(tx,""+tweetId,message));}}publicvoidcleanupBefore(BigIntegertxid)publicvoidclose(){}分发器从数据源数据并从数据流组发送数据。分发器应当问题能够为相同的事务id和id和事务元数据,并确保批次已经重复过了。Storm会在TransactionAttempt对象里为尝试次数增加计数(译者注:attemptid)。这样就能知道在这里emitBatchredis得到tweets,同时增加redistweetstweets分发到拓扑。publicclassUserSplitterBoltimplementsIBasicBolt{privatestaticfinallongserialVersionUID=publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declareStream("users",new}publicMap<String,Object> ponentConfiguration(){returnnull;}publicvoidprepare(MapstormConf,TopologyContextcontext)publicvoidexecute(Tupleinput,BasicOutputCollectorcollector){ eet=input.getStringByField("tweet"); eetId=input.getStringByField("tweet_id");StringTokenizerstrTok=newStringTokenizer(tweet,"");HashSet<String>users=newHashSet<String>();while(strTok.hasMoreTokens()){Stringuser=strTok.nextToken();tweetif(user.startsWith("@")&&!users.contains(user)){}}}publicvoid}publicclassHashtagSplitterBoltimplementsIBasicBolt{privatestaticfinallongserialVersionUID=1L;publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declareStream("hashtags",new}publicMap<String,Object> ponentConfiguration(){returnnull;}publicvoidprepare(MapstormConf,TopologyContextcontext)publicvoidexecute(Tupleinput,BasicOutputCollectorcollector){ eet=input.getStringByField("tweet"); eetId=StringTokenizerstrTok=newStringTokenizer(tweet,"");TransactionAttempttx=HashSet<String>words=newHashSet<String>();while(strTok.hasMoreTokens()){Stringword=strTok.nextToken();if(word.startsWith("#")&&!words.contains(word)){collector.emit("hashtags",newValues(tx,tweetId,}}}publicvoid}UserHashTagJoinBoltBaseBatchBolt。这finishBatch方法。publicvoidexecute(Tupletuple)Stringsource= eetId=if("hashtags".equals(source))Stringhashtag=tuple.getStringByField("hashtag");add(tweetHashtags,tweetId,hashtag);}elseif("users".equals(source))Stringuser=tuple.getStringByField("user");add(userTweets,user,tweetId);}}finishBatch方法。publicvoidfinishBatch()for(Stringuser:userTweets.keySet()){Set<String>tweets=getUserTweets(user);HashMap<String,Integer>hashtagsCounter=newHashMap<String, Set<String>hashtags=getTweetHashtags(tweet);for(StringIntegercount=hashtagsCounter.get(hashtag);}}}for(Stringhashtag:hashtagsCounter.keySet()){intcount=hashtagsCounter.get(hashtag);collector.emit(new}}}你可以在上找到并完整代码。(译者注/storm-book/examples-ch08-transactional-topologies这个仓库里没有代协调者bolts是一类特殊的批处理bolts,它们实现了IComhmitter或者通过TransactionalTopologyBuilder调用 miterBolt设置了提交者bolt。它们与其它的批处理bolts最大的不同在于,提交者bolts的finishBatch方法在提交就绪时执行。这如果同时有事务ID1和事务ID2两个事务同时执行,只有在ID1没有任何差错的执行了finishBatch方法之后,ID2才会执行该方法。public miterBoltextendsimplementsICommitterpublicstaticfinalString MITED_TRANSACTION_FIELD= TransactionAttemptid;BatchOutputCollectorcollector;Jedisjedis;publicvoidprepare(Mapconf,TopologyContextBatchOutputCollectorTransactionAttemptid)this.id=id;this.collector=collector;this.jedis=new}HashMap<String,Long>hashtags=newHashMap<String,Long>();HashMap<String,Long>users=newHashMap<String,Long>();HashMap<String,Long>usersHashtags=newHashMap<String,privatevoidcount(HashMap<String,Long>map,Stringkey,int{Longvalue=if(value==null){value=(long)0;}value+=count;}publicvoidexecute(Tupletuple)Stringorigin=tuple. if("sers-splitter".equals(origin)){Stringuser=tuple.getStringByField("user");count(users,user,1);}elseif("hashtag-splitter".equals(origin))Stringhashtag=tuple.getStringByField("hashtag");count(hashtags,hashtag,1);}elseif("user-hashtag-merger".quals(origin)){Stringhashtag=tuple.getStringByField("hashtag");Stringuser=tuple.getStringByField("user");Stringkey=user+":"+hashtag;Integercount=tuple.getIntegerByField("count");count(usersHashtags,key,count);}}publicvoidfinishBatch() mitedTransaction= StringcurrentTransaction= Transactionmulti= Set<String>keys=hashtags.keySet();for(Stringhashtag:keys){Longcount=hashtags.get(hashtag);multi.hincrBy("hashtags",hashtag,count);}keys=for(Stringuser:keys)Longcount=users.get(user);}keys=usersHashtags.keySet();for(Stringkey:keys){Longcount=usersHashtags.get(key);multi.hincrBy("users_hashtags",key,count);}}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)} MITED_TRANSACTION_FIELD,ID。为什么要这样做?记住,如果事务失败了,Storm将会尽可能多的重复必要的次数。如果你不确定已经处理了这个事务,你就会多算,ID,并在提交前检查。redistweetsredis数据库里。通过实现下面修改TweetsTransactionalSpout,使它可以处理数据分区。BasePartitionedTransactionalSpout,它实现了publicclassTweetsPartitionedTransactionalSpoutextendsBasePartitionedTransactionalSpout<TransactionMetadata>{}publicstaticclassimplementsCoordinator{publicintnumPartitions(){return4;}publicbooleanisReady()return}publicvoidclose()}在这个例子里,协调器很简单。numPartitionsStorm一共有多少分区。而且你IPartitionedTransactionalSpout,元数据由分发器publicstaticclassTweetsPartitionedTransactionalEmitterimplementsEmitter<TransactionMetadata>{PartitionedRQrq= publicTransactionMetadataemitPartitionBatchNew(TransactionAttempttx,BatchOutputCollectorcollector,intpartition,TransactionMetadatalastPartitioonMeta){longif(lastPartitionMeta==null)nextRead= nextRead=lastPartitionMeta.from+ artition,nextRead);} ty=rq.getAvailableToRe artition,nextRead);ty= ty>MAX_TRANSACTION_SIZE?MAX_TRANSACTION_SIZE TransactionMetadatametadata=newTransactionMetadata(nextRead,(int) emitPartitionBatch(tx,collector,partition,metadata);returnmetadata;}publicvoidemitPartitionBatch(TransactionAttempttx,BatchOutputCollectorcollector,intpartition,TransactionMetadatapartitionMeta) ty<=0){}List<String>messages=rq.getMessages(partition, eetId=partitionMeta.from;for(Stringmsg:messages){collector.emit(newValues(tx,""+tweetId,msg));}}publicvoidclose()}这里有两个重要的方法,emitPartitionBatchNewemitPartitionBatch。对于zookeeper。StormID,表示一个事务贯穿了所有数据分区。通过emitPartitionBatch利用保存下来的元数据重复这个批次。NOTEStormID得到不同的元组,你就tweets5ID321,这时你多数了8个。你要以下三个值——previousCount=5,currentCount=13,以及lastTransactionId=321ID3214个元组,而不是8IDpreviousCount54currentCount9。publicstaticTweetsOpaquePartitionedTransactionalSpoutCoordinatorIOpaquePartitionedTransactionalSpout.Coordinato

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论