Kafka Producer Consumer_第1页
已阅读5页,还剩15页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、kafka producer consumerproducer api ducer.kafkaproducer 假如想学习java工程化、高性能及分布式、深化浅出。微服务、spring,mybatis,netty源码分析的伴侣可以加我的java高级沟通:854630135,群里有阿里大牛直播讲解技术,以及java大型互联网技术的视频免费共享给大家。 1 props.put("bootstrap.servers", "28:9092"); 2 pr

2、ops.put("acks", "all"); 3 props.put("retries", 0); 4 props.put("batch.size", 16384); 5 props.put("linger.ms", 1); 6 props.put("buffer.memory", 33554432); 7 props.put("key.serializer&a

3、mp;quot;, "mon.serialization.stringserializer"); 8 props.put("value.serializer", "mon.serialization.stringserializer"); 9 10 producer producer = new kafkaproducer(props); 11 for (int i = 0; i ("foo", integer.tostring(i), integer

4、.tostring(i), new callback() 13 override 14 public void oncompletion(recordmetadata recordmetadata, exception e) 15 if (null != e) 16 e.printstacktrace(); 17 else 18 system.out.println("callback: " + recordmetadata.topic() + " " + recordmetadata.offset(); 19 20 21

5、 ); 22 23 producer.close(); producer由一个缓冲池组成,这个缓冲池中维护着那些还没有被传送到服务器上的记录,而且有一个后台的i/o线程负责将这些记录转换为哀求并将其传送到集群上去。 send()办法是异步的。当调用它以后就把记录放到buffer中并立刻返回。这就允许生产者批量的发送记录。 acks配置项控制的是完成的标准,即什么样的哀求被认为是完成了的。本例中其值设置的是"all"表示客户端会等待直到全部记录彻低被提交,这是最慢的一种方式也是持久化最好的一种方式。 假如哀求失败了,生产者可以自动重试。由于这里我们设置retr

6、ies为0,所以它不重试。 生产者对每个分区都维护了一个buffers,其中放的是未被发送的记录。这些buffers的大小是通过batch.size配置项来控制的。 默认状况下,即使一个buffer还有未用法的空间(ps:buffer没满)也会立刻发送。假如你想要削减哀求的次数,你可以设置linger.ms为一个大于0的数。这个命令将告知生产者在发送哀求之前先等待多少毫秒,以希翼能有更多的记录到达好填满buffer。在本例中,我们设置的是1毫秒,表示我们的哀求将会延迟1毫秒发送,这样做是为了等待更多的记录到达,1毫秒之后即使buffer没有被填满,哀求也会发送。(ps:略微说明一下这段话,pr

7、oducer调用send()办法只是将记录放到buffer中,然后由一个后台线程将buffer中的记录传送到服务器上。这里所说的哀求指的是从buffer到服务器。默认状况下记录被放到buffer以后立刻被发送到服务器,为了削减哀求服务器的次数,可以通过设置linger.ms,这个配置项表示等多少毫秒以后再发送,这样做是希翼每次哀求可以发送更多的记录,以此削减哀求次数) 假如想学习java工程化、高性能及分布式、深化浅出。微服务、spring,mybatis,netty源码分析的伴侣可以加我的java高级沟通:854630135,群里有阿里大牛直播讲解技术,以及java大型互联网技术的视频免费共

8、享给大家。 buffer.memory控制的是总的buffer内存数量 key.serializer 和 value.serializer表示怎样将key和value对象转成字节 从kafka 0.11开头,kafkaproducer支持两种模型:the idempotent producer and the transactional producer(幂等producer和事务producer)。幂等producer强调的是起码一次精确的投递。事务producer允许应用程序原子的发送消息到多个分区或者主题。 为了启用幂等性,必需将enable.idempotence这个配置的值设为tru

9、e。假如你这样设置了,那么retries默认是integer.max_value,并且acks默认是all。为了利用幂等producer的优势,请避开应用程序级别的重新发送。 为了用法事务producer,你必需配置transactional.id。假如transactional.id被设置,幂等性自动被启用。 1 properties props = new properties(); 2 props.put("bootstrap.servers", "28:9092"); 3 props.put(

10、"transactional.id", "my-transactional-id"); 4 5 producer producer = new kafkaproducer(props, new stringserializer(), new stringserializer(); 6 7 producer.inittransactions(); 8 9 try 10 producer.begintransaction(); 11 12 for (int i = 11; i ("bar",

11、 integer.tostring(i), integer.tostring(i); 14 15 / this method will flush any unsent records before actually committing the transaction 16 mittransaction(); 17 catch (producerfencedexception | outofordersequenceexception | authorizationexception e) 18 producer.close(); 19 catch (kafkaexception e) 20

12、 / by calling producer.aborttransaction() upon receiving a kafkaexception we can ensure 21 / that any successful writes are marked as aborted, hence keeping the transactional guarantees. 22 producer.aborttransaction(); 23 24 25 producer.close(); 假如想学习java工程化、高性能及分布式、深化浅出。微服务、spring,mybatis,netty源码分析

13、的伴侣可以加我的java高级沟通:854630135,群里有阿里大牛直播讲解技术,以及java大型互联网技术的视频免费共享给大家。 consumer api org.apache.kafka.clients.consumer.kafkaconsumer offsets and consumer position 对于分区中的每条记录,kafka维护一个数值偏移量。这个偏移量是分区中一条记录的唯一标识,同时也是消费者在分区中的位置。例如,一个消费者在分区中的位置是5,表示它已经消费了偏移量从0到4的记录,并且接下来它将消费偏移量为5的记录。相对于消费者用户来说,这里事实上有两个位置的概念。 消费

14、者的position表示下一条将要消费的记录的offset。每次消费者通过调用poll(long)接收消息的时候这个position会自动增强。 committed position表示已经被存储的最后一个偏移量。消费者可以自动的周期性提交offsets,也可以通过调用提交api(e.g. commitsync and commitasync)手动的提交position。 consumer groups and topic subscriptions kafka用"consumer groups"(消费者组)的概念来允许一组进程分开处理和消费记录。这些处理在

15、同一个机器上举行,也可以在不同的机器上。同一个消费者组中的消费者实例有相同的group.id 组中的每个消费者可以动态设置它们想要订阅的主题列表。kafka给每个订阅的消费者组都投递一份消息。这归功于消费者组中全部成员之间的均衡分区,以至于每个分区都可以被指定到组中精确的一个消费者。假设一个主题有4个分区,一个组中有2个消费者,那么每个消费者将处理2个分区。 消费者组中的成员是动态维护的:假如一个消费者处理失败了,那么分配给它的分区将会被重新分给组中其它消费者。 在概念上,你可以把一个消费者组想象成一个单个的规律订阅者,并且每个规律订阅者由多个进程组成。作为一个多订阅系统,kafka天生就支持

16、对于给定的主题可以有随意数量的消费者组。 automatic offset committing 1 properties props = new properties(); 2 props.put("bootstrap.servers", "28:9092"); 3 props.put("group.id", "test"); 4 props.put("mit", "true

17、"); 5 props.put("erval.ms", "1000"); 6 props.put("key.deserializer", "mon.serialization.stringdeserializer"); 7 props.put("value.deserializer", "mon.serialization.stringdeserializer&

18、;quot;); 8 kafkaconsumer consumer = new kafkaconsumer(props); 9 consumer.subscribe(arrays.aslist("foo", "bar"); 10 while (true) 11 consumerrecords records = consumer.poll(100); 12 for (consumerrecord record : records) 13 system.out.printf("offset = %d, ke

19、y = %s, value = %s%n", record.offset(), record.key(), record.value(); 14 15 设置mit意味着自动提交已消费的记录的offset manual offset control 代替消费者周期性的提交已消费的offsets,用户可以控制什么时候记录被认为是已经消费并提交它们的offsets。 1 properties props = new properties(); 2 props.put("bootstrap.servers", "localhost:9092"); 3 props.put("group.id", "test"); 4 props.put("mit", "false"); 5 props.put("key.

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论