版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、基于C+11实现生产者消费者设计模式本文将综合运用 C+11 中的新的基础设施(主要是多线程、锁、条件变量)来阐述一个经典问题生产者消费者模式,并给出完整的解决方案。生产者消费者问题是多线程并发中一个非常经典的问题,相信学过操作系统课程的同学都清楚这个问题的根源。本文将就四种情况分析并介绍生产者和消费者问题,它们分别是:单生产者-单消费者模式,单生产者-多消费者模式,多生产者-单消费者模式,多生产者-多消费者模式,我会给出四种情况下的 C+11 并发解决方案,如果文中出现了错误或者你对代码有异议,欢迎交流。1 单生产者-单消费者模式顾名思义,单生产者-单消费者模式中只有一个生产者和一个消费者,
2、生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。C+11实现单生产者单消费者模式的代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#
3、include <mutex>#include <thread>static const int kItemRepositorySize = 10; / Item buffer size.static const int kItemsToProduce = 1000; / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; / 产品缓冲区, 配合 read_position 和 write_position 模式环形队列. size_t r
4、ead_position; / 消费者读取产品位置. size_t write_position; / 生产者写入产品位置. std:mutex mtx; / 互斥量,保护产品缓冲区 std:condition_variable repo_not_full; / 条件变量, 指示产品缓冲区不为满. std:condition_variable repo_not_empty; / 条件变量, 指示产品缓冲区不为空. gItemRepository; / 产品库全局变量, 生产者和消费者操作该变量.typedef struct ItemRepository ItemRepository;void
5、ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx); while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:cout << "Producer is waiting for an empty slot.n" (ir->repo_not_f
6、ull).wait(lock); / 生产者等待"产品库缓冲区不为满"这一条件发生. (ir->item_buffer)ir->write_position = item; / 写入产品. (ir->write_position)+; / 写入位置后移. if (ir->write_position = kItemRepositorySize) / 写入位置若是在队列最后则重新设置为初始位置. ir->write_position = 0; (ir->repo_not_empty).notify_all(); / 通知消费者产品库不为空.
7、 lock.unlock(); / 解锁.int ConsumeItem(ItemRepository *ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir->read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait
8、(lock); / 消费者等待"产品库缓冲区不为空"这一条件发生. data = (ir->item_buffer)ir->read_position; / 读取某一产品 (ir->read_position)+; / 读取位置后移 if (ir->read_position >= kItemRepositorySize) / 读取位置若移到最后,则重新置位. ir->read_position = 0; (ir->repo_not_full).notify_all(); / 通知消费者产品库不为满. lock.unlock();
9、/ 解锁. return data; / 返回产品.void ProducerTask() / 生产者任务 for (int i = 1; i <= kItemsToProduce; +i) / sleep(1); std:cout << "Produce the " << i << "th item." << std:endl; ProduceItem(&gItemRepository, i); / 循环生产 kItemsToProduce 个产品. void ConsumerTask()
10、/ 消费者任务 static int cnt = 0; while(1) sleep(1); int item = ConsumeItem(&gItemRepository); / 消费一个产品. std:cout << "Consume the " << item << "th item" << std:endl; if (+cnt = kItemsToProduce) break; / 如果产品消费个数为 kItemsToProduce, 则退出. void InitItemRepository
11、(ItemRepository *ir) ir->write_position = 0; / 初始化产品写入位置. ir->read_position = 0; / 初始化产品读取位置.int main() InitItemRepository(&gItemRepository); std:thread producer(ProducerTask); / 创建生产者线程. std:thread consumer(ConsumerTask); / 创建消费之线程. producer.join(); consumer.join();2 单生产者-多消费者模式与单生产者和单消费者
12、模式不同的是,单生产者-多消费者模式中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 4; / Item buffer size.s
13、tatic const int kItemsToProduce = 10; / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; size_t read_position; size_t write_position; size_t item_counter; std:mutex mtx; std:mutex item_counter_mtx; std:condition_variable repo_not_full; std:condition_variabl
14、e repo_not_empty; gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx); while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:
15、cout << "Producer is waiting for an empty slot.n" (ir->repo_not_full).wait(lock); (ir->item_buffer)ir->write_position = item; (ir->write_position)+; if (ir->write_position = kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlo
16、ck();int ConsumeItem(ItemRepository *ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir->read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait(lock); data = (
17、ir->item_buffer)ir->read_position; (ir->read_position)+; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data;void ProducerTask() for (int i = 1; i <= kItemsToProduce; +i) / sleep(1); std:cout <&l
18、t; "Producer thread " << std:this_thread:get_id() << " producing the " << i << "th item." << std:endl; ProduceItem(&gItemRepository, i); std:cout << "Producer thread " << std:this_thread:get_id() << " is
19、 exiting." << std:endl;void ConsumerTask() bool ready_to_exit = false; while(1) sleep(1); std:unique_lock<std:mutex> lock(gItemRepository.item_counter_mtx); if (gItemRepository.item_counter < kItemsToProduce) int item = ConsumeItem(&gItemRepository); +(gItemRepository.item_co
20、unter); std:cout << "Consumer thread " << std:this_thread:get_id() << " is consuming the " << item << "th item" << std:endl; else ready_to_exit = true; lock.unlock(); if (ready_to_exit = true) break; std:cout << "Consumer t
21、hread " << std:this_thread:get_id() << " is exiting." << std:endl;void InitItemRepository(ItemRepository *ir) ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0;int main() InitItemRepository(&gItemRepository); std:thread producer(Produce
22、rTask); std:thread consumer1(ConsumerTask); std:thread consumer2(ConsumerTask); std:thread consumer3(ConsumerTask); std:thread consumer4(ConsumerTask); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join();3 多生产者-单消费者模式与单生产者和单消费者模式不同的是,多生产者-单消费者模式中可以允许多个生产者同时向产品库中放入
23、产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRepositorySize = 4; / Item buffer size.static const int kItemsToProduce = 10;
24、 / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; size_t read_position; size_t write_position; size_t item_counter; std:mutex mtx; std:mutex item_counter_mtx; std:condition_variable repo_not_full; std:condition_variable repo_not_empty; gItemRepository;typ
25、edef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx); while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:cout << "Producer is waiti
26、ng for an empty slot.n" (ir->repo_not_full).wait(lock); (ir->item_buffer)ir->write_position = item; (ir->write_position)+; if (ir->write_position = kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock();int ConsumeItem(ItemRepository *
27、ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir->read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait(lock); data = (ir->item_buffer)ir->read_positi
28、on; (ir->read_position)+; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all(); lock.unlock(); return data;void ProducerTask() bool ready_to_exit = false; while(1) sleep(1); std:unique_lock<std:mutex> lock(gItemRepository.item_cou
29、nter_mtx); if (gItemRepository.item_counter < kItemsToProduce) +(gItemRepository.item_counter); ProduceItem(&gItemRepository, gItemRepository.item_counter); std:cout << "Producer thread " << std:this_thread:get_id() << " is producing the " << gItemR
30、epository.item_counter << "th item" << std:endl; else ready_to_exit = true; lock.unlock(); if (ready_to_exit = true) break; std:cout << "Producer thread " << std:this_thread:get_id() << " is exiting." << std:endl;void ConsumerTask() s
31、tatic int item_consumed = 0; while(1) sleep(1); +item_consumed; if (item_consumed <= kItemsToProduce) int item = ConsumeItem(&gItemRepository); std:cout << "Consumer thread " << std:this_thread:get_id() << " is consuming the " << item << "
32、;th item" << std:endl; else break; std:cout << "Consumer thread " << std:this_thread:get_id() << " is exiting." << std:endl;void InitItemRepository(ItemRepository *ir) ir->write_position = 0; ir->read_position = 0; ir->item_counter = 0;
33、int main() InitItemRepository(&gItemRepository); std:thread producer1(ProducerTask); std:thread producer2(ProducerTask); std:thread producer3(ProducerTask); std:thread producer4(ProducerTask); std:thread consumer(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join
34、(); consumer.join();4 多生产者-多消费者模式该模式可以说是前面两种模式的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。代码如下:#include <unistd.h>#include <cstdlib>#include <condition_variable>#include <iostream>#include <mutex>#include <thread>static const int kItemRe
35、positorySize = 4; / Item buffer size.static const int kItemsToProduce = 10; / How many items we plan to produce.struct ItemRepository int item_bufferkItemRepositorySize; size_t read_position; size_t write_position; size_t produced_item_counter; size_t consumed_item_counter; std:mutex mtx; std:mutex
36、produced_item_counter_mtx; std:mutex consumed_item_counter_mtx; std:condition_variable repo_not_full; std:condition_variable repo_not_empty; gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) std:unique_lock<std:mutex> lock(ir->mtx);
37、while(ir->write_position + 1) % kItemRepositorySize) = ir->read_position) / item buffer is full, just wait here. std:cout << "Producer is waiting for an empty slot.n" (ir->repo_not_full).wait(lock); (ir->item_buffer)ir->write_position = item; (ir->write_position)+;
38、if (ir->write_position = kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); lock.unlock();int ConsumeItem(ItemRepository *ir) int data; std:unique_lock<std:mutex> lock(ir->mtx); / item buffer is empty, just wait here. while(ir->write_position = ir-&g
39、t;read_position) std:cout << "Consumer is waiting for items.n" (ir->repo_not_empty).wait(lock); data = (ir->item_buffer)ir->read_position; (ir->read_position)+; if (ir->read_position >= kItemRepositorySize) ir->read_position = 0; (ir->repo_not_full).notify_all
40、(); lock.unlock(); return data;void ProducerTask() bool ready_to_exit = false; while(1) sleep(1); std:unique_lock<std:mutex> lock(gItemRduced_item_counter_mtx); if (gItemRduced_item_counter < kItemsToProduce) +(gItemRduced_item_counter); ProduceItem(&a
41、mp;gItemRepository, gItemRduced_item_counter); std:cout << "Producer thread " << std:this_thread:get_id() << " is producing the " << gItemRduced_item_counter << "th item" << std:endl; else ready_to_exit = true; lock.unlock(); if (ready_to_exit = true) break; std:cout << "Producer thread " << std:this_thread:get_id() << &
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 个人信用贷款合同2024年度样本
- 科技实验室管理团队的技能要求
- 教育科技对学校生态教育的推动作用
- 未来展览业展馆空间的创新设计与利用探索报告
- 科技与体育结合的学生健康管理方案
- 2024年生猪养殖与销售保险合作合同3篇
- 2025年新型玻璃涂层研发与应用采购合同3篇
- 二零二五年度企事业单位绿化养护服务合同范本4篇
- 腾讯2025年度企业邮箱使用许可合同3篇
- 2025年度模具行业环保认证及评估合同4篇
- 图像识别领域自适应技术-洞察分析
- 个体户店铺租赁合同
- 新概念英语第二册考评试卷含答案(第49-56课)
- 【奥运会奖牌榜预测建模实证探析12000字(论文)】
- 保安部工作计划
- 2023痛风诊疗规范(完整版)
- (完整word版)企业对账函模板
- 土力学与地基基础(课件)
- 主要负责人重大隐患带队检查表
- 鲁滨逊漂流记人物形象分析
- 危险废物贮存仓库建设标准
评论
0/150
提交评论