并发编程(5)——条件变量、线程安全队列
2025-04-30 12:24:32 # C++ # 并发编程

五、day5

今天学习如何通过条件变量实现线程安全,其实条件变量在前面学习网络编程的时候就已经用到了,比如在实现并发服务器逻辑层中处理逻辑队列消息的时候,我们为了防止线程一直处于while循环中浪费资源,我们使用条件变量通知线程。如果没有通知,那么处理线程就暂时停止,如果队列中有消息,那么就提醒线程开始处理队列消息,线程被唤醒。

这也是我们今天学习的,往期关于条件变量的使用可以参考:

网络编程(19)——C++使用asio协程实现并发服务器 - 知乎

今天的学习内容包括:

1)什么是条件变量,如何使用条件变量;⭐⭐⭐⭐⭐

2)两种wait()重载函数的区别,如何通过while循环使第一种重载等效于第二种重载;⭐⭐⭐⭐

3)如果有多个变量需要判断,两种重载应如何使用;⭐⭐⭐

4)通过条件变量实现线程安全队列(生产-消费者模式)。⭐⭐⭐⭐⭐

参考:

恋恋风辰官方博客

ModernCpp-ConcurrentProgramming-Tutorial/md/04同步操作.md at main · Mq-b/ModernCpp-ConcurrentProgramming-Tutorial


1. 条件变量

虽然我们使用过条件变量,但是应该如何真正的理解它呢?我们这里举个例子进行说明。

假设我们正在一辆夜间运行的地铁上,那么你要如何在正确的站点下车呢?

  1. 一直不休息,每一站都能知道,这样就不会错过你要下车的站点,但是这会很疲惫。
  2. 可以看一下时间,估算一下地铁到达目的地的时间,然后设置一个稍早的闹钟,就休息。这个方法听起来还行,但是你可能被过早的叫醒,甚至估算错误导致坐过站,又或者闹钟没电了睡过站。
  3. 事实上最简单的方式是,到站的时候有人或者其它东西能将你叫醒(比如手机的地图,到达设置的位置就提醒)。

这和线程有什么关系呢?其实第一种方法就是在说”忙等待(busy waiting)”也称“自旋“。也就是我们在文章一开始说的,线程一直处于while循环中来判断队列中是否有待处理的消息

1
2
3
4
5
6
7
8
9
10
bool flag = false;
std::mutex m;

void wait_for_flag(){
std::unique_lock<std::mutex> lk{ m };
while (!flag){
lk.unlock(); // 1 解锁互斥量,释放共享资源
lk.lock(); // 2 上锁互斥量
}
}

第二种方法就是加延时,这种实现进步了很多,减少浪费的执行时间,但很难确定正确的休眠时间。这会影响到程序的行为,在需要快速响应的程序中就意味着丢帧或错过了一个时间片。循环中,休眠②前函数对互斥量解锁①,再休眠结束后再对互斥量上锁,让另外的线程有机会获取锁并设置标识(因为修改函数和等待函数共用一个互斥量)。

1
2
3
4
5
6
7
8
void wait_for_flag(){
std::unique_lock<std::mutex> lk{ m };
while (!flag){
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠
lk.lock(); // 3 上锁互斥量
}
}

第三种方式(也是最好的)实际上就是使用条件变量了。通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。比如以甲、乙两线程的二级流水模式为例,若数据要先进行前期处理,然后才可以开始正式操作,那么线程甲则需等待线程乙完成并且触发事件之后,才可以执行相应的操作,这个过程就是使用了条件变量”。

1.1 如何使用条件变量

C++ 标准库对条件变量有两套实现:std::condition_variablestd::condition_variable_any,这两个实现都包含在<condition_variable> 头文件中。

condition_variable_any 类是 std::condition_variable 的泛化。二者的使用都需要配合互斥进行使用,但是相对于只在std::unique_lockstd::mutex上工作的std::condition_variablecondition_variable_any 能在任何满足可基本锁定(BasicLockable)要求的锁上工作,所以增加了 _any 后缀。显而易见,这种区分必然是 any 版更加通用但是却有更多的性能开销。所以通常首选 std::condition_variable。有特殊需求,才会考虑后者。

为什么std::condition_variable 的 wait 方法只能与 std::unique_lock<std::mutex> 配合使用,而不能与 std::lock_guard<std::mutex> 一起使用?

  • 锁的管理:

    • std::unique_lock 提供了更灵活的锁管理功能,可以在等待条件时释放锁并在条件满足后重新获取锁。
    • std::lock_guard 是一个简单的 RAII(资源获取即初始化)封装,用于自动管理互斥锁的获取和释放,但它不支持在持有锁的状态下进行锁的释放。
  • 条件等待机制:

    • std::condition_variable::wait 方法会在等待期间释放锁,并在条件满足时重新获取锁。只有 std::unique_lock 能够在等待时有效地管理这个过程。
    • std::lock_guard 无法在持有锁的情况下释放锁,这样就无法在条件变量等待期间进行其他线程的操作,导致无法实现正确的等待机制。
  • 灵活性:

    • std::unique_lock 可以在等待条件时根据需要手动控制锁的状态,例如延迟锁定或条件变化时的重新锁定,而 std::lock_guard 的使用较为简单,无法实现这种灵活性。

1.1.1 使用条件变量实现方法3

那么我们可以使用条件变量实现上面的方法3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::mutex mtx;
std::condition_variable cv;
bool arrived = false;

void wait_for_arrival() {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [] { return arrived; }); // 等待 arrived 变为 true
std::cout << "到达目的地,可以下车了!" << std::endl;
}

void simulate_arrival() {
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟地铁到站,假设5秒后到达目的地
{
std::lock_guard<std::mutex> lck(mtx);
arrived = true; // 设置条件变量为 true,表示到达目的地
}
cv.notify_one(); // 通知等待的线程
}

int main() {
std::thread t1(wait_for_arrival);
std::thread t2(simulate_arrival);

t1.join();
t2.join();
}

等待到达的函数:

1
2
3
4
5
void wait_for_arrival() {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [] { return arrived; }); // 等待 arrived 变为 true
std::cout << "到达目的地,可以下车了!" << std::endl;
}
  • wait_for_arrival 函数首先创建一个 unique_lock 对象 lck,锁住互斥锁 mtx
  • cv.wait(lck, [] { return arrived; }); 会释放锁并等待,直到 arrived 变为 true。当条件满足时,锁会重新被获取
  • 条件满足后,输出“到达目的地,可以下车了!”

注意,这里的return不是针对函数wait_for_arrival返回bool变量arrived,可别误会了,这里是wait的重载版本

模拟到达的函数:

1
2
3
4
5
6
7
8
void simulate_arrival() {
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟地铁到站,假设5秒后到达目的地
{
std::lock_guard<std::mutex> lck(mtx);
arrived = true; // 设置条件变量为 true,表示到达目的地
}
cv.notify_one(); // 通知等待的线程
}
  • simulate_arrival 函数使用 std::this_thread::sleep_for 模拟地铁在 5 秒后到达目的地
  • 当模拟完成后,使用 lock_guard 锁住 mtx,将 arrived 设置为 true,表示到达
  • cv.notify_one(); 通知等待的线程,唤醒一个在 wait_for_arrival 中等待的线程

1.1.2 wait() 的重载

  • 当线程调用 cv.wait(lock, predicate) 时,如果 predicatefalse,该线程会释放传入的锁,然后进入阻塞状态,等待其他线程发出通知。所以并不是唤醒线程后才会检查,而是调用wait函数后会立即检查判断条件,然后每次被唤醒再次检查
  • 被唤醒的线程在成功获取锁后,会再次检查条件(即 predicate)。如果条件已经满足(true),则线程继续执行;如果条件仍然不满足,则它会重新进入阻塞等待状态。

函数原型为:

1
2
3
4
5
6
void wait(unique_lock<mutex>& lock);
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

condition_variable::wait(std::unique_lock<std::mutex>& lock);
condition_variable::wait(std::unique_lock<std::mutex>& lock, []{ return ready; });

两种wait函数的实现如下:

1
2
3
4
5
6
7
8
9
10
void wait(unique_lock<mutex>& _Lck) noexcept {
_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
}

template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) {
while (!_Pred()) {
wait(_Lck);
}
}

wait函数有两个重载:

  • 第一个重载的功能是:释放锁并等待,直到其他线程调用 notify_one() 或 notify_all();
  • 第二个重载的功能是:在释放锁后,等待直到 pred() 返回 true。此函数会在被唤醒后重新获得锁,并检查 pred() 的返回值

第二个版本是对第一个版本的包装,等待并判断谓词,会调用第一个版本的重载。这可以避免“虚假唤醒“。

条件变量虚假唤醒是指在使用条件变量进行线程同步时,有时候线程可能会在没有收到通知的情况下被唤醒。问题取决于程序和系统的具体实现。解决方法很简单,在循环中等待并判断条件可一并解决。

wait函数的作用

1)wait会释放与 lock 关联的互斥量(std::mutex),允许其他线程访问共享资源;

2)进入等待状态,直到被其他线程唤醒;

3)一旦被唤醒,wait() 方法会重新获取锁,并返回到调用点

线程在等待期间会释放传入wait的锁,允许其他线程访问;当线程被唤醒后,会重新获得锁。

img

第一种原型步骤

img

第二种原型步骤

1.1.3 wait的无谓词重载和while循环配合使用

wait的第二种重载可以防止”虚假唤醒”,那么第一种重载就不可以了吗?

其实我们也可以通过第一种重载实现防止“虚假唤醒”的功能,通过增加一个while循环配合使用第一个重载版本进行使用:

1
2
3
while (!_b_stop) {
_consume.wait(unique_lk);
}

这样,我们手动增加一个bool变量进行判断,我们就可以让第一个重载版本等效于第二个重载版本。

1.1.4 多个判断变量

若有多个变量需要判断呢?以下是两种重载的实现方法。

第一种重载的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void wait_for_conditions() {
std::unique_lock<std::mutex> lck(mtx);
// 使用循环检查多个条件
while (!arrived || !isReady) {
cv.wait(lck); // 等待通知
}
std::cout << "到达目的地并准备好了,可以下车了!" << std::endl;
}

void simulate_arrival() {
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟到站
{
std::lock_guard<std::mutex> lck(mtx);
arrived = true;
}
cv.notify_all(); // 通知所有等待的线程
}

void prepare() {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟准备时间
{
std::lock_guard<std::mutex> lck(mtx);
isReady = true;
}
cv.notify_all(); // 通知所有等待的线程
}

第二种重载的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void wait_for_conditions() {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [] { return arrived && isReady; }); // 等待两个条件都满足
std::cout << "到达目的地并准备好了,可以下车了!" << std::endl;
}

void simulate_arrival() {
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟到站
{
std::lock_guard<std::mutex> lck(mtx);
arrived = true;
}
cv.notify_one(); // 通知等待的线程
}

void prepare() {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟准备时间
{
std::lock_guard<std::mutex> lck(mtx);
isReady = true;
}
cv.notify_one(); // 通知等待的线程
}

函数wait_for_conditionssimulate_arrivalprepare各需要一个线程来启动:

1
2
3
std::thread t1(wait_for_conditions);
std::thread t2(simulate_arrival);
std::thread t3(prepare);

需要注意的是,两种重载在条件检查逻辑上是不同的!!

第一种重载:

1
2
3
while (!arrived || !isReady) {
cv.wait(lck); // 等待通知
}
  • 逻辑: 在这个重载中,wait 只接受一个锁,并且需要在调用 wait 之前先检查条件。

  • 检查条件的方式:

    • 使用 while (!arrived || !isReady) 来确保在调用 wait 之前,只有在条件不满足时才会进入等待状态。
    • 这种方式确保在条件未满足时,线程会释放锁并等待通知。
    • 如果在被唤醒后条件仍然不满足,循环会再次检查并继续等待,直到所有条件都满足。

第二种重载:

1
cv.wait(lck, [] { return arrived && isReady; });
  • 逻辑: 这个重载允许你直接传递一个条件检查的 Lambda 表达式。
  • 检查条件的方式:
    • 使用 return arrived && isReady; 作为条件,当该表达式返回 true 时,wait 会停止等待。
    • 这个 Lambda 会在每次被唤醒时被调用,用于检查当前条件是否满足。

1.2 使用示例

假设有两个线程,线程A持续输出1,而线程B持续输出2。如果想实现两个线程交替输出1和2(即输出顺序为1、2、1、2……),可以考虑以下方法。

我们可以使用互斥量(mutex)来解决这个问题。可以定义一个全局变量num来表示当前应该由哪个线程输出:当num为1时,线程A输出1;当num为2时,线程B输出2。通过互斥量控制两个线程对num的访问。如果某个线程发现num与自己不匹配(例如,线程A发现num为2),则可以解锁并将控制权交给另一个线程,同时让自己进入休眠状态,这样就可以实现交替输出的效果。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void PoorImpleman() {
std::thread t1([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 1) {
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
std::thread t2([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 2) {
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
t1.join();
t2.join();
}

PoorImpleman 虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。

所以我们可以使用条件变量来通知线程的机制,当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void ResonableImplemention() {
std::thread t1([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
cvA.wait(lock, []() {
return num == 1;
});
num++;
std::cout << "thread A print 1....." << std::endl;
cvB.notify_one();
}
});
std::thread t2([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
cvB.wait(lock, []() {
return num == 2;
});
num--;
std::cout << "thread B print 2....." << std::endl;
cvA.notify_one();
}
});
t1.join();
t2.join();
}

当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone。

这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,第一种sleep的方式日志基本是每隔1秒才打印一次,效率很差。

2. 使用条件变量实现线程安全的队列

之前我们实现过线程安全的栈,对于pop操作,从栈顶取出元素,然后pop。但因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方法是:栈在pop的时候并不像之前直接将元素取出,在线程中先调用empty判断栈是否为空,如果不为空,则pop;反之,抛出一个我们自定义的异常,表示栈为空,但仍然调用pop。

但这可能引发另外一个问题:当程序执行pop函数时,栈不为空,栈顶层数据成功被弹出,但假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够。 函数返回element时,就会就会存在数据做拷贝赋值时造成失败(内存不够用)。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。参考:

爱吃土豆:并发编程(3)——锁(上)3 赞同 · 0 评论文章imghttps://zhuanlan.zhihu.com/p/3963032805

这么做并不是很友好,所以我们可以通过条件变量完善之前的程序,不过这次我们重新实现一个线程安全队列。

我们如何通过条件变量实现一个线程安全的队列呢?需要考虑以下两个关键点

  1. 当执行 push 操作时,需要确保没有其他线程正在执行 push 或 pop 操作;同样,在执行 pop 操作时,也需要确保没有其他线程正在执行 push 或 pop 操作。
  2. 当队列为空时,不应该执行 pop 操作。因此,我们需要使用条件变量来传递一个谓词,以确保在执行 pop 操作时队列不为空。

基于以上思考,我们设计了一个名为 threadsafe_queue 的模板类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue(){}
threadsafe_queue(const threadsafe_queue& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}

void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
// 返回引用
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}
// 返回智能指针
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// 返回引用
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
// 返回智能指针
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

我们可以启动三个线程,一个 producer 线程用来向队列中放入数据。一个 consumer1 线程用来阻塞等待pop队列中的元素。另一个 consumer2 尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。

打印时为了保证线程输出在屏幕上不会乱序,所以加锁保证互斥输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void test_safe_que() {
threadsafe_queue<int> safe_que;
std::mutex mtx_print;
std::thread producer(
[&]() {
for (int i = 0; ;i++) {
safe_que.push(i);
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "producer push data is " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
);
std::thread consumer1(
[&]() {
for (;;) {
auto data = safe_que.wait_and_pop();
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer1 wait and pop data is " << *data << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
std::thread consumer2(
[&]() {
for (;;) {
auto data = safe_que.try_pop();
if (data != nullptr) {
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer2 try_pop data is " << *data << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
producer.join();
consumer1.join();
consumer2.join();
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
producer push data is 0
consumerl wait and pop data is 0
producer push data is l
producer push data is 2
consumer2 trypop data is 1
consumerl wait and pop data is 2
producer push data is 3
producer push data is 4
producer push data is 5
consumer2 try pop data is 3
consumerl wait and pop data is 4
producer push data is 6
producer push data is 7
consumer2 try pop data is 5
consumerl wait and pop data is 6
producer push data is 8
producer push data is 9
producer push data is 10
consumerl wait and pop data is 7
consumer2 try pop data is 8
producer push data is 11
producer push data is 12
consumer2 try pop data is 9
consumerl wait and pop data is 10
producer push data is 13
producer push data is 14

生产者每次只0睡眠200ms,两个消费者每次睡眠500ms,所以生产者生产的速度大于消费者消费的速度,我们可以看到生产者生产的数据都会被消费者按顺序pop出(0->1->2->3->4……)。但consumerl 和consumer2 是争抢资源的,所以consumer2 和consumer1的输出顺序不同,有时候是consumer1先,有时候是consumer2 先。