并发编程(3)——锁(上)
2025-04-30 12:24:32 # C++ # 并发编程

三、day3

昨天在使用多线程并发的一个例子中出现了输出乱序的问题,这是正常,因为如果多线程执行没有互斥量,那么多线程执行就是无序且操作可能被打断的。这也是今天学习的,如何通过通过锁保护共享数据。主要内容为:

1)什么是条件竞争⭐

2)介绍互斥量mutex和几种上锁“工具”⭐⭐⭐⭐⭐

3)即使我们加了锁,但是共享数据仍有可能会泄漏。介绍了两种数据泄漏可能的方式,以及处理方法⭐⭐⭐

4)什么是死锁以及死锁的处理方法。着重学习层级加锁的方法,我通过一个银行转账的例子解析了层级加锁的工作流程:就是保证不同线程能够按照相同的加锁顺序对资源进行加锁,比如线程1按照A->B的方式进行加锁,那么后来的所有线程的加锁顺序都只会是A->B,而不是B->A。⭐⭐⭐⭐⭐

参考:

ModernCpp-ConcurrentProgramming-Tutorial/md/03共享数据.md at main · Mq-b/ModernCpp-ConcurrentProgramming-Tutorial

恋恋风辰官方博客

C++ 并发编程(3) 互斥和死锁_哔哩哔哩_bilibili


1. 条件竞争

在多线程的情况下,每个线程都抢着完成自己的任务。在大多数情况下,即使会改变执行顺序,也是良性竞争,这是无所谓的。比如两个线程都要往标准输出输出一段字符,谁先谁后并不会有什么太大影响。

1
2
3
4
5
6
7
8
9
void f1() { std::cout << "1\n"; }
void f2() { std::cout << "2\n"; }

int main(){
std::thread t{ f1 };
std::thread t2{ f2 };
t.join();
t2.join();
}

注意:std::coutoperator<< 调用是线程安全的,不会被打断。即:同步的 C++ 流保证是线程安全的(从多个线程输出的单独字符可能交错,但无数据竞争)

只有在涉及多线程读写相同共享数据的时候,才会导致“恶性的条件竞争”。

1
2
3
4
5
6
7
8
9
10
11
12
std::vector<int>v;

void f() { v.emplace_back(1); }
void f2() { v.erase(v.begin()); }

int main() {
std::thread t{ f };
std::thread t2{ f2 };
t.join();
t2.join();
std::cout << v.size() << '\n';
}

比如上面这段代码就是典型的恶性条件竞争,两个线程共享一个 vector,并对它进行修改。可能导致许多问题,比如 f2 先执行,此时 vector 还没有元素,导致抛出异常。又或者 f 执行了一半,调用了 f2(),等等。

当然了,也有可能先执行 f,然后执行 f2,最后打印了 0,程序老老实实执行完毕。

而且即使不是一个添加元素,一个删除元素,全是emplace_back添加元素,也一样会有问题,由于 std::vector 不是线程安全的容器,因此当多个线程同时访问并修改 v 时,可能会发生未定义的行为。具体来说,当两个线程同时尝试向 v 中添加元素时,但是 emplace_back 函数却是可以被打断的,执行了一半,又去执行另一个线程。可能会导致数据竞争,从而引发未定义的结果。

当某个表达式的求值写入某个内存位置,而另一求值读或修改同一内存位置时,称这些表达式冲突拥有两个冲突的求值的程序就有数据竞争,除非

  • 两个求值都在同一线程上,或者在同一信号处理函数中执行,或
  • 两个冲突的求值都是原子操作(见 std::atomic),或
  • 一个冲突的求值发生早于 另一个(见 std::memory_order)

如果出现数据竞争,那么程序的行为未定义。

标量类型等都同理,有数据竞争,**未定义行为**:

1
2
3
int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // 未定义行为

2. 锁

互斥量(Mutex),又常被称为互斥锁、互斥体(或者直接被称作“锁”),是一种用来保护临界区的特殊对象,其相当于实现了一个公共的“标志位”。它可以处于锁定(locked)状态,也可以处于解锁(unlocked)状态:

  1. 如果互斥量是锁定的,通常说某个特定的线程正持有这个锁。
  2. 如果没有线程持有这个互斥量,那么这个互斥量就处于解锁状态。

2.1 mutex

我们可以通过mutex对共享数据上锁,防止多线程访问共享区造成未定义行为问题。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
std::mutex  mtx1;
int shared_data = 100;
void use_lock() {
while (true) {
mtx1.lock();
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
// 当执行一次循环后,当前线程先睡一会,防止继续循环一直占用CPU资源
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}
void test_lock() {
std::thread t1(use_lock);
std::thread t2([]() {
while (true) {
mtx1.lock();
shared_data--;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});
t1.join();
t2.join();
}

初始化一个共享变量shared_data,然后定义一个互斥量std::mutex,接下来启动两个线程,分别执行use_lock增加数据,和一个lambda表达式减少数据。

当执行一次循环后,使当前线程先睡一会,防止继续循环导致一直占用CPU资源

简而言之,被lock()和unlock()包含在其中的代码是线程安全的,同一时间只有一个线程执行,不会被其它线程的执行所打断。但是当解完锁之后,哪个线程会被继续调用,这个是由操作系统调度决定的。由下面的输出结果可以看出来,即使我们在每个线程执行一次循环后睡眠十毫秒,但是有时候系统的调度仍然会让这个线程继续运行,所以完全有可能连续 4 次都是执行线程t1lock,连续减shared_data 四次,这都很正常。所以mutex仅仅只是能确定代码块是线程安全的,并不能保证线程调用是顺序的。

img

如果不手动解锁,那么当代码运行到‘}’结束的时候也不会自动解锁,*std::mutex 本身并不具备自动解锁的特性,解锁操作是通过与其配合使用的锁类来实现的。

2.2 lock_guard

我们也可以使用lock_guard进行自动加解锁,也就是之前说的RAII技术,当lock_guard被实例化的时候进行加锁,当lock_guard被析构的时候进行解锁。

1
2
3
4
5
6
7
8
9
void use_lock() {
while (true) {
std::lock_guard<std::mutex> lock(mtx1);
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}

lock_guard作用域结束时自动调用其析构函数解锁,这么做的一个好处是简化了一些特殊情况从函数中返回的写法,比如异常或者条件不满足时,函数内部直接return,锁也会自动解开。

lock_guard 也是一个“管理类”模板,用来管理互斥量的上锁与解锁,可以看一下 lock_guard 的源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
_EXPORT_STD template <class _Mutex>
class _NODISCARD_LOCK lock_guard { // class with destructor that unlocks a mutex
public:
using mutex_type = _Mutex;

explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock
_MyMutex.lock();
}

lock_guard(_Mutex& _Mtx, adopt_lock_t) noexcept // strengthened
: _MyMutex(_Mtx) {} // construct but don't lock

~lock_guard() noexcept {
_MyMutex.unlock();
}

lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;

private:
_Mutex& _MyMutex;
};

首先lock_guard 作为管理类,要求不可复制,我们定义复制构造与复制赋值为弃置函数。

它只保有一个私有数据成员,一个引用,用来引用互斥量。构造函数中初始化这个引用,同时上锁析构函数中解锁,这是一个非常典型的 RAII 式的管理。

同时它还提供一个有额外std::adopt_lock_t参数的构造函数 ,如果使用这个构造函数,则构造函数不会上锁。 adopt_lock 表示这个互斥量的锁已经在其他地方被获取,这样,lock_guard 会在构造时不再调用 lock() 方法,而是直接采用已锁定的状态。这个功能在某些情况下非常有用,尤其是在希望将一个已经被锁定的互斥量传递给 lock_guard 的时候。比如:

1
2
3
std::mutex mtx;
mtx.lock(); // 手动锁定互斥量
lock_guard<std::mutex> lg(mtx, std::adopt_lock); // 采用已锁定的互斥量

我们一般使用 lock_guard 时,经常使用下面的形式:

1
2
3
4
5
6
7
8
9
std::mutex mtx;
void f(){
//code..
{
std::lock_guard<std::mutex> lc{ mtx };
// 涉及共享资源的修改的代码...
}
//code..
}

使用{}创建了一个块作用域,限制了对象 lc 的生存期,进入作用域构造 lock_guard 的时候上锁(lock),离开作用域析构的时候解锁。

我们要尽可能的让互斥量上锁的粒度小,只用来确保必须的共享资源的线程安全。“粒度”通常用于描述锁定的范围大小,较小的粒度意味着锁定的范围更小,因此有更好的性能和更少的竞争。

2.3 try_lock

try_lock 是互斥量中的一种尝试上锁的方式。与常规的 lock 不同,try_lock 会尝试上锁,但如果锁已经被其他线程占用,则不会阻塞当前线程,而是立即返回。

它的返回类型是 bool ,如果上锁成功就返回 true,失败就返回 false。

这种方法在多线程编程中很有用,特别是在需要保护临界区的同时,又不想线程因为等待锁而阻塞的情况下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::mutex mtx;

void thread_function(int id) {
// 尝试加锁
if (mtx.try_lock()) {
std::cout << "线程:" << id << " 获得锁" << std::endl;
// 临界区代码
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟临界区操作
mtx.unlock(); // 解锁
std::cout << "线程:" << id << " 释放锁" << std::endl;
} else {
std::cout << "线程:" << id << " 获取锁失败 处理步骤" << std::endl;
}
}

2.4 unique_lock

unique_lock 也是一种管理类模板(满足可移动构造可移动赋值但不满足可复制构造可复制赋值),灵活性比lock_guard高很多(允许手动加解锁、延迟锁定、有时限的锁定尝试、递归锁定、所有权转移和与条件变量一同使用),但是效率比较差,内存占用也比较多。

工作中一般使用lock_guard ,但希望更加自由时可以时候unique_lockunique_lock 的部分实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
template<typename _Mutex>
class unique_lock {
public:
typedef _Mutex mutex_type;

unique_lock() noexcept
: _M_device(0), _M_owns(false) {}

explicit unique_lock(mutex_type &__m)
: _M_device(std::__addressof(__m)), _M_owns(false) {
lock();
_M_owns = true;
}

unique_lock(mutex_type &__m, defer_lock_t) noexcept
: _M_device(std::__addressof(__m)), _M_owns(false) {}

unique_lock(mutex_type &__m, try_to_lock_t)
: _M_device(std::__addressof(__m)), _M_owns(_M_device->try_lock()) {}

unique_lock(mutex_type &__m, adopt_lock_t) noexcept
: _M_device(std::__addressof(__m)), _M_owns(true) {
// XXX calling thread owns mutex
}

unique_lock(const unique_lock&) = delete;
unique_lock& operator=(const unique_lock&) = delete;

template<typename _Clock, typename _Duration>
unique_lock(mutex_type &__m,
const chrono::time_point <_Clock, _Duration> &__atime)
: _M_device(std::__addressof(__m)),
_M_owns(_M_device->try_lock_until(__atime)) {}

template<typename _Rep, typename _Period>
unique_lock(mutex_type &__m,
const chrono::duration <_Rep, _Period> &__rtime)
: _M_device(std::__addressof(__m)),
_M_owns(_M_device->try_lock_for(__rtime)) {}

~unique_lock() {
if (_M_owns)
unlock();
}

void unlock() {
if (!_Pmtx || !_Owns) {
_Throw_system_error(errc::operation_not_permitted);
}

_Pmtx->unlock();
_Owns = false;
}
};

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::mutex mtx; // 创建一个互斥量
void print_message(int id) {
std::unique_lock<std::mutex> lock(mtx); // 锁定互斥量
std::cout << "Thread " << id << " is printing a message." << std::endl;
// 在此处可以进行线程安全的操作
} // 当 lock 超出作用域时,互斥量会自动解锁

int main() {
std::thread t1(print_message, 1);
std::thread t2(print_message, 2);

t1.join(); // 等待线程 t1 完成
t2.join(); // 等待线程 t2 完成

return 0;
}

unique_lock 也有RAII特性,当超出作用域自动调用析构函数时,会自动解锁;除此之外,我们也可以通过手动进行加解锁:

1
2
3
4
5
6
7
8
9
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 创建 unique_lock,但不立即锁定
// 执行一些不需要锁定的操作
std::cout << "Thread " << id << " is preparing to lock." << std::endl;
lock.lock(); // 手动加锁
std::cout << "Thread " << id << " has locked the mutex." << std::endl;
// 在此处进行线程安全的操作
std::cout << "Thread " << id << " is doing work inside the critical section." << std::endl;
lock.unlock(); // 手动解锁
std::cout << "Thread " << id << " has unlocked the mutex." << std::endl;

2.5 once_flage 和 call_once

once_flagecall_once也相当于一种锁,std::call_once 确保所给的函数在多线程环境中只会被调用一次(仅仅只会调用一次),即使多个线程同时调用 call_once,只有一个线程会实际执行该函数,其它线程会等待。

我们之前在网络编程中实现单例模板时便用到了这种方式:

网络编程(14)——基于单例模板实现的逻辑层 - 知乎

网络编程(19)——C++使用asio协程实现并发服务器 - 知乎

3. 保护共享数据

互斥量主要也就是为了保护共享数据,上一节的锁也已经为各位展示了一些。然而使用互斥量来保护共享数据也并不是在函数中加上一个 std::lock_guard 就万事大吉了。有的时候只需要一个指针或者引用,就能让这种保护形同虚设。比如对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,存在不安全性(抛出去之后函数已经解锁)。例如一个栈对象,我们要保证其在多线程访问的时候是安全的,可以在判断栈是否为空、判断操作内部加锁,但是因为判断结束后返回值就不在加锁了,就会存在线程安全问题。

3.1 问题一

示例1:

若我们将保护数据的指针或者引用传递到锁作用域之外,那么同样会让我们的保护形同虚设,造成封装泄漏问题,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Data{
int a{};
std::string b{};
public:
void do_something(){
// 修改数据成员等...
}
};

class Data_wrapper{
Data data;
std::mutex m;
public:
template<class Func>
void process_data(Func func){
std::lock_guard<std::mutex> lc{m};
func(data); // 受保护数据传递给函数
}
};
//-------------------------------------------------------------------------------------------//
Data* p = nullptr;

void malicious_function(Data& protected_data){
p = &protected_data; // 受保护的数据被传递到外部
}

Data_wrapper d;

void foo(){
d.process_data(malicious_function); // 传递了一个恶意的函数
p->do_something(); // 在无保护的情况下访问保护数据
}

Data_wrapperprocess_data 方法设计初衷是以线程安全的方式对 Data 进行操作。然而,process_data 允许将 Data 对象通过参数传递给任意函数,这导致潜在的封装泄漏。例如,在传递 malicious_function 后,受保护的 Data 对象可以被一个全局指针(p)引用,而不会受到原有互斥锁的保护。因为当 process_dataData 的引用传递给外部函数时,外部函数可能会保存这个引用,导致封装性和线程安全性失效。此时,即使 Data_wrapper 内部使用了 std::mutex 来保护 Data,外部代码依然可以在没有锁的情况下访问并修改 Data 对象,比如在没有被互斥量保护的情况下调用do_something()

我们传递的函数就不该是涉及外部副作用的,就应该是单纯的在受互斥量保护的情况下老老实实调用do_something()操作受保护的数据。

所以我们应该服从下列指引:不得向锁所在的作用域之外传递指针和引用,指向受保护的共享数据没无论是通过函数返回值将它们保存到对外可见 内存,还是将它们作为参数传递给使用者提供的函数。简而言之:切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设

3.2 问题二

示例2:

比如定义如下栈,对于多线程访问时判断栈是否为空,若此后两个线程同时出栈,可能会造成崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
template<typename T>
class threadsafe_stack1
{
private:
std::stack<T> data;
mutable std::mutex m; // const成员函数可以修改此成员变量
public:
threadsafe_stack1() {}
threadsafe_stack1(const threadsafe_stack1& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack1& operator=(const threadsafe_stack1&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
//问题代码
T pop()
{
std::lock_guard<std::mutex> lock(m);
auto element = data.top();
data.pop();
return element;
}
// 危险
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
  • mutable std::mutex mmutable 关键字允许在 const 成员函数中修改该变量。默认情况下,const 成员函数不能修改任何成员变量,但如果某个成员变量被声明为 mutable,则可以在 const 成员函数中修改它。

3.2.1. 第一个问题

pop和empty的实现都有问题,举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void test_threadsafe_stack1() {
threadsafe_stack1<int> safe_stack;
safe_stack.push(1);

std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});
std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});
t1.join();
t2.join();
}

首先,往我们定义的栈类中push一个元素,然后启动两个线程分别进行判空、出栈操作。

但问题就是这里,假设线程t1先启动,t1判断栈不为空,然后睡眠1s;t2在t1睡眠的时候也判断栈不为空,然后休息1s;当t1醒来之后将栈内唯一的元素pop出,成功执行;当t2醒来之后栈内已经没有元素,但因为t2之前判断栈不为空所以t2也会pop元素,此时就会报错,因为栈内已经没有元素可以pop了。流程如下:

这是接口本身的问题,它不仅仅出现在基于互斥的实现中,无锁实现也会发生条件竞争,那我们应该如何解决呢?

解决这个问题我们可以用抛出异常的方式,比如定义一个空栈的异常,以下这段代码体继承自 std::exception,并且重写了 what() 方法以返回异常的错误信息。

1
2
3
4
struct empty_stack : std::exception
{
const char* what() const noexcept;
};

然后实现pop函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 修改后
T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
auto element = data.top();
data.pop();
return element;
}
// 修改前
T pop()
{
std::lock_guard<std::mutex> lock(m);
auto element = data.top();
data.pop();
return element;
}

栈在pop的时候并不像之前直接将元素取出,而是先判断栈是否为空,如果栈为空但仍然调用了pop函数,那么就弹出empty_stack()异常。但如果这么做的话,就需要我们在外层调用pop的时候使用catch捕获异常,进行相应的处理。

3.2.2. 第二个问题

但是这个函数仍然有一些问题,比如当程序执行pop函数时,栈不为空,栈顶层数据成功被弹出,但假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够。 函数返回element时,就会就会存在数据做拷贝赋值时造成失败(内存不够用)。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。

如果你还不理解,那么可以看下面的解释。

考虑栈存储的元素类型为vector<int>,比如

1
threadsafe_stack1<vector<int>> safe_stack;

请注意,vector容器会动态调整大小,因此,当我们复制vector容器时,C++标准库需在程序的堆数据段上分配更多的内存,以便复制内容。假如系统负载过重或内存资源不足时,内存分配可能失败,导致vector容器的拷贝构造函数抛出 std::bad_lloc 异常。若vector容器的元素太大,就很可能发生这种情况。同时,我们又假设pop函数的定义是:返回栈顶元素的值,并从栈上将其移除。隐患由此而来:

只有在栈被改动之后(即元素成功从栈弹出),弹出的元素才返回给调用者,然后再向调用者复制数据的过程中,有可能因为数据太大(比如pop出一个元素数量巨大的vector容器,但系统内存资源恰好不足)导致异常抛出。但因为元素成功从栈弹出之后才会将弹出的值返回给调用者,栈中已经没有了这个元素,而复制却没有成功,那么这个数据就会丢失!!

比如执行上面我们修改后的pop函数,栈不空,且元素成功弹出(表示栈中不存在该元素),但是在return这个元素过程中,系统抛出了异常,导致数据没有成功返回,那么数据就丢失了。

那么这个问题如何解决呢?

我们只需将操作一分为二:先用top() 获取栈顶元素,随后再用pop() 将元素移出栈容器(也就是先进行拷贝,后pop,这样报错也就只能在pop前面复制的步骤发生),这样就保障了数据安全,即使我们无法安全地复制数据,数据仍然保留在栈中。

基于以上原理,我们重载两个版本的pop函数即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
struct empty_stack : std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) throw empty_stack();
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

在上面新实现的 threadsafe_stack 类中,我们重载了两个版本的pop函数:

第一个版本是带引用类型的参数(直接将元素传递给引用参数,因为引用参数已经在外界开辟好了空间,所以不会引发内存不足的问题);

第二个版本是将pop出的元素封装成智能指针类型(将元素top传入调用智能指针res的拷贝构造函数,即使因为内存过大报错,但元素仍在栈中),然后返回(return的时候也只是返回智能指针,智能指针占用的内存不大,我们在外层只需解引用return的智能指针,即可使用栈pop出的内容)。

这样在pop函数内部减少了数据的拷贝,防止内存溢出,其实这两种做法确实是相比之前直接pop固定类型的值更节省内存,运行效率也好很多。我们也完全可以基于之前的思想,在pop时如果队列为空则返回空指针,这样比抛出异常更友好一些。

1
2
3
4
5
6
7
8
9
10
td::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) return nullptr;
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

4. 死锁

4.1 死锁可能的两种情况

死锁其实就是不同线程在互斥上争抢锁:有两个线程,都需要同时锁住两个互斥,才可以进行某项操作,但它们分别都只锁住了一个互斥,都等着再给另一个互斥加锁。于时,双方毫无进展,因为它们都在等待对方解锁互斥。这种情形为死锁。如下图所示:

img

假如线程1和线程2并行循环运行,先当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁。比如线程1对A已经加锁,线程2对B已经加锁,那么他们都希望彼此占有对方的锁,又不释放自己占有的锁导致了死锁。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::mutex  t_lock1;
std::mutex t_lock2;
int m_1 = 0;
int m_2 = 1;
void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
t_lock1.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}
void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
t_lock2.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}
//-------------------------------------------------------------------------
void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}

如果调用test_dead_lock()函数,那么程序运行之后在某一个时刻一定会导致死锁。

我们可以将加锁和解锁的功能封装为独立的函数,这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//加锁和解锁作为原子操作解耦合,各自只管理自己的功能
void atomic_lock1() {
std::cout << "lock1 begin lock" << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
std::cout << "lock1 end lock" << std::endl;
}
void atomic_lock2() {
std::cout << "lock2 begin lock" << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
std::cout << "lock2 end lock" << std::endl;
}
void safe_lock1() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void safe_lock2() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void test_safe_lock() {
std::thread t1(safe_lock1);
std::thread t2(safe_lock2);
t1.join();
t2.join();
}

防范死锁的建议通常是:始终按相同的顺序对两个互斥加锁。这也是层级加锁的实现原理。

但是有的时候即使固定锁顺序,依旧会产生问题。当有多个互斥量保护同一个类的对象时,对于相同类型的两个不同对象进行数据的交换操作,为了保证数据交换的正确性,就要避免其它线程修改,确保每个对象的互斥量都锁住自己要保护的区域。如果按照前面的的选择一个固定的顺序上锁解锁,则毫无意义,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct X{
X(const std::string& str) :object{ str } {}

friend void swap(X& lhs, X& rhs);
private:
std::string object;
std::mutex m;
};

void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock_guard<std::mutex> lock1{ lhs.m };
std::lock_guard<std::mutex> lock2{ rhs.m };
swap(lhs.object, rhs.object);
}

我们对同一个类的两个实例进行数据交换时,会导致它们陷入死锁:

1
2
3
X a{ "1" }, b{ "2" };
std::thread t{ [&] {swap(a, b); } }; // 1
std::thread t2{ [&] {swap(b, a); } }; // 2
  • 1 执行的时候,先上锁 a 的互斥量,再上锁 b 的互斥量。
  • 2 执行的时候,先上锁 b 的互斥量,再上锁 a 的互斥量。

完全可能线程 A 执行 1 的时候上锁了 a 的互斥量,线程 B 执行 2 上锁了 b 的互斥量。线程 A 往下执行需要上锁 b 的互斥量,线程 B 则要上锁 a 的互斥量执行完毕才能解锁,哪个都没办法往下执行,死锁。其实也就回到了最初的问题。

法1:

C++ 标准库有很多办法解决这个问题,可以使用std::lock ,它能一次性锁住多个互斥量,并且没有死锁风险。修改 swap 代码后如下:

1
2
3
4
5
6
7
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock(lhs.m, rhs.m); // 给两个互斥量上锁
std::lock_guard<std::mutex> lock1{ lhs.m,std::adopt_lock };
std::lock_guard<std::mutex> lock2{ rhs.m,std::adopt_lock };
swap(lhs.object, rhs.object);
}

因为前面已经使用了 std::lock 上锁,所以后的 std::lock_guard 构造都额外传递了一个 std::adopt_lock 参数,让其选择到不会上锁的构造函数。函数退出也能正常解锁。

std::locklhs.mrhs.m 上锁时若抛出异常,则在重抛前对任何已锁的对象调用 unlock() 解锁,也就是 std::lock 要么将互斥量都上锁,要么一个都不锁。如果 std::lock 给l hs.m 或 rhs.m 上锁时,这两个锁的任意一个被锁了, std::lock 就不可能不执行,*所以在执行std::lock之前,必须保证要处理的所有锁都处于unlock状态。

法2:

此外,C++17新增了RAII类模板std::scoped_lockstd::scoped_lock<>std::lock_guard<>完全等价 ,只不过前者是可变参数模板,接收各种互斥型别作为模板参数列表,还以多个互斥对象作为构造函数的参数列表,通常scoped_lock的效果比裸调用std::lock更好。

代码可以改写为:

1
2
3
4
5
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::scoped_lock guard{ lhs.m,rhs.m }; // ①
swap(lhs.object, rhs.object);
}

上例利用了C++17的新特性:类模板参数推导。①处的代码等价于

1
std::scoped_lock guard<std::mutex, std::mutex> guard(lhs.m,rhs.m);

如果我们需要同时获取多个锁,那么std::lock和std::scoped_lock 可以帮助我们防范死锁。但若代码分别获取各个锁,那么就需要程序员依靠经验将加锁和解锁的功能封装为独立的函数,这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况。

以下是一些常用的规则,用于约束程序员的行为,帮助写出无死锁的代码:

  • 避免嵌套锁
    线程获取一个锁时,就别再获取第二个锁。每个线程只持有一个锁,自然不会产生死锁。如果必须要获取多个锁,使用 std::lock
  • 避免在持有锁时调用外部代码
    这个建议是很简单的:因为代码是外部提供的,所以没办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)。当写通用代码时(比如保护共享数据中的 Date 类)。这不是接口设计者可以处理的,只能寄希望于调用方传递的代码是能正常执行的。
  • 使用固定顺序获取锁
    如同第一个示例那样,固定的顺序上锁就不存在问题。
  • 层级加锁 按特定方式规定加锁次序,在运行期间据此查验枷锁操作是否遵守预设规则。我们详细学习一下这个方法。

4.2 层级加锁

现实开发中常常很难规避同一个函数内部加多个锁的情况,我们要尽可能避免循环加锁,所以可以自定义一个层级锁,保证实际项目中对多个互斥量加锁时是有序的。按照构思,我们应该把应用程序分层,并且明确每个互斥位于哪个层级。若某线程已对低层级互斥加锁,则不准它再对高层互斥加锁。

层级锁的设计思路是:每个互斥量(mutex)被赋予一个“层级值”(hierarchy value),并通过静态线程局部变量 thread_local 保存当前线程上一次持有的互斥量层级(上一个加锁的互斥量的层级值),以确保锁的获取顺序符合层级要求。

4.2.1 hierarchical_mutex

hierarchical_mutex 类的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class hierarchical_mutex {
private:
std::mutex _internal_mutex;
unsigned long const _hierarchy_value; // 当前层级值
unsigned long _previous_hierarchy_value; // 上一次层级值
// 当前线程的层次值,它被初始化为最大值,所以刚开始任意互斥元都可以被锁定
static thread_local unsigned long _this_thread_hierarchy_value; // 记录当前线程的层级值
// 层次检查,如果当前线程持有的层次及小于即将需要加锁互斥量的层次级,抛出异常
void check_for_hierarchy_violation() {
if (_this_thread_hierarchy_value <= _hierarchy_value)
throw std::logic_error("mutex hierarchy violated");
}
// 更新当前线程的层次值
void updata_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value;
_this_thread_hierarchy_value = _hierarchy_value;
}

public:
explicit hierarchical_mutex(unsigned long value) :
_hierarchy_value(value), _previous_hierarchy_value(0){}

hierarchical_mutex(const hierarchical_mutex& ) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;


unsigned long const get_hierarchy_value() {
return _hierarchy_value;
}

void lock() {
// 检查当前线程的层级值是否大于要加锁的互斥量的层级值,只有大于才能加
check_for_hierarchy_violation();
_internal_mutex.lock(); // 加锁
updata_hierarchy_value();
}

void unlock() {
if (_this_thread_hierarchy_value != _hierarchy_value)
throw std::logic_error("mutex hierarcht violated");

_this_thread_hierarchy_value = _previous_hierarchy_value;
_internal_mutex.unlock();
}

bool try_lock() {
check_for_hierarchy_violation();
if (!_internal_mutex.try_lock()) {
return false;
}

updata_hierarchy_value();
return true;
}
};

thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);

该类定义了四个成员变量:

  • std::mutex _internal_mutex:实际的互斥锁。
  • unsigned long const _hierarchy_value:该互斥锁的层级值。
  • unsigned long _previous_hierarchy_value:上一个锁的层级值,用于在解锁后恢复。
  • static thread_local unsigned long _this_thread_hierarchy_value:当前线程的层级值记录,控制每个线程的锁获取顺序。

这里的关键是,使用线程专属的变量(_this_thread_hierarchy_value,以关键字thread_local 修饰)表示当前线程的层级编号,它初始化为ULONG_MAX(无符号long的最大值)。因此,最开始时,任意的_hierarchy_value互斥都可以被加锁。因为声明由thread_local 修饰,每个线程都具有自己的_this_thread_hierarchy_value副本,所以当该变量在某线程上的值与另一线程上的值完全无关,不同线程的该变量互不影响

1. 一个线程的所有hierarchical_mutex 类的实例共享一个静态变量_this_thread_hierarchy_value,其他实例对该静态变量的改变都会同时反应到每个实例的静态变量中。这样 hierarchical_mutex 类的所有实例在每个线程内有独立的 _this_thread_hierarchy_value,并且不受其他线程中该变量值的影响。
2. 线程本身不属于任何层级。_this_thread_hierarchy_value 的确意义是,当前线程最后一次加锁操作所牵涉的层级编号,它的值随不同线程而异,但在一个线程中,所有实例共享同一个_this_thread_hierarchy_value

层级锁能保证我们每个线程加锁时,一定是先加权重高的锁,后加权值低的锁,如果反过来就会抛出异常,并且释放时也保证了顺序。主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重是否大于锁的权重,如果满足条件则继续加锁。

4.2.2 示例⭐⭐⭐⭐⭐

通过一个银行转账系统的示例说明层级锁是如何使用的:

假设我们在银行系统中有两个账户对象。每个账户对象都有一个对应的 hierarchical_mutex,分别用于保护各自的账户资源。在这个场景中:

  • 最高层级值锁(层级值 10000)表示高优先级资源(例如银行系统的总体资源)。
  • 中等层级值锁(层级值 5000)表示中等优先级资源(例如账户信息)。
  • 最低层级值锁(层级值 1000)表示低优先级资源(例如账户余额)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 定义全局的高层级锁(模拟银行系统资源)
hierarchical_mutex high_level_mutex(10000); // 高层级

class BankAccount {
public:
explicit BankAccount(unsigned long hierarchy_value, double initial_balance)
: balance(initial_balance), account_mutex(hierarchy_value) {}

// 转账函数
void transfer(BankAccount& to, double amount) {
// 确保账户锁的顺序从高到低
BankAccount* higher = this;
BankAccount* lower = &to;

if (account_mutex.get_hierarchy_value() < to.account_mutex.get_hierarchy_value()) {
std::swap(higher, lower);
}

// 按顺序加锁
std::lock_guard<hierarchical_mutex> lock1(higher->account_mutex);
std::lock_guard<hierarchical_mutex> lock2(lower->account_mutex);

if (balance >= amount) {
balance -= amount;
to.balance += amount;
std::cout << "Transferred " << amount << " from account with hierarchy "
<< account_mutex.get_hierarchy_value() << " to account with hierarchy "
<< to.account_mutex.get_hierarchy_value() << "\n";
} else {
std::cout << "Insufficient funds\n";
}
}

double get_balance() const {
return balance;
}

private:
double balance;
hierarchical_mutex account_mutex; // 每个账户有一个层级锁
};

void bank_operations(BankAccount& account1, BankAccount& account2) {
std::lock_guard<hierarchical_mutex> high_lock(high_level_mutex); // 锁定高层级资源

account1.transfer(account2, 50); // 从 account1 转账到 account2
account2.transfer(account1, 30); // 从 account2 转账到 account1
}

int main() {
// 创建两个账户,分配不同的层级锁
BankAccount account1(5000, 200); // 中等层级
BankAccount account2(1000, 300); // 低层级
// 启动多个线程进行银行操作
std::thread t1(bank_operations, std::ref(account1), std::ref(account2));
std::thread t2(bank_operations, std::ref(account2), std::ref(account1));
t1.join();
t2.join();

std::cout << "Final balance of account1: " << account1.get_balance() << "\n";
std::cout << "Final balance of account2: " << account2.get_balance() << "\n";
return 0;
}
  • BankAccount 类:每个账户都有一个余额 (balance) 和一个 hierarchical_mutex 锁 (account_mutex)。

    • account_mutex 的层级值根据账户的重要性来分配,比如 account1 是中等层级,account2 是低层级。
    • transfer 函数中按顺序(从权值大的开始锁)锁定了两个账户的 account_mutex,防止在转账过程中出现数据竞争。
  • bank_operations 函数:银行操作函数首先锁定 high_level_mutex,表示高层级资源已被锁定,然后进行转账操作,保证在同一层级之上不会发生死锁。

  • **主函数 (main)**:启动两个线程执行转账操作,每个线程尝试在两个账户之间转账。由于使用了层级锁,转账操作不会因加锁顺序问题导致死锁。

简单解释一些代码运行过程:

首先,启动两个不同的线程运行bank_operations() 函数。

假设我们处于t1线程中,首先锁定高层资源,防止其他线程干扰。

  • 第一次转账:
    • 从账户1开始给账户2转账:higer是账户1互斥量的层次值,lower是账户2互斥量的层次值,并且此时higer > lower,不用转换加锁顺序;
    • 然后先对账户1进行加锁,然后对账户2进行加锁,并进行转账。
  • 第二次转账:
    • 从账户2开始给账户1转账:higer是账户2互斥量的层次值,lower是账户1互斥量的层次值,并且此时higer < lower,需要转换加锁顺序,因为账户2的层次值小于账户1的层次值,不能先对账户2进行加锁。
    • 然后先对账户1进行加锁,然后对账户2进行加锁,并进行转账。

t2线程的处理同样如此,通过上面的流程我们可以发现,无论是从账户1->账户2,还是从账户2->账户1,都是先对账户1进行加锁然后再对账户2进行加锁,因为账户1的权值大于账户2,所以不能先对账户2加锁,否则会抛出异常。通过层级加锁的方式,我们就保证了“始终按相同的顺序对两个互斥加锁”的防范意见(参考上面面对的第一种死锁问题做出的应对方式),也就是先对A加锁然后对B加锁,只要不同线程按照这个顺序进行加锁,一般就不会发生死锁现象(除非同时获取多个锁,但也可以通过std::lock()函数或者std::adopt_lock模板类解决)。

输出:

img