并发编程(4)——锁(下)
2025-04-30 12:24:32 # C++ # 并发编程

四、day4

上一节在学习锁的同时,简单了解了一些与互斥量配套使用的互斥包装器,比如lock_guard、try_lock、unique_lock等,但我们对unique_lock只是简单了解了一下,unique_lock有很多功能还待学习。本节中我们将详细学习:

1)unique_lock;⭐⭐⭐⭐⭐

2)C++14和17提供的共享锁;⭐⭐⭐

3)递归加锁。⭐

参考:

ModernCpp-ConcurrentProgramming-Tutorial/md/03共享数据.md at main · Mq-b/ModernCpp-ConcurrentProgramming-Tutorial

恋恋风辰官方博客

C++ 并发编程(4) unique_lock,共享锁以及递归锁_哔哩哔哩_bilibili


1. unique_lock

上一节中我们了解了unique_lock是一种管理类模板(满足可移动构造可移动赋值但不满足可复制构造可复制赋值),灵活性比lock_guard高很多(允许手动加解锁、延迟锁定、有时限的锁定尝试、递归锁定、所有权转移和与条件变量一同使用),但是效率比较差,内存占用也比较多。

1.1 unique_lock的构造函数

最快了解一个工具的方式就是解析它的源码,unique_lock 的源码如下( MSVC STL ):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
_EXPORT_STD template <class _Mutex>
class unique_lock { // whizzy class with destructor that unlocks mutex
public:
using mutex_type = _Mutex;

unique_lock() noexcept = default;

_NODISCARD_CTOR_LOCK explicit unique_lock(_Mutex& _Mtx)
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // construct and lock
_Pmtx->lock();
_Owns = true;
}

_NODISCARD_CTOR_LOCK unique_lock(_Mutex& _Mtx, adopt_lock_t) noexcept // strengthened
: _Pmtx(_STD addressof(_Mtx)), _Owns(true) {} // construct and assume already locked

unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock

_NODISCARD_CTOR_LOCK unique_lock(_Mutex& _Mtx, try_to_lock_t)
: _Pmtx(_STD addressof(_Mtx)), _Owns(_Pmtx->try_lock()) {} // construct and try to lock

template <class _Rep, class _Period>
_NODISCARD_CTOR_LOCK unique_lock(_Mutex& _Mtx, const chrono::duration<_Rep, _Period>& _Rel_time)
: _Pmtx(_STD addressof(_Mtx)), _Owns(_Pmtx->try_lock_for(_Rel_time)) {} // construct and lock with timeout

template <class _Clock, class _Duration>
_NODISCARD_CTOR_LOCK unique_lock(_Mutex& _Mtx, const chrono::time_point<_Clock, _Duration>& _Abs_time)
: _Pmtx(_STD addressof(_Mtx)), _Owns(_Pmtx->try_lock_until(_Abs_time)) {
// construct and lock with timeout
static_assert(chrono::_Is_clock_v<_Clock>, "Clock type required");
}

_NODISCARD_CTOR_LOCK unique_lock(unique_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) {
_Other._Pmtx = nullptr;
_Other._Owns = false;
}

unique_lock& operator=(unique_lock&& _Other) noexcept /* strengthened */ {
if (this != _STD addressof(_Other)) {
if (_Owns) {
_Pmtx->unlock();
}

_Pmtx = _Other._Pmtx;
_Owns = _Other._Owns;
_Other._Pmtx = nullptr;
_Other._Owns = false;
}
return *this;
}

~unique_lock() noexcept {
if (_Owns) {
_Pmtx->unlock();
}
}

unique_lock(const unique_lock&) = delete;
unique_lock& operator=(const unique_lock&) = delete;

void lock() { // lock the mutex
_Validate();
_Pmtx->lock();
_Owns = true;
}

_NODISCARD_TRY_CHANGE_STATE bool try_lock() {
_Validate();
_Owns = _Pmtx->try_lock();
return _Owns;
}

template <class _Rep, class _Period>
_NODISCARD_TRY_CHANGE_STATE bool try_lock_for(const chrono::duration<_Rep, _Period>& _Rel_time) {
_Validate();
_Owns = _Pmtx->try_lock_for(_Rel_time);
return _Owns;
}

template <class _Clock, class _Duration>
_NODISCARD_TRY_CHANGE_STATE bool try_lock_until(const chrono::time_point<_Clock, _Duration>& _Abs_time) {
static_assert(chrono::_Is_clock_v<_Clock>, "Clock type required");
_Validate();
_Owns = _Pmtx->try_lock_until(_Abs_time);
return _Owns;
}

void unlock() {
if (!_Pmtx || !_Owns) {
_Throw_system_error(errc::operation_not_permitted);
}

_Pmtx->unlock();
_Owns = false;
}

void swap(unique_lock& _Other) noexcept {
_STD swap(_Pmtx, _Other._Pmtx);
_STD swap(_Owns, _Other._Owns);
}

_Mutex* release() noexcept {
_Mutex* _Res = _Pmtx;
_Pmtx = nullptr;
_Owns = false;
return _Res;
}

_NODISCARD bool owns_lock() const noexcept {
return _Owns;
}

explicit operator bool() const noexcept {
return _Owns;
}

_NODISCARD _Mutex* mutex() const noexcept {
return _Pmtx;
}

private:
_Mutex* _Pmtx = nullptr;
bool _Owns = false;

void _Validate() const { // check if the mutex can be locked
if (!_Pmtx) {
_Throw_system_error(errc::operation_not_permitted);
}

if (_Owns) {
_Throw_system_error(errc::resource_deadlock_would_occur);
}
}
};

如你所见,std::unique_lock 的私有成员变量一共有两个,一个是互斥量的指针,一个是表示对象是否拥有互斥量所有权的 bool 类型的对象_Owns :

1
2
3
private:
_Mutex* _Pmtx = nullptr;
bool _Owns = false;

std::unique_lock 有多个构造函数,我们最常用的是只传给std::unique_lock 一个互斥量参数,和lock_guard的构造相似:

1
2
3
4
5
_NODISCARD_CTOR_LOCK explicit unique_lock(_Mutex& _Mtx)
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // construct and lock
_Pmtx->lock();
_Owns = true;
}

注意:构造函数初始化成员变量时,_Owns并不会一开始就置为true,表示当前对象拥有互斥量的所有权;而是在内部实现的时候,先对互斥量加锁,然后才将_Owns置为true,这是有一个先后逻辑的。

std::unique_lock的析构函数和lock_guard也有些许不同,前者需要经过一个所有权判断,只有当前对象用于互斥量的所有权时,才会解锁;而后者直接会对互斥量解锁。

1
2
3
4
5
6
7
8
9
10
11
// lock_guard
~lock_guard() noexcept {
_MyMutex.unlock();
}

// unique_lock
~unique_lock() noexcept {
if (_Owns) {
_Pmtx->unlock();
}
}

为什么需要增加一个 _Owns 表示所属权呢?我们使用 unique_lock 不就是想要对互斥量加锁,为什么还要表示它拥有互斥所有权呢,像lock_guard就不用这样做。

上面的代码还不够简单直接,我举个例子:

1
2
3
4
5
6
std::mutex m;

int main() {
std::unique_lock<std::mutex> lock{ m,std::adopt_lock };
lock.lock();
}

这段代码运行会抛出异常,原因很简单,因为 std::adopt_lock 表示不上锁,但是有所有权,即_Owns设置为true,当运行lock()成员函数的时候,调用了**_Validate()**进行检测,也就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    void lock() { // lock the mutex
_Validate();
_Pmtx->lock();
_Owns = true;
}

void _Validate() const { // check if the mutex can be locked
if (!_Pmtx) {
_Throw_system_error(errc::operation_not_permitted);
}

if (_Owns) {
_Throw_system_error(errc::resource_deadlock_would_occur);
}
}

该函数用于检查互斥量(mutex)是否可以被锁定;如果_Pmtx为nullptr,抛出异常(未获得互斥量);如果未获得互斥量的所有权,同样抛出异常。在这里,我们获得了互斥量不抛出第一个异常,但是因为构造函数输入了第二个参数 std::adopt_lock,提前获得了所有权(所有权应该是在上完锁才会获得,但在这里提前获得),所有抛出第二个异常:

1
2
_NODISCARD_CTOR_LOCK unique_lock(_Mutex& _Mtx, adopt_lock_t) noexcept // strengthened
: _Pmtx(_STD addressof(_Mtx)), _Owns(true) {} // construct and assume already locked

当我们输入互斥量和adopt_lock_t时,会调用unique_lock这个构造函数;与上面讲的最常用的构造函数不同,这个构造函数获得了互斥量但是却将所属权置为了true,并且不会自动给互斥量上锁。

所以我们需要主动的调用lock()成员函数进行加锁,然后因为所有权提前获得导致异常抛出:

1
2
3
4
5
void lock() { // lock the mutex
_Validate();
_Pmtx->lock();
_Owns = true;
}

除非我们写为,主动调用mutex的lock()函数,而不是调用unique_lock的lock()函数:

1
2
3
4
5
6
std::mutex m;

int main() {
std::unique_lock<std::mutex> lock{ m,std::adopt_lock };
lock.mutex()->lock(); // 调用mutex的lock()函数,而不是调用unique_lock的lock()函数
}

也就是说 std::unique_lock 要想调用 lock() 成员函数,必须是当前没有所有权。

所以正常的用法其实是,先对互斥量上锁,然后传递 std::adopt_lock 构造 std::unique_lock 对象表示拥有互斥量的所有权,即可在析构的时候正常解锁。如下:

1
2
3
4
5
6
std::mutex m;

int main() {
m.lock();
std::unique_lock<std::mutex> lock { m,std::adopt_lock };
}

除此之外,unique_lock还有其他构造函数可供其他参数输入,比如 std::defer_lock、std::adopt_lock_t、std::try_to_lock_t等、还可以输入时间参数以供有时限的锁定尝试。这里就不一一分析,当我们有需要的时候,可以选择性查阅源码,这并不难。

简而言之:

  • 使用 std::defer_lock 构造函数不上锁,要求构造之后上锁
  • 使用 std::adopt_lock 构造函数不上锁,要求在构造之前互斥量上锁
  • 默认构造会上锁,要求构造函数之前和构造函数之后都不能再次上锁
  • 使用 std::try_to_lock 构造函数会尝试锁定传入的互斥量。如果互斥量已被其他线程锁定,则构造函数会立即返回,而不会阻塞当前线程。

1.2 unique_lock 灵活在哪?

我们前面提到了 std::unique_lock 更加灵活,那么灵活在哪?很简单,它拥有 lock()unlock() 成员函数,所以我们能写出如下代码:

1
2
3
4
5
6
7
void f() {
//code..
std::unique_lock<std::mutex> lock{ m };
// 涉及共享资源的修改的代码...
lock.unlock(); // 解锁并释放所有权,析构函数不会再 unlock()
//code..
}

而不是像之前 std::lock_guard 一样使用 ‘**{}**’。

另外再聊一聊开销,其实倒也还好,unique_lock 多了一个 bool ,内存对齐,x64 环境也就是 16 字节。这都不是最重要的,主要是复杂性和需求,通常建议优先 std::lock_guard,当它无法满足你的需求或者显得代码非常繁琐,那么可以考虑使用 std::unique_lock

1.3 其他常用函数

std::unique_lock 还有很多其他成员函数,我们这里简单介绍几个常用的函数。

1.3.1 lock和unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// lock
void lock() { // lock the mutex
_Validate();
_Pmtx->lock();
_Owns = true;
}
// unlock
void unlock() {
if (!_Pmtx || !_Owns) {
_Throw_system_error(errc::operation_not_permitted);
}
_Pmtx->unlock();
_Owns = false;
}

其实很简单,就是先判断是否满足要求(互斥量不是野指针、所有权未提前获得),然后加锁再获得所有权。解锁同理,先判断互斥量和所有权均存在,然后再解锁、释放所有权。

1.3.2 owns_lock

当我们不确定unique_lock是否持有互斥所有权,可以通过owns_lock函数查看,owns_lock函数会返回unique_lock的_Owns变量。

1
2
3
_NODISCARD bool owns_lock() const noexcept {
return _Owns;
}

简单举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//可判断是否占有锁
void owns_lock() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
lock.unlock();
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
}

输出:

1
2
owns lock
doesn't own lock

1.4 在不同作用域传递互斥量

互斥量满足**互斥体 (Mutex)的要求,不可复制不可移动(五三原理)。所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用。但是 unique_lock 支持移动,当一个mutex被转移给unique_lock后,可以通过unique_ptr转移转移给其他的 std::unique_lock 对象。有些时候,这种转移(就是调用移动构造)是自动发生**的,比如当函数返回 std::unique_lock 对象。另一种情况就是得显式使用std::move

简而言之,具体怎么转移取决于移动数据的来源是左值还是右值。若是左值,则必须显式转移;若是右值,会自动调用移动构造函数。

1
2
3
4
_NODISCARD_CTOR_LOCK unique_lock(unique_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) {
_Other._Pmtx = nullptr;
_Other._Owns = false;
}

就是将数据成员赋给新对象,原来的置空,这就是所谓的所有权”转移,切勿被词语迷惑,也就是深拷贝。

std::unique_lock 是只能移动不可复制的类,它移动即标志其管理的互斥量的所有权转移了。

转移的主要用途是:*准许函数锁定互斥,然后把互斥的归属权转移给函数调用者,好让他在同一个锁的保护下执行其他操作。比如:

1
2
3
4
5
6
7
8
9
std::unique_lock<std::mutex> get_lock(){
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk{ some_mutex };
return lk; // 调用移动构造
}
void process_data(){
std::unique_lock<std::mutex> lk{ get_lock() };
// 执行一些任务...
}

get_lock() 函数先锁定互斥,接着对数据做前期准备,再将归属权返回给调用者。return lk 这里会调用移动构造,将互斥量的所有权转移给调用方, process_data 函数结束的时候会解锁互斥量。

我相信你可能对 extern std::mutex some_mutex 有疑问,其实不用感到奇怪,这是一个互斥量的声明,可能别的翻译单元(或 dll 等)有它的定义,成功链接上。我们前面也说了:“所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用”,所以要特别注意互斥量的生存期。

extern 说明符只能搭配变量声明和函数声明(除了类成员或函数形参)。它指定外部链接,而且技术上不影响存储期,但它不能用来定义自动存储期的对象,故所有 extern 对象都具有静态或线程存储期。

如果只是简单写一个 std::mutex some_mutex 那么函数 process_data 中的 lk 会持有一个悬垂指针

1.5 锁粒度

锁的粒度表示加锁的精细程度(即一个锁所保护的数据量),一个锁的粒度要足够大(粒度越大表示保护的数据量越多),保证可以锁住要访问的共享数据。同时一个锁的粒度要足够小,保证非共享数据不被锁住影响性能。其实在网络编程中使用协程实现异步服务器的时候我们便用了该思想,锁只在保护队列数据的时候才会使用,只有不使用队列数据立马就会解锁,可以参考:

网络编程(19)——C++使用asio协程实现并发服务器 - 知乎

多线程也需要满足这个要求:假定多个线程正等待使用同一个资源,如果任何线程在必要范围以外持锁,就会增加等待所耗费的总时间。所以只要代码不再需要访问共享数据,那我们就调用unlock解锁;若以后需重新访问,则调用lock加锁,而unique_ptr则很好的支持手动加解锁。

1
2
3
4
5
6
7
8
9
void precision_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
lock.unlock();
//不设计共享数据的耗时操作不要放在锁内执行
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock();
shared_data++;
}

2. 共享锁

试想这样一个场景,对于一个DNS服务,我们可以根据域名查询服务对应的ip地址,它很久才更新一次,比如新增记录,删除记录或者更新记录等。平时大部分时间都是提供给外部查询,对于查询操作,即使多个线程并发查询不加锁也不会有问题,但是当有线程修改DNS服务的ip记录或者增减记录时,其他线程不能查询,需等待修改完再查询。或者等待查询完,线程才能修改。也就是说读操作并不是互斥的,同一时间可以有多个线程同时读,但是写和读是互斥的,写与写是互斥的。简而言之,写操作需要独占锁。而读操作需要共享锁。**

C++ 标准库自然为我们提供了其他两种互斥:***std::shared_timed_mutex(C++14)、std::shared_mutex***(C++17)。它们的区别简单来说,前者支持更多的操作方式,后者有更高的性能优势。C++11中无上述互斥,但可以通过boost库使用(boost库定义了该互斥)。

std::shared_mutex 同样支持std::lock_guard、std::unique_lock。和 std::mutex 做的一样,保证写线程的独占访问。而那些无需修改数据结构的读线程,可以使用std::shared_lock<std::shared_mutex>获取访问权,多个线程可以一起读取。

  • std::shared_mutex
    • 提供了 lock(), try_lock(), 和 try_lock_for() 以及 try_lock_until() 函数,这些函数都可以用于获取互斥锁
    • 提供了 try_lock_shared()lock_shared() 函数,这些函数可以用于获取共享锁。
    • std::shared_mutex 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁。
  • std::shared_timed_mutex
    • std::shared_mutex 类似,也提供了 lock(), try_lock(), 和 try_lock_for() 以及 try_lock_until() 函数用于获取互斥锁。
    • std::shared_mutex 不同的是,它还提供了 try_lock_shared()lock_shared() 函数用于获取共享锁,这些函数在尝试获取共享锁时具有超时机制。
    • std::shared_timed_mutex 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁,这与 std::shared_mutex 相同。然而,当尝试获取共享锁时,如果不能立即获得锁,std::shared_timed_mutex 会设置一个超时,超时过后如果仍然没有获取到锁,则操作将返回失败。

std::shared_timed_mutex 具有 std::shared_mutex的所有功能,并且额外支持超时功能。所以以上代码可以随意更换这两个互斥量。

如果我们想构造共享锁,可以使用std::shared_lock,如果我们想构造独占锁, 可以使用std::lock_gurad

共享锁即读锁,对应std::shared_lock<std::shared_mutex>排他锁即写锁,对应std::lock_guard<std::shared_mutex>std::unique_lock<std::shared_mutex>.

我们用一个类DNService代表DNS服务,查询操作使用共享锁,而写操作使用独占锁,可以是如下方式的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class DNService {
private:
std::map<std::string, std::string> _dns_info;
mutable std::shared_mutex _shared_mtx;
public:
DNService() {}
//读操作采用共享锁
std::string QueryDNS(std::string dnsname) {
std::shared_lock<std::shared_mutex> shared_locks(_shared_mtx);
auto iter = _dns_info.find(dnsname);
if (iter != _dns_info.end()) {
return iter->second;
}
return "";
}
//写操作采用独占锁
void AddDNSInfo(std::string dnsname, std::string dnsentry) {
std::lock_guard<std::shared_mutex> guard_locks(_shared_mtx);
_dns_info.insert(std::make_pair(dnsname, dnsentry));
}
};

3. 递归加锁

线程对已经上锁的 std::mutex 再次上锁是错误的,这是未定义行为。然而在某些情况下,一个线程会尝试在释放一个互斥量前多次获取,所以提供了std::recursive_mutex。比如,在实现接口的时候内部加锁,接口内部调用完结束自动解锁。会出现一个接口调用另一个接口的情况,如果用普通的std::mutex就会出现卡死,因为嵌套加锁导致卡死,但是我们可以使用递归锁std::recursive_mutex。

std::recursive_mutex 是 C++ 标准库提供的一种互斥量类型,它允许同一线程多次锁定同一个互斥量,而不会造成死锁。当同一线程多次对同一个 std::recursive_mutex 进行锁定时,只有在解锁与锁定次数相匹配时,互斥量才会真正释放。但它并不影响不同线程对同一个互斥量进行锁定的情况。不同线程对同一个互斥量进行锁定时,会按照互斥量的规则进行阻塞。

但在工作中并不推荐使用递归锁,我们可以从设计源头规避嵌套加锁的情况,将接口相同的功能抽象出来,统一加锁。

下面的设计演示了如何使用递归锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class RecursiveDemo {
public:
RecursiveDemo() {}
bool QueryStudent(std::string name) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
return false;
}
return true;
}
void AddScore(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
if (!QueryStudent(name)) {
_students_info.insert(std::make_pair(name, score));
return;
}
_students_info[name] = _students_info[name] + score;
}

//不推荐采用递归锁,使用递归锁说明设计思路并不理想,需优化设计
//推荐拆分逻辑,将共有逻辑拆分为统一接口
// 只需加一次锁
void AddScoreAtomic(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
_students_info.insert(std::make_pair(name, score));
return;
}
_students_info[name] = _students_info[name] + score;
return;
}
private:
std::map<std::string, int> _students_info;
std::recursive_mutex _recursive_mtx;
};

我们可以看到AddScore函数内部调用了QueryStudent, 对一个互斥量连续加了两次锁,所以采用了递归锁。

但是我们同样可以改变设计,将两者公有的部分抽离出来生成一个新的接口 AddScoreAtomicAddScoreAtomic可以不适用递归锁,照样能完成线程安全操作的目的。