生产者消费者设计模式_第1页
生产者消费者设计模式_第2页
生产者消费者设计模式_第3页
生产者消费者设计模式_第4页
生产者消费者设计模式_第5页
已阅读5页,还剩8页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、基于C+11实现生产者消费者设计模式本文将综合运用 C+11 中的新的基础设施(主要是多线程、锁、条件变量)来阐述一个经典问题生产者消费者模式,并给出完整的解决方案。生产者消费者问题是多线程并发中一个非常经典的问题,相信学过操作系统课程的同学都清楚这个问题的根源。本文将就四种情况分析并介绍生产者和消费者问题,它们分别是:单生产者-单消费者模式,单生产者-多消费者模式,多生产者-单消费者模式,多生产者-多消费者模式,我会给出四种情况下的 C+11 并发解决方案,如果文中出现了错误或者你对代码有异议,欢迎交流。1 单生产者-单消费者模式顾名思义,单生产者-单消费者模式中只有一个生产者和一个消费者,

2、生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。C+11实现单生产者单消费者模式的代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#

3、include <mutex>#include <thread>static const int kItemRepositorySize = 10; / Item buffer size.static const int kItemsToProduce = 1000; / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; / 产品缓冲区, 配合 read_position 和 write_position 模式环形队列. size_t r

4、ead_position; / 消费者读取产品位置. size_t write_position; / 生产者写入产品位置. std:mutex mtx; / 互斥量,保护产品缓冲区 std:condition_variable repo_not_full; / 条件变量, 指示产品缓冲区不为满. std:condition_variable repo_not_empty; / 条件变量, 指示产品缓冲区不为空. gItemRepository; / 产品库全局变量, 生产者和消费者操作该变量.typedef struct ItemRepository ItemRepository;void

5、ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx); while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:cout << "Producer is waiting for an empty slot.n" (ir->repo_not_f

6、ull).wait(lock); / 生产者等待"产品库缓冲区不为满"这一条件发生. (ir->item_buffer)ir->write_position = item; / 写入产品. (ir->write_position)+; / 写入位置后移. if (ir->write_position = kItemRepositorySize) / 写入位置若是在队列最后则重新设置为初始位置. ir->write_position = 0; (ir->repo_not_empty).notify_all(); / 通知消费者产品库不为空.

7、 lock.unlock(); / 解锁.int ConsumeItem(ItemRepository *ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir->read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait

8、(lock); / 消费者等待"产品库缓冲区不为空"这一条件发生. data = (ir->item_buffer)ir->read_position; / 读取某一产品 (ir->read_position)+; / 读取位置后移 if (ir->read_position >= kItemRepositorySize) / 读取位置若移到最后,则重新置位. ir->read_position = 0; (ir->repo_not_full).notify_all(); / 通知消费者产品库不为满. lock.unlock();

9、/ 解锁. return data; / 返回产品.void ProducerTask() / 生产者任务 for (int i = 1; i <= kItemsToProduce; +i) / sleep(1); std:cout << "Produce the " << i << "th item." << std:endl; ProduceItem(&gItemRepository, i); / 循环生产 kItemsToProduce 个产品. void ConsumerTask()

10、/ 消费者任务 static int cnt = 0; while(1) sleep(1); int item = ConsumeItem(&gItemRepository); / 消费一个产品. std:cout << "Consume the " << item << "th item" << std:endl; if (+cnt = kItemsToProduce) break; / 如果产品消费个数为 kItemsToProduce, 则退出. void InitItemRepository

11、(ItemRepository *ir) ir->write_position = 0; / 初始化产品写入位置. ir->read_position = 0; / 初始化产品读取位置.int main() InitItemRepository(&gItemRepository); std:thread producer(ProducerTask); / 创建生产者线程. std:thread consumer(ConsumerTask); / 创建消费之线程. producer.join(); consumer.join();2 单生产者-多消费者模式与单生产者和单消费者

12、模式不同的是,单生产者-多消费者模式中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 4; / Item buffer size.s

13、tatic const int kItemsToProduce = 10; / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; size_t read_position; size_t write_position; size_t item_counter; std:mutex mtx; std:mutex item_counter_mtx; std:condition_variable repo_not_full; std:condition_variabl

14、e repo_not_empty; gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx); while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:

15、cout << "Producer is waiting for an empty slot.n" (ir->repo_not_full).wait(lock); (ir->item_buffer)ir->write_position = item; (ir->write_position)+; if (ir->write_position = kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlo

16、ck();int ConsumeItem(ItemRepository *ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir->read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait(lock); data = (

17、ir->item_buffer)ir->read_position; (ir->read_position)+; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data;void ProducerTask() for (int i = 1; i <= kItemsToProduce; +i) / sleep(1); std:cout <&l

18、t; "Producer thread " << std:this_thread:get_id() << " producing the " << i << "th item." << std:endl; ProduceItem(&gItemRepository, i); std:cout << "Producer thread " << std:this_thread:get_id() << " is

19、 exiting." << std:endl;void ConsumerTask() bool ready_to_exit = false; while(1) sleep(1); std:unique_lock<std:mutex> lock(gItemRepository.item_counter_mtx); if (gItemRepository.item_counter < kItemsToProduce) int item = ConsumeItem(&gItemRepository); +(gItemRepository.item_co

20、unter); std:cout << "Consumer thread " << std:this_thread:get_id() << " is consuming the " << item << "th item" << std:endl; else ready_to_exit = true; lock.unlock(); if (ready_to_exit = true) break; std:cout << "Consumer t

21、hread " << std:this_thread:get_id() << " is exiting." << std:endl;void InitItemRepository(ItemRepository *ir) ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0;int main() InitItemRepository(&gItemRepository); std:thread producer(Produce

22、rTask); std:thread consumer1(ConsumerTask); std:thread consumer2(ConsumerTask); std:thread consumer3(ConsumerTask); std:thread consumer4(ConsumerTask); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join();3 多生产者-单消费者模式与单生产者和单消费者模式不同的是,多生产者-单消费者模式中可以允许多个生产者同时向产品库中放入

23、产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 4; / Item buffer size.static const int kItemsToProduce = 10;

24、 / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; size_t read_position; size_t write_position; size_t item_counter; std:mutex mtx; std:mutex item_counter_mtx; std:condition_variable repo_not_full; std:condition_variable repo_not_empty; gItemRepository;typ

25、edef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx); while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:cout << "Producer is waiti

26、ng for an empty slot.n" (ir->repo_not_full).wait(lock); (ir->item_buffer)ir->write_position = item; (ir->write_position)+; if (ir->write_position = kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock();int ConsumeItem(ItemRepository *

27、ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir->read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait(lock); data = (ir->item_buffer)ir->read_positi

28、on; (ir->read_position)+; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data;void ProducerTask() bool ready_to_exit = false; while(1) sleep(1); std:unique_lock<std:mutex> lock(gItemRepository.item_cou

29、nter_mtx); if (gItemRepository.item_counter < kItemsToProduce) +(gItemRepository.item_counter); ProduceItem(&gItemRepository, gItemRepository.item_counter); std:cout << "Producer thread " << std:this_thread:get_id() << " is producing the " << gItemR

30、epository.item_counter << "th item" << std:endl; else ready_to_exit = true; lock.unlock(); if (ready_to_exit = true) break; std:cout << "Producer thread " << std:this_thread:get_id() << " is exiting." << std:endl;void ConsumerTask() s

31、tatic int item_consumed = 0; while(1) sleep(1); +item_consumed; if (item_consumed <= kItemsToProduce) int item = ConsumeItem(&gItemRepository); std:cout << "Consumer thread " << std:this_thread:get_id() << " is consuming the " << item << "

32、;th item" << std:endl; else break; std:cout << "Consumer thread " << std:this_thread:get_id() << " is exiting." << std:endl;void InitItemRepository(ItemRepository *ir) ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0;

33、int main() InitItemRepository(&gItemRepository); std:thread producer1(ProducerTask); std:thread producer2(ProducerTask); std:thread producer3(ProducerTask); std:thread producer4(ProducerTask); std:thread consumer(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join

34、(); consumer.join();4 多生产者-多消费者模式该模式可以说是前面两种模式的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRe

35、positorySize = 4; / Item buffer size.static const int kItemsToProduce = 10; / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; size_t read_position; size_t write_position; size_t produced_item_counter; size_t consumed_item_counter; std:mutex mtx; std:mutex

36、produced_item_counter_mtx; std:mutex consumed_item_counter_mtx; std:condition_variable repo_not_full; std:condition_variable repo_not_empty; gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx);

37、while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:cout << "Producer is waiting for an empty slot.n" (ir->repo_not_full).wait(lock); (ir->item_buffer)ir->write_position = item; (ir->write_position)+;

38、if (ir->write_position = kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock();int ConsumeItem(ItemRepository *ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir-&g

39、t;read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait(lock); data = (ir->item_buffer)ir->read_position; (ir->read_position)+; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all

40、(); lock.unlock(); return data;void ProducerTask() bool ready_to_exit = false; while(1) sleep(1); std:unique_lock<std:mutex> lock(gItemRduced_item_counter_mtx); if (gItemRduced_item_counter < kItemsToProduce) +(gItemRduced_item_counter); ProduceItem(&a

41、mp;gItemRepository, gItemRduced_item_counter); std:cout << "Producer thread " << std:this_thread:get_id() << " is producing the " << gItemRduced_item_counter << "th item" << std:endl; else ready_to_exit = true; lock.unlock(); if (ready_to_exit = true) break; std:cout << "Producer thread " << std:this_thread:get_id() << &

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论