Storm应用系列之(集成Kafka).docx_第1页
Storm应用系列之(集成Kafka).docx_第2页
Storm应用系列之(集成Kafka).docx_第3页
Storm应用系列之(集成Kafka).docx_第4页
Storm应用系列之(集成Kafka).docx_第5页
全文预览已结束

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Kafka集群中的Broker地址,有哪两种方法指定?TransactionalTridentKafkaSpout的作用是什么?本地模式无法保存Offset该如何解决?前言Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。Kafka的基本介绍:什么是Kafka准备工作KafkaSpout其实网上已经有人写了,在github上开源了,不用我们自己造轮子。只是要注意版本问题:0.7版本的Kafka,对应KafkaSpout可以使用Storm-contrib下面的例子源码:/nathanmarz/st . /master/storm-kafkaMaven依赖:/storm/storm-kafka0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,所以老的KafkaSpout不再适用,必须使用新的KafkaAPI源码:/wurstmeister/storm-kafka-0.8-plus这里因为0.8版本的Kafka必然是将来主流,所以我就不介绍0.7 的了,使用方式基本上是类似的。PS:是人写的,就会有bug,何况是别人分享出来的。所以,遇到bug,还请去github上提交一个issue告诉作者修正。2014/7/29 更新:wurstmeister/storm-kafka-0.8-plus 现在合并到Apache Storm了,在其external/storm-kakfa目录Maven依赖直接更新成:1. 2. org.apache.storm3. storm-kafka4. 0.9.2-incubating5. 复制代码但是storm似乎没有直接把external的包加载到classpath,所以使用时,还得手动把该jar包从external/storm-kafka/下拷到storm的lib目录。当然,也可以在maven中加上compile,直接把该jar打到你项目一起。使用KafkaSpout一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息: Kafka集群中的Broker地址 (IP+Port)有两种方法指定:1. 使用静态地址,即直接给定Kafka集群中所有Broker信息1. GlobalPartitionInformation info = new GlobalPartitionInformation();2. info.addPartition(0, new Broker(4,9092);3. info.addPartition(0, new Broker(1,9092);4. BrokerHosts brokerHosts = new StaticHosts(info);复制代码2. 从Zookeeper动态读取1. BrokerHosts brokerHosts = new ZkHosts(4:2181,2:2181);复制代码推荐使用这种方法,因为Kafka的Broker可能会动态的增减 topic名字 当前spout的唯一标识Id (以下代称$spout_id) zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root) 当前topic中数据如何解码了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。在Topology中加入Spout的代码:1. String topic = test;2. String zkRoot = kafkastorm;3. String spoutId = myKafka;4. 5. SpoutConfig spoutConfig = new SpoutConfig(brokerHosts,topic, zkRoot, spoutId);6. spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme();7. 8. TopologyBuilder builder = new TopologyBuilder();9. builder.setSpout(spout, new KafkaSpout(spoutConfig), spoutNum);复制代码其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据1. public class TestMessageScheme implements Scheme 2. 3. private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);4. 5. Override6. public List deserialize(byte bytes) 7. try 8. String msg = new String(bytes, UTF-8);9. return new Values(msg);10. catch (InvalidProtocolBufferException e) 11. LOGGER.error(Cannot parse the provided message!);12. 13. 14. /TODO: what happend if returns null?15. return null;16. 17. 18. Override19. public Fields getOutputFields() 20. return new Fields(msg);21. 22. 23. 复制代码这个解码方式是与Producer端生成时塞入数据的编码方式配套的。这里我Producer端塞入的是String的byte,所以这里也还原成String,定义输出为一个名叫msg的field。后面就可以自己添加Bolt处理tuple中该field的数据了。使用TransactionalTridentKafkaSpoutTransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。1. TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts,topic, spoutId);2. kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme();3. 4. TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);5. 6. TridentTopology topology = new TridentTopology();7. topology.newStream(test_str, kafkaSpout).shuffle().each(new Fields(msg, new PrintFunction();复制代码看到它并没有要求我们提供zkRoot,因为直接代码里面写死了 -_-T地址是 /transactional/,在上面的例子中,就是/transactional/test_str/myKafaka常见问题1. 本地模式无法保存OffsetKafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。本地模式,要显示的去配置1. spoutConfig.zkServers = new ArrayList()2. add(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论