生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
要实现生产者消费者模式。首先要保证:
三个关系:生产者和生产者(竞争关系,互斥关系)、消费者和消费者(竞争关系,互斥关系)、生产者和消费者(竞争关系(保证数据的正确性),同步关系(保证多线程协调))。
两种角色:生产者和消费者(特定的进程或线程)。
一个交易场所:通常指内存的一段缓冲区。
生产者消费者模型优点
ProCon1.hpp
#include <iostream> #include <queue> #include <pthread.h> #include <unistd.h> #include <cstdlib> #include <ctime> #define NUM 6 template <class T> class PC { public: PC(int cap = NUM) : _cap(cap) { // 初始化锁和条件变量 pthread_mutex_init(&lock, nullptr); pthread_cond_init(&_empty, nullptr); pthread_cond_init(&_full, nullptr); } void Push(const T &in) { // 先申请锁 pthread_mutex_lock(&lock); // 判断是否为满 while (full()) { // 如果是满了,那么就必须等待,并把锁释放出去 pthread_cond_wait(&_full, &lock); } // 插入数据 q.push(in); // 队列中已经有数据了,唤醒正在等待的消费者。 if (q.size() >= _cap / 2) { pthread_cond_signal(&_empty); std::cout << "消费者快来消费吧 "; } // 释放锁 pthread_mutex_unlock(&lock); } void Pop(T &out) { // 先申请锁 pthread_mutex_lock(&lock); // 判断队列是否为空 while (empty()) { // 如果为空,那么消费者等待,并释放锁 pthread_cond_wait(&_empty, &lock); } // 拿出队头的资源 out = q.front(); // 删除队头数据 q.pop(); // 唤醒正在等待的生产者 if (q.size() <= _cap / 2) { pthread_cond_signal(&_full); std::cout << "生产者快来生产吧 "; } // 释放锁 pthread_mutex_unlock(&lock); } ~PC() { _cap = 0; // 销毁锁和条件变量 pthread_mutex_destroy(&lock); pthread_cond_destroy(&_full); pthread_cond_destroy(&_empty); } private: // 判断队列是否为空 bool empty() { return q.empty(); } // 判断队列是否为满 bool full() { return q.size() == _cap; } private: std::queue<T> q; int _cap; // 定义锁 pthread_mutex_t lock; // 定义条件变量,_empty表示队列为空的条件变量,_full表示队列为满的条件变量 pthread_cond_t _empty; pthread_cond_t _full; };
main.cc
void *Pro(void *asg) { pthread_detach(pthread_self()); PC<int> *qc = (PC<int> *)asg; while (true) { sleep(1); int data = rand() % 100 + 1; qc->Push(data); std::cout << "生产者生产了:" << data << std::endl; } } void *Con(void *asg) { pthread_detach(pthread_self()); PC<int> *qc = (PC<int> *)asg; while (true) { sleep(2); int data = 0; qc->Pop(data); std::cout << "消费者消费了:" << data << std::endl; } } int main() { pthread_t producer, consumer; // 定义随机种子 srand((unsigned long)time(nullptr)); PC<int> *qc = new PC<int>(); pthread_create(&producer, nullptr, Pro, qc); pthread_create(&consumer, nullptr, Con, qc); while (true) ; return 0; }
生产者生产数据,消费者拿出数据并进行计算。
ProCon2.hpp,和上面的基本上是一样的
#pragma once #include <iostream> #include <queue> #include <pthread.h> #include <unistd.h> #include <cstdlib> #include <ctime> #define NUM 6 template <class T> class PC { public: PC(int cap = NUM) : _cap(cap) { pthread_mutex_init(&lock, nullptr); pthread_cond_init(&_empty, nullptr); pthread_cond_init(&_full, nullptr); } void Push(const T &in) { pthread_mutex_lock(&lock); while (full()) { pthread_cond_wait(&_full, &lock); } q.push(in); pthread_cond_signal(&_empty); pthread_mutex_unlock(&lock); } void Pop(T &out) { pthread_mutex_lock(&lock); while (empty()) { pthread_cond_wait(&_empty, &lock); } out = q.front(); q.pop(); pthread_cond_signal(&_full); pthread_mutex_unlock(&lock); } ~PC() { _cap = 0; pthread_mutex_destroy(&lock); pthread_cond_destroy(&_full); pthread_cond_destroy(&_empty); } private: bool empty() { return q.empty(); } bool full() { return q.size() == _cap; } private: std::queue<T> q; int _cap; pthread_mutex_t lock; pthread_cond_t _empty; pthread_cond_t _full; };
fun.hpp
#pragma once #include <iostream> class computer { public: computer(int _x, int _y, char _por) : x(_x), y(_y), por(_por) { } computer() {} int fun() { int result = 0; switch (por) { case '+': result = x + y; break; case '-': result = x - y; break; case '*': result = x * y; break; case '/': if (y == 0) { std::cout << "除0错误" << std::endl; return -1; } result = x / y; break; default: break; } return result; } public: int x; int y; char por; };
Test.cc
void *Pro(void *asg) { pthread_detach(pthread_self()); PC<computer> *qc = (PC<computer> *)asg; char *por = "+-*/"; while (true) { sleep(1); int x = rand() % 100 + 1; int y = rand() % 50; int p = rand() % 4; computer su(x, y, por[p]); qc->Push(su); } } void *Con(void *asg) { pthread_detach(pthread_self()); PC<computer> *qc = (PC<computer> *)asg; while (true) { sleep(1); computer su; qc->Pop(su); int data = su.fun(); std::cout << su.x << su.por << su.y << "=" << data << std::endl; } } int main() { pthread_t producer, consumer; srand((unsigned long)time(nullptr)); PC<computer> *qc = new PC<computer>(); pthread_create(&producer, nullptr, Pro, qc); pthread_create(&consumer, nullptr, Con, qc); while (true) ; return 0; }
在之前的抢票系统中,并没有对票这个全局变量进行管理。当我们想买其中的一号票时,我们就应该用信号量来管理。
在这个生产者消费者模型中的环形队列中要保证两个点:
Ring.hpp
#include <iostream> #include <pthread.h> #include <semaphore.h> #include <unistd.h> #include <vector> #include <stdint.h> #include <ctime> #define NUM 16 template <class T> class RingQueue { private: std::vector<T> q; int _cap; // 描述空间的信号量 sem_t balk_sem; // 描述数据的信号量 sem_t data_sem; // 消费者要消费的数据的下标 int c_pos; // 生产者要生产的数据的下标 int p_pos; private: void P(sem_t &sem) { sem_wait(&sem); } void V(sem_t &sem) { sem_post(&sem); } public: RingQueue(int cap = NUM) : _cap(cap), c_pos(0), p_pos(0) { q.resize(cap); // 初始化信号量,其中描述空间的信号量的大小设置为cap // 描述数据的信号量的大小设置为0 sem_init(&balk_sem, 0, cap); sem_init(&data_sem, 0, 0); } void Push(const T &in) { // 当生产了一个数据,要对balk_sem进行P操作,对data_sem进行V操作 P(balk_sem); q[p_pos] = in; p_pos++; // 取模,保证这个队列是循环队列 p_pos %= _cap; V(data_sem); } void Pop(T &out) { P(data_sem); out = q[c_pos]; c_pos++; c_pos %= _cap; V(balk_sem); } ~RingQueue() { sem_destroy(&balk_sem); sem_destroy(&data_sem); } };
RingTest.cc
#include "Ring.hpp" void *production(void *asg) { RingQueue<int> *q = (RingQueue<int> *)asg; while (true) { sleep(1); int data = rand() % 100 + 1; q->Push(data); std::cout << "生产者->:" << data << std::endl; } } void *consumption(void *asg) { RingQueue<int> *q = (RingQueue<int> *)asg; while (true) { sleep(2); int data; q->Pop(data); std::cout << "消费者->:" << data << std::endl; } } int main() { RingQueue<int> *q = new RingQueue<int>(); srand((unsigned long)time(nullptr)); pthread_t producer, consumer; pthread_create(&producer, nullptr, production, q); pthread_create(&consumer, nullptr, consumption, q); pthread_join(producer, nullptr); pthread_join(consumer, nullptr); return 0; }
因篇幅问题不能全部显示,请点此查看更多更全内容