WordCountTopology的实现._第1页
WordCountTopology的实现._第2页
WordCountTopology的实现._第3页
WordCountTopology的实现._第4页
WordCountTopology的实现._第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、WordCountTopology的实现 1. 编写SentenceSpoutpackage com.ibeifeng.bigdata.storm.topo;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base

2、.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;import java.util.Random;/* * Spout开发 * Created by ad on 2016/12/11. */public class SentenceSpout implements IRichSpoutpublic class Sentence

3、Spout extends BaseRichSpout private static final Logger logger = LoggerFactory.getLogger(SentenceSpout.class); /* * tuple发射器 */ private SpoutOutputCollector collector; private static final String SENTENCES = hadoop yarn mapreduce spark, flume hadoop hive spark, oozie yarn spark storm, storm yarn map

4、reduce error, error flume storm spark ; /* * 用来声明该组件向后面组件发射的tuple的key名称依次是什么 * param declarer */ Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(sentence); /* * 用于指定只针对本组件的一些特殊配置 * return */ Override public Map getComponentConfiguration() return nu

5、ll; /* * Spout 组件的初始化方法 * 创建SentenceSpout组件的实例对象时调用,只执行一次 * param conf * param context * param collector */ Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) / 用实例变量来接收tuple发射器 this.collector = collector; /* * close方法在该spout关闭前执行,但是并不能得到保证其一定被执行。 * spout是作为

6、task运行在worker内,在cluster模式下, * supervisor会直接kill -9 woker的进程,这样它就无法执行了。 * 而在本地模式下,只要不是kill -9, 如果是发送停止命令, * 是可以保证close的执行的。 */ Override public void close() / 收尾工作 /* * 在对应时刻暂时激活spout */ Override public void activate() /* * 在对应时刻暂时关闭spout */ Override public void deactivate() /* * Spout组件的核心方法 * 循环调用 *

7、 1)如何从数据源上获取数据 逻辑 写在该方法中 * 2)对获取的数据进行一些简单的处理 * 3) 封装tuple,并且向后面的bolt发射 (其实只能指定tuple的value值依次是什么) */ Override public void nextTuple() / 随机从数组中获取一一条语句(模拟从数据源中获取数据) String sentence = SENTENCESnew Random().nextInt(SENTENCES.length); if(sentence.contains(error) logger.error(记录有问题: + sentence); else / 封装成

8、tuple this.collector.emit(new Values(sentence); try Thread.sleep(10000); catch (InterruptedException e) e.printStackTrace(); /* * 传入的Object其实是一个id,唯一表示一个tuple。 * 该方法是这个id所对应的tuple被成功处理后执行 * param msgId */ Override public void ack(Object msgId) /* * 同ack,只不过是tuple处理失败时执行 * param msgId */ Override pub

9、lic void fail(Object msgId) 编写SplitBlotpackage com.ibeifeng.bigdata.storm.topo;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import

10、backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;/* * Bolt开发 * Created by ad on 2016/12/11. */public class SplitBolt implements IRichBolt /* * bolt组件中发射器 */ private OutputCollector collector; /* * Bolt组件的初始化方法 * * param stormConf * param context * param collector */

11、 Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) this.collector = collector; /* * 每接收到前面组件发射过来的tuple就调用一次 * * bolt对数据处理逻辑写在该方法中 * 处理完后的数据封装成tuple(value部分),继续发射给后面的组件 * 或者执行比如写到数据库、打印到文件等等操作(终点) * * param input */ Override public void execute(Tuple inpu

12、t) String sentence = input.getStringByField(sentence); if(sentence != null & !.equals(sentence) String words = sentence.split( ); for (String word: words) this.collector.emit(new Values(word); /* * cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。 * 但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了, * 那么根本就没有办法来调用那个方法。clean

13、up设计的时候是被用来在 * local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), * 并且你想在关闭一些topology的时候避免资源泄漏 */ Override public void cleanup() /* * declareOutputFields定义一个叫做”word”的字段的 * 该bolt/spout输出的字段个数,供下游使用,在该bolt中的execute方法中, * emit发射的字段个数必须和声明的相同,否则报错:Tuple created with wrong * number of fields. Expected 2 fields b

14、ut got 1 fields * param declarer */ Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(word); Override public Map getComponentConfiguration() return null; 编写CountBlotpackage com.ibeifeng.bigdata.storm.topo;import backtype.storm.task.OutputCollector;im

15、port backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.Map;/* * 单词计

16、数 * Created by ad on 2016/12/11. */public class CountBolt extends BaseRichBolt private Map counts; /* * bolt组件中发射器 */ private OutputCollector collector; /* * Bolt组件的初始化方法 * * param stormConf * param context * param collector */ Override public void prepare(Map stormConf, TopologyContext context, Out

17、putCollector collector) this.collector = collector; this.counts = new HashMap(); Override public void execute(Tuple input) String word = input.getStringByField(word); / 单词的累计 int count = 1; if(counts.containsKey(word) count = counts.get(word) + 1; counts.put(word, count); this.collector.emit(new Val

18、ues(word, count); Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(word,count); 编写PrintBlotpackage com.ibeifeng.bigdata.storm.topo;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.O

19、utputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import orm.tuple.Tuple;import java.util.Map;/* * Created by ad on 2016/12/11. */public class PrintBolt extends BaseRichBolt Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) Override pu

20、blic void execute(Tuple input) String word = input.getStringByField(word); Integer count = input.getIntegerByField(count); System.err.println(单词: + word + , - 累计出现次数:+ count); Override public void declareOutputFields(OutputFieldsDeclarer declarer) 6.编写测试程序WordCountTopologypackage com.ibeifeng.bigdat

21、a.storm.topo;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.

22、Fields;/* * wordcountTopology * Created by ad on 2016/12/11. */public class WordCountTopology private static final String SPOUT_ID = sentenceSpout; private static final String SPLIT_BOLT = splitBolt; private static final String COUNT_BOLT = countBolt; private static final String PRINT_BOLT = printBolt; public static void main(String args) / 构造Topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID,new SentenceSpout(); / 指定 Spo

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论