五、day5 今天学习如何通过条件变量实现线程安全,其实条件变量在前面学习网络编程的时候就已经用到了,比如在实现并发服务器逻辑层中处理逻辑队列消息的时候,我们为了防止线程一直处于while循环中浪费资源,我们使用条件变量通知线程。如果没有通知,那么处理线程就暂时停止,如果队列中有消息,那么就提醒线程开始处理队列消息,线程被唤醒。
这也是我们今天学习的,往期关于条件变量的使用可以参考:
网络编程(19)——C++使用asio协程实现并发服务器 - 知乎
今天的学习内容包括:
1)什么是条件变量,如何使用条件变量;⭐⭐⭐⭐⭐
2)两种wait()重载函数的区别,如何通过while循环使第一种重载等效于第二种重载;⭐⭐⭐⭐
3)如果有多个变量需要判断,两种重载应如何使用;⭐⭐⭐
4)通过条件变量实现线程安全队列(生产-消费者模式)。⭐⭐⭐⭐⭐
参考:
恋恋风辰官方博客
ModernCpp-ConcurrentProgramming-Tutorial/md/04同步操作.md at main · Mq-b/ModernCpp-ConcurrentProgramming-Tutorial
1. 条件变量
虽然我们使用过条件变量,但是应该如何真正的理解它呢?我们这里举个例子进行说明。
假设我们正在一辆夜间运行的地铁上,那么你要如何在正确的站点下车呢?
一直不休息,每一站都能知道,这样就不会错过你要下车的站点,但是这会很疲惫。
可以看一下时间,估算一下地铁到达目的地的时间,然后设置一个稍早的闹钟,就休息。这个方法听起来还行,但是你可能被过早的叫醒,甚至估算错误导致坐过站,又或者闹钟没电了睡过站。
事实上最简单的方式是,到站的时候有人或者其它东西能将你叫醒(比如手机的地图,到达设置的位置就提醒)。
这和线程有什么关系呢?其实第一种方法就是在说”忙等待 (busy waiting)”也称“自旋 “。也就是我们在文章一开始说的,线程一直处于while循环中来判断队列中是否有待处理的消息 。
1 2 3 4 5 6 7 8 9 10 bool flag = false ;std::mutex m; void wait_for_flag () { std::unique_lock<std::mutex> lk{ m }; while (!flag){ lk.unlock (); lk.lock (); } }
第二种方法就是加延时,这种实现进步了很多,减少浪费的执行时间,但很难确定正确的休眠时间。这会影响到程序的行为,在需要快速响应的程序中就意味着丢帧或错过了一个时间片。循环中,休眠②前函数对互斥量解锁①,再休眠结束后再对互斥量上锁,让另外的线程有机会获取锁并设置标识(因为修改函数和等待函数共用一个互斥量)。
1 2 3 4 5 6 7 8 void wait_for_flag () { std::unique_lock<std::mutex> lk{ m }; while (!flag){ lk.unlock (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); lk.lock (); } }
第三种方式(也是最好的)实际上就是使用条件变量了。通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。比如以甲、乙两线程的二级流水模式为例,若数据要先进行前期处理,然后才可以开始正式操作,那么线程甲则需等待线程乙完成并且触发事件之后,才可以执行相应的操作,这个过程就是使用了条件变量”。
1.1 如何使用条件变量 C++ 标准库对条件变量有两套实现:std::condition_variable
和 std::condition_variable_any
,这两个实现都包含在<condition_variable>
头文件中。
condition_variable_any
类是 std::condition_variable
的泛化。二者的使用都需要配合互斥进行使用,但是相对于只在std::unique_lockstd::mutex
上工作的std::condition_variable
,condition_variable_any
能在任何满足可基本锁定(BasicLockable)要求的锁上工作,所以增加了 _any 后缀。显而易见,这种区分必然是 any 版更加通用但是却有更多的性能开销。所以通常首选 std::condition_variable
。有特殊需求,才会考虑后者。
为什么std::condition_variable
的 wait 方法只能与 std::unique_lock<std::mutex>
配合使用,而不能与 std::lock_guard<std::mutex>
一起使用?
锁的管理:
std::unique_lock
提供了更灵活的锁管理功能,可以在等待条件时释放锁并在条件满足后重新获取锁。
std::lock_guard
是一个简单的 RAII(资源获取即初始化)封装,用于自动管理互斥锁的获取和释放,但它不支持在持有锁的状态下进行锁的释放。
条件等待机制:
std::condition_variable::wait
方法会在等待期间释放锁,并在条件满足时重新获取锁。只有 std::unique_lock
能够在等待时有效地管理这个过程。
std::lock_guard
无法在持有锁的情况下释放锁,这样就无法在条件变量等待期间进行其他线程的操作,导致无法实现正确的等待机制。
灵活性:
std::unique_lock
可以在等待条件时根据需要手动控制锁的状态,例如延迟锁定或条件变化时的重新锁定,而 std::lock_guard
的使用较为简单,无法实现这种灵活性。
1.1.1 使用条件变量实现方法3 那么我们可以使用条件变量实现上面的方法3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 std::mutex mtx; std::condition_variable cv; bool arrived = false ;void wait_for_arrival () { std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [] { return arrived; }); std::cout << "到达目的地,可以下车了!" << std::endl; } void simulate_arrival () { std::this_thread::sleep_for (std::chrono::seconds (5 )); { std::lock_guard<std::mutex> lck (mtx) ; arrived = true ; } cv.notify_one (); } int main () { std::thread t1 (wait_for_arrival) ; std::thread t2 (simulate_arrival) ; t1. join (); t2. join (); }
等待到达的函数:
1 2 3 4 5 void wait_for_arrival () { std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [] { return arrived; }); std::cout << "到达目的地,可以下车了!" << std::endl; }
wait_for_arrival
函数首先创建一个 unique_lock
对象 lck,锁住互斥锁 mtx
cv.wait(lck, [] { return arrived; });
会释放锁并等待,直到 arrived 变为 true。当条件满足时,锁会重新被获取
条件满足后,输出“到达目的地,可以下车了!”
注意,这里的return不是针对函数wait_for_arrival返回bool变量arrived,可别误会了,这里是wait的重载版本
模拟到达的函数:
1 2 3 4 5 6 7 8 void simulate_arrival () { std::this_thread::sleep_for (std::chrono::seconds (5 )); { std::lock_guard<std::mutex> lck (mtx) ; arrived = true ; } cv.notify_one (); }
simulate_arrival 函数使用 std::this_thread::sleep_for
模拟地铁在 5 秒后到达目的地
当模拟完成后,使用 lock_guard 锁住 mtx,将 arrived 设置为 true,表示到达
cv.notify_one(); 通知等待的线程,唤醒一个在 wait_for_arrival 中等待的线程
1.1.2 wait() 的重载
当线程调用 cv.wait(lock, predicate)
时,如果 predicate
为 false
,该线程会释放传入的锁,然后进入阻塞状态,等待其他线程发出通知。所以并不是唤醒线程后才会检查,而是调用wait
函数后会立即检查判断条件,然后每次被唤醒再次检查 。
被唤醒的线程在成功获取锁后,会再次检查条件(即 predicate
)。如果条件已经满足(true
),则线程继续执行;如果条件仍然不满足,则它会重新进入阻塞等待状态。
函数原型为:
1 2 3 4 5 6 void wait (unique_lock<mutex>& lock) ;template <class Predicate >void wait (std::unique_lock<std::mutex>& lock, Predicate pred) ;condition_variable::wait (std::unique_lock<std::mutex>& lock); condition_variable::wait (std::unique_lock<std::mutex>& lock, []{ return ready; });
两种wait函数的实现如下:
1 2 3 4 5 6 7 8 9 10 void wait (unique_lock<mutex>& _Lck) noexcept { _Cnd_wait(_Mycnd(), _Lck.mutex ()->_Mymtx()); } template <class _Predicate >void wait (unique_lock<mutex>& _Lck, _Predicate _Pred) { while (!_Pred()) { wait (_Lck); } }
wait函数有两个重载:
第一个重载的功能是:释放锁 并等待,直到其他线程调用 notify_one() 或 notify_all();
第二个重载的功能是:在释放锁 后,等待直到 pred() 返回 true。此函数会在被唤醒后重新获得锁,并检查 pred() 的返回值
第二个版本是对第一个版本的包装 ,等待并判断谓词,会调用第一个版本的重载。这可以避免“虚假唤醒 “。
条件变量虚假唤醒是指在使用条件变量进行线程同步时,有时候线程可能会在没有收到通知的情况下被唤醒。问题取决于程序和系统的具体实现。解决方法很简单,在循环中等待并判断条件可一并解决。
wait函数的作用 :
1)wait会释放与 lock 关联的互斥量(std::mutex),允许其他线程访问共享资源;
2)进入等待状态,直到被其他线程唤醒;
3)一旦被唤醒,wait() 方法会重新获取锁,并返回到调用点
线程在等待期间会释放传入wait的锁,允许其他线程访问;当线程被唤醒后,会重新获得锁。
第一种原型步骤
第二种原型步骤
1.1.3 wait的无谓词重载和while循环配合使用
wait的第二种重载可以防止”虚假唤醒”,那么第一种重载就不可以了吗?
其实我们也可以通过第一种重载实现防止“虚假唤醒”的功能,通过增加一个while循环配合使用第一个重载版本进行使用:
1 2 3 while (!_b_stop) { _consume.wait (unique_lk); }
这样,我们手动增加一个bool变量进行判断,我们就可以让第一个重载版本等效于第二个重载版本。
1.1.4 多个判断变量
若有多个变量需要判断呢?以下是两种重载的实现方法。
第一种重载的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void wait_for_conditions () { std::unique_lock<std::mutex> lck (mtx) ; while (!arrived || !isReady) { cv.wait (lck); } std::cout << "到达目的地并准备好了,可以下车了!" << std::endl; } void simulate_arrival () { std::this_thread::sleep_for (std::chrono::seconds (5 )); { std::lock_guard<std::mutex> lck (mtx) ; arrived = true ; } cv.notify_all (); } void prepare () { std::this_thread::sleep_for (std::chrono::seconds (2 )); { std::lock_guard<std::mutex> lck (mtx) ; isReady = true ; } cv.notify_all (); }
第二种重载的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void wait_for_conditions () { std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [] { return arrived && isReady; }); std::cout << "到达目的地并准备好了,可以下车了!" << std::endl; } void simulate_arrival () { std::this_thread::sleep_for (std::chrono::seconds (5 )); { std::lock_guard<std::mutex> lck (mtx) ; arrived = true ; } cv.notify_one (); } void prepare () { std::this_thread::sleep_for (std::chrono::seconds (2 )); { std::lock_guard<std::mutex> lck (mtx) ; isReady = true ; } cv.notify_one (); }
函数wait_for_conditions
、simulate_arrival
、prepare
各需要一个线程来启动:
1 2 3 std::thread t1 (wait_for_conditions) ;std::thread t2 (simulate_arrival) ;std::thread t3 (prepare) ;
需要注意的是,两种重载在条件检查逻辑上是不同的!!
第一种重载:
1 2 3 while (!arrived || !isReady) { cv.wait (lck); }
第二种重载:
1 cv.wait (lck, [] { return arrived && isReady; });
逻辑 : 这个重载允许你直接传递一个条件检查的 Lambda 表达式。
检查条件的方式:
使用 return arrived && isReady;
作为条件,当该表达式返回 true 时,wait 会停止等待。
这个 Lambda 会在每次被唤醒时被调用,用于检查当前条件是否满足。
1.2 使用示例 假设有两个线程,线程A持续输出1,而线程B持续输出2。如果想实现两个线程交替输出1和2(即输出顺序为1、2、1、2……),可以考虑以下方法。
我们可以使用互斥量(mutex)来解决这个问题。可以定义一个全局变量num
来表示当前应该由哪个线程输出:当num
为1时,线程A输出1;当num
为2时,线程B输出2。通过互斥量控制两个线程对num
的访问。如果某个线程发现num
与自己不匹配(例如,线程A发现num
为2),则可以解锁并将控制权交给另一个线程,同时让自己进入休眠状态,这样就可以实现交替输出的效果。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void PoorImpleman () { std::thread t1 ([]() { for (;;) { { std::lock_guard<std::mutex> lock(mtx_num); if (num == 1 ) { std::cout << "thread A print 1....." << std::endl; num++; continue ; } } std::this_thread::sleep_for(std::chrono::milliseconds(500 )); } }) ; std::thread t2 ([]() { for (;;) { { std::lock_guard<std::mutex> lock(mtx_num); if (num == 2 ) { std::cout << "thread B print 2....." << std::endl; num--; continue ; } } std::this_thread::sleep_for(std::chrono::milliseconds(500 )); } }) ; t1. join (); t2. join (); }
PoorImpleman 虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠 ,是对资源的浪费,也错过了最佳的处理时机。
所以我们可以使用条件变量来通知线程的机制,当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void ResonableImplemention () { std::thread t1 ([]() { for (;;) { std::unique_lock<std::mutex> lock(mtx_num); cvA.wait(lock, []() { return num == 1 ; }); num++; std::cout << "thread A print 1....." << std::endl; cvB.notify_one(); } }) ; std::thread t2 ([]() { for (;;) { std::unique_lock<std::mutex> lock(mtx_num); cvB.wait(lock, []() { return num == 2 ; }); num--; std::cout << "thread B print 2....." << std::endl; cvA.notify_one(); } }) ; t1. join (); t2. join (); }
当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone。
这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,第一种sleep的方式日志基本是每隔1秒才打印一次,效率很差。
2. 使用条件变量实现线程安全的队列 之前我们实现过线程安全的栈,对于pop操作,从栈顶取出元素,然后pop。但因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方法是:栈在pop的时候并不像之前直接将元素取出,在线程中先调用empty判断栈是否为空,如果不为空,则pop;反之,抛出一个我们自定义的异常,表示栈为空,但仍然调用pop。
但这可能引发另外一个问题:当程序执行pop函数时,栈不为空,栈顶层数据成功被弹出,但假设此时程序内存暴增 ,导致当程序使用的内存足够大时,可用的有效空间不够。 函数返回element时,就会就会存在数据做拷贝赋值时造成失败(内存不够用)。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。参考:
爱吃土豆:并发编程(3)——锁(上)3 赞同 · 0 评论文章 https://zhuanlan.zhihu.com/p/3963032805
这么做并不是很友好,所以我们可以通过条件变量 完善之前的程序,不过这次我们重新实现一个线程安全队列。
我们如何通过条件变量实现一个线程安全的队列呢?需要考虑以下两个关键点
当执行 push 操作时,需要确保没有其他线程正在执行 push 或 pop 操作;同样,在执行 pop 操作时,也需要确保没有其他线程正在执行 push 或 pop 操作。
当队列为空时,不应该执行 pop 操作。因此,我们需要使用条件变量来传递一个谓词,以确保在执行 pop 操作时队列不为空。
基于以上思考,我们设计了一个名为 threadsafe_queue 的模板类,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 #include <queue> #include <mutex> #include <condition_variable> template <typename T>class threadsafe_queue { private : mutable std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond; public : threadsafe_queue (){} threadsafe_queue (const threadsafe_queue& other) { std::lock_guard<std::mutex> lk (other.mut) ; data_queue = other.data_queue; } void push (T new_value) { std::lock_guard<std::mutex> lk (mut) ; data_queue.push (new_value); data_cond.notify_one (); } void wait_and_pop (T& value) { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] {return !data_queue.empty (); }); value = data_queue.front (); data_queue.pop (); } std::shared_ptr<T> wait_and_pop () { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait (lk, [this ] {return !data_queue.empty (); }); std::shared_ptr<T> res (std::make_shared<T>(data_queue.front())) ; data_queue.pop (); return res; } bool try_pop (T& value) { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return false ; value = data_queue.front (); data_queue.pop (); return true ; } std::shared_ptr<T> try_pop () { std::lock_guard<std::mutex> lk (mut) ; if (data_queue.empty ()) return std::shared_ptr <T>(); std::shared_ptr<T> res (std::make_shared<T>(data_queue.front())) ; data_queue.pop (); return res; } bool empty () const { std::lock_guard<std::mutex> lk (mut) ; return data_queue.empty (); } };
我们可以启动三个线程,一个 producer 线程用来向队列中放入数据。一个 consumer1 线程用来阻塞等待pop队列中的元素。另一个 consumer2 尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。
打印时为了保证线程输出在屏幕上不会乱序,所以加锁保证互斥输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void test_safe_que () { threadsafe_queue<int > safe_que; std::mutex mtx_print; std::thread producer ( [&]() { for (int i = 0 ; ;i++) { safe_que.push(i); { std::lock_guard<std::mutex> printlk(mtx_print); std::cout << "producer push data is " << i << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(200 )); } } ) ; std::thread consumer1 ( [&]() { for (;;) { auto data = safe_que.wait_and_pop(); { std::lock_guard<std::mutex> printlk(mtx_print); std::cout << "consumer1 wait and pop data is " << *data << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(500 )); } } ) ; std::thread consumer2 ( [&]() { for (;;) { auto data = safe_que.try_pop(); if (data != nullptr ) { { std::lock_guard<std::mutex> printlk(mtx_print); std::cout << "consumer2 try_pop data is " << *data << std::endl; } } std::this_thread::sleep_for(std::chrono::milliseconds(500 )); } } ) ; producer.join (); consumer1. join (); consumer2. join (); }
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 producer push data is 0 consumerl wait and pop data is 0 producer push data is l producer push data is 2 consumer2 trypop data is 1 consumerl wait and pop data is 2 producer push data is 3 producer push data is 4 producer push data is 5 consumer2 try pop data is 3 consumerl wait and pop data is 4 producer push data is 6 producer push data is 7 consumer2 try pop data is 5 consumerl wait and pop data is 6 producer push data is 8 producer push data is 9 producer push data is 10 consumerl wait and pop data is 7 consumer2 try pop data is 8 producer push data is 11 producer push data is 12 consumer2 try pop data is 9 consumerl wait and pop data is 10 producer push data is 13 producer push data is 14
生产者每次只0睡眠200ms,两个消费者每次睡眠500ms,所以生产者生产的速度大于消费者消费的速度,我们可以看到生产者生产的数据都会被消费者按顺序pop出(0->1->2->3->4……)。但consumerl 和consumer2 是争抢资源的,所以consumer2 和consumer1的输出顺序不同,有时候是consumer1先,有时候是consumer2 先。