并发编程(17)——基于同步方式的线程安全的链表
2025-09-16 13:08:41 # C++ # 并发编程

十七、day17

上一节实现的查找表中,桶类 bucket_type 使用了C++标准库提供的存储结构链表 list 作为数据存储容器,但通过C++标准库提供的链表 list 存储键值对虽然对并发读不影响(共享锁保证多个线程可以线程安全的读共享数据),但是不能并发写,在同一时间有且仅有一个线程可以修改共享数据(因为C++标准库提供的链表的增删改查是通过同一个互斥量实现的,锁粒度不够精细),我们需要像实现锁粒度足够小的线程安全的队列一样,自定义一个链表结构,通过多个锁来为增删查改进行保护,每个操作有自己独立的锁,实现写操作的并发。

参考:

恋恋风辰官方博客

Java-链表(单向链表、双向链表) - 南孚先生 - 博客园

【数据结构】链表(单链表实现+详解+原码)-CSDN博客

链表(单向链表的建立、删除、插入、打印) - Luv3 - 博客园

链表 - OI Wiki


1. 链表的概念

  • 链表是一种物理存储单元上非连续非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针连接次序实现的;
  • 每一个链表都包含多个节点,节点又包含两个部分,一个是数据域(储存节点含有的信息),一个是引用域(储存下一个节点或者上一个节点的地址);
  • 获取数据麻烦,需要遍历查找,存储元素数量无上限(只要内存空间足够);
  • 方便插入、删除。

单向链表示意图如下图所示:

1733993101975

其中,1 表示虚拟头节点,一般不存数据,用于存储下一个节点的地址;2、3、4是普通节点,用于存储数据和下一个节点的地址;5 是尾节点,只存储数据,指向 nullptr。

代码表示如下:

1
2
3
4
5
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};

双向链表中同样有数据域和指针域。不同之处在于,指针域有左右(或上一个、下一个)之分,用来连接上一个结点、当前结点、下一个结点。

img

双向链表示意图如下图所示:

image-20241212170227422

代码表示如下:

1
2
3
4
5
struct Node {
std::shared_ptr<T> data;
std::unique_ptr<node> left;
std::unique_ptr<node> right;
};

为了线程安全,一般将节点使用智能指针通过模板的方式封装,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename T>
struct node {
std::shared_ptr<T> data;
std::unique_ptr<node> next;
// 构造函数:初始化数据和指针
node(const T& value) : data(std::make_shared<T>(value)), next(nullptr) {}
// 构造函数:允许外部传入shared_ptr作为数据
node(std::shared_ptr<T> value_ptr) : data(std::move(value_ptr)), next(nullptr) {}
// 禁用拷贝构造函数和赋值运算符
node(const node&) = delete;
node& operator=(const node&) = delete;
// 默认移动构造函数、赋值运算符和析构函数
node(node&&) = default;
node& operator=(node&&) = default;
~node() = default;
};

上面的两种链表显而易见可以通过一个互斥量控制整个链表的增删查改,进而实现线程安全的访问、修改。但是,因为使用一个互斥量保护整个链表的锁粒度太大,如果我们想要实现支持多线程高并发访问、修改链表,那么性能会很有限(没办法同时使用多个线程修改链表,同一时间有且仅有一个线程可以修改,其他线程需等待,虽然我们通过条件变量挂起了这些线程,避免了算力资源的浪费,但是我们没有办法实现修改函数同时调用),我们可以对每个节点都使用一个互斥量,这样能保证多个线程操作不同节点时加不同的锁,减少耦合性。

此外,我们尽量使用智能指针存储数据,使用智能指针存储数据不仅能防止内存不够抛出异常,而且能够有效节省资源,智能指针间的赋值不涉及内存的分配(但是创建一个指针会占8字节)

上面的做法在并发编程(15)——基于同步方式的线程安全的栈和队列 | 爱吃土豆的个人博客中做了详细解释,可参考。

2. 支持多线程、高并发的单向链表

2.1 支持前插的单向链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once
#include <mutex>
#include <memory>

template<typename T>
class threadsafe_list // 单向链表
{
struct node
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node() :next() {}
node(const T& value) :
data(std::make_shared<T>(value)){}
};

node head;
public:
threadsafe_list()
{}

~threadsafe_list()
{
remove_if([](const node&) {return true; });
}

threadsafe_list(threadsafe_list const& other) = delete;
threadsafe_list& operator=(threadsafe_list const& other) = delete;

void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
}

template<typename Function>
void for_each(Function f)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}

template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data))
{
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}

template<typename Predicate>
void remove_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next);
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
};
  1. 单向链表模板类的私有成员:

    链表基本的节点构造如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    struct node
    {
    std::mutex m;
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
    node() :next() {}
    node(T const& value) :
    data(std::make_shared<T>(value)){}
    };

    node head;

    每个节点都维护一个互斥量,所以结构体 node 需定义一个互斥量成员变量,用于维护当前节点的增删查改操作。这里只定义了两个默认构造函数,所以拷贝构造函数、赋值运算符、析构函数、移动构造、移动运算符均使用系统提供的默认函数(3/5/0法则)。

    此外,还有一个虚拟头节点 head(一般使用 dummyHead 作为变量名)。

  2. 构造函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    threadsafe_list(){}

    ~threadsafe_list()
    {
    remove_if([](node const&) {return true; });
    }

    threadsafe_list(threadsafe_list const& other) = delete;
    threadsafe_list& operator=(threadsafe_list const& other) = delete;

    单向链表模板类中定义了析构函数并且禁止了拷贝构造和赋值运算符,并且根据3/5/0法则,编译器将阻止隐式生成移动构造函数和移动赋值运算,所以改模板类仅有定义的构造函数可使用,析构函数使用自定义析构。

  3. 析构函数:

    析构函数调用了模板函数 remove_if,下面是它的详解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
template<typename Predicate>
void remove_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (const node* next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next);
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
  • 该函数通过模板来接受一个谓词函数(Predicate p),然后遍历链表并根据谓词判断删除满足条件的节点:

    • 这里析构函数将 lambda 函数 [](const node&) {return true; } 作为谓词函数传入,该函数总会返回true,所以链表的所有元素均满足谓词函数的条件,删除所有元素
  • current 用于指向链表中的每个元素,从头节点 head 开始备遍历;

  • 在 while 循环判断条件中,调用智能指针的 get 函数获取当前节点的下一个节点包装的原始指针,如果原始指针不为空,则进入循环:

    • 使用 std::unique_ptr 锁住当前节点的下一个节点中的互斥量,因为是从虚拟头节点的下一个节点是真正头节点,所以相当于从头遍历链表
    • 因为析构函数传入的谓词函数总会返回 true,所以if条件总满足(除了最后一个节点),如果当前链表是 current->Node1->Node2->Node3->nullptr
      • 将 Node1 转移到 old_next
      • 将 current->next 指向 Node2,第一次循环中,current->next 最先指向Node1
      • 解锁 Node1
      • 作用域结束后,智能指针释放持有资源,调用 node1 的析构函数,Node1 被释放
      • 当前链表变为: current->Node2->Node3->nullptr
    • 循环持续到最后一个节点时,因为最后一个节点持有数据,但是next指向nullptr,所以if-else中走的是else分支,这里会将虚拟头节点解锁,并且将最后一个节点赋值给current,当整个函数作用域结束后,current调用析构函数释放最后一个节点。而虚拟头节点始终存在。
    • 当通过get() 获取到最后一个节点的next指针时,因为指向nullptr,循环条件不满足,退出循环

我们传给模板函数 remove_if 的谓词函数是一个lmbda函数,它的参数类型是 const node&,但我们调用谓词函数p(*next->data)时,其中 *next->data 的类型是 T&,而 T& 和 const node& 类型明显不匹配(尽管const node&既可以接受node又可以接受const node),那么为什么能编译通过?

其实这里编译器隐式调用了 node 的构造函数,将 T& 类型的对象(*next->data,data中的数据是T类型但是解引用后成了T&)隐式转换为了 node 类型。因为node的构造函数只接受一个参数,形参类型是const T&,既能接受const T&也能接受T&,所以这里T&通过隐式调用构造函数被转换为了node。而 const node&可以接受node类型的参数。

  1. push_front
1
2
3
4
5
6
7
void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
}
  • 该函数用于将一个新元素插入至链表头部
    • 首先创建一个新节点,并将虚拟头节点的互斥量给锁住
    • 将新节点的 next 指向链表的第一个真实节点(虚拟头节点的 next 指向的元素)
    • 然后将虚拟头节点的 next 指向新节点,即可实现新元素的插入

请注意,插入和删除的加锁顺序需要保持一致,都需要从虚拟头节点开始逐个往后加,这样可以避免死锁

  1. for_each
1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename Function>
void for_each(Function f)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}
  • 该函数的作用和 STL 中的 for_each 算法作用相同,用于遍历容器中的所有元素,并对每个元素执行指定的函数
  • 创建局部指针变量 current 存储虚拟头节点的地址,并对虚拟头节点的互斥加锁,包装访问 虚拟头节点和其 next 指针是安全的
  • 获取 currentnext 指针存储的节点原始地址,如果不为空,则进入循环
    • 将获取的当前节点的下一个节点的互斥加锁,确保访问下一个节点以及下一个节点的next指针是安全的
    • 将虚拟头节点解锁,修改虚拟头节点的数据内容
    • 将下一个节点的数据解引用并传递给函数 f,用于对元素修改
    • 最后将 current 更新为下一个节点,并将下一个节点的互斥转移到 lk 中,准备进入下一个循环

如果进入循环前链表的结构为: current(DummyH)->Node1->Node2->Node3->nullptr,current 和 DummyH 所处地址相同

第一次循环后,DummyH->current(Node1)->Node2->Node3->nullptr,current 和 Node1所处地址相同

第二次循环后,DummyH->Node1->current(Node2)->Node3->nullptr,current 和 Node2所处地址相同

按这样的顺序直至最后一个空指针,然后退出循环

  1. find_first_if
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data))
{
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}
  • 该函数和 STL 容器中的 std::find_if 算法作用相同,用于在容器中查找第一个满足指定条件的元素,并返回其数据;如果没有找到,则返回空的智能指针
  • 首先接受一个谓词函数,用于判断元素是否满足给定条件
  • 起始流程相同,获取虚拟头节点的地址保存到局部指针变量 current 中,并将虚拟头节点的互斥锁住
  • 然后获取当前节点的下一个节点的原始地址,如果不为空指针则进入循环:
    • 首先将下一个节点的互斥锁住,并将虚拟头节点的互斥解锁
    • 将下一个节点的数据解引用传递给谓词函数 p,判断是否满足条件,如果满足条件:
      • 直接将下一个节点的数据返回
    • 否则,将 current 的指向更换为下一个节点,并将下一个节点的互斥转移给 lk,继续遍历容器
    • 如果最后一个元素仍不满足谓词函数,则返回一个空智能指针

我们实现该链表的目的是为了采用精细锁度的锁操作,摆脱单一的全局互斥,从而增加并发操作的机会。那么上面这个仅支持前插的单向链表实现这一点了嘛?

我们实现了,不同的线程可以同时在不同的节点上工作(每个节点都有自己独立的互斥进行线程安全的维护),无论具体的操作是利用 for_each() 处理数据,还是利用 find_first_if() 进行查找,还是通过 remove_if() 删除节点。

此外,因为这些函数内部互斥的加锁顺序是按照链表中的节点前后顺序进行的,故多线程不可能出现 “超越他人的处理节点”,若一个线程需要在某个节点上耗费特别长的时间,那么其他线程在处理完该节点之前的数据抵达该节点时,必须等待处理线程结束释放锁才能继续往后走。

2.2 支持前插and后插的单向链表

如果我们想要从链表尾部开始插入一个新节点呢,我们应该如何操作?

  • 我们需要维护一个尾部指针,这个尾部指针初始开始指向虚拟头节点,当插入元素后,尾部指针指向最后一个节点的地址。

那么现在有一个问题,我们需要专门给尾部指针增加一个锁吗

实际上,给尾部指针单独增加一个锁并没有必要,尾部指针本身指向尾部节点,因此可以完全依赖尾部节点内部的锁来保证线程安全;同样,头节点的操作也可以依赖节点内部的锁来实现保护。

至于尾部和头部插入竞争的情形,其实根本不会发生,在分析头尾插入操作之间可能的竞争条件时,可以重点考虑链表为空的情况。此时,头节点和尾节点是同一个节点(即虚拟头节点),由于头尾节点共用一个内部的 mutex,因此无论是执行 push_front() 还是 push_back(),实际上都在竞争同一个锁。这个锁能够自然地保证这两种操作串行化,从而避免竞争条件。同理,在同时考虑头部增删和尾部增删的竞争条件时,都可以考虑头尾节点合二为一的情形,此时头尾节点内部是同一个锁,自动保证了线程安全,避免了竞争条件

以上条件均是在满足加锁顺序是从头到尾依次加时才可行的,如果我们是双向链表插入时(这时锁即可能从后向前加也可能从前向后加),这时候就必须使用一个额外的锁保护尾指针,否则会造成死锁。

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#pragma once
#include <mutex>
#include <memory>

template<typename T>
class double_push_list
{
struct node_d
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node_d> next;
node_d() :
next()
{}
node_d(T const& value) :
data(std::make_shared<T>(value))
{}
};

node_d head;
node_d * last_node_ptr;
public:
double_push_list()
{
last_node_ptr = &head;
}

~double_push_list()
{
remove_if([](node_d const&) {return true; });
}

double_push_list(double_push_list const& other) = delete;
double_push_list& operator=(double_push_list const& other) = delete;

void push_front(T const& value)
{
std::unique_ptr<node_d> new_node(new node_d(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
// 更新最后一个节点
if (head.next->next == nullptr) {
last_node_ptr = head.next.get();
}
}

void push_back(T const& value) {
std::unique_ptr<node_d> new_node(new node_d(value));
std::unique_lock<std::mutex> lk(last_node_ptr->m);
last_node_ptr->next = std::move(new_node);
last_node_ptr = last_node_ptr->next.get(); // 更新最后节点
}

template<typename Function>
void for_each(Function f)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}

template<typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data))
{
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}

template<typename Predicate>
void remove_if(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current; // 将尾指针 `last_node_ptr` 指向虚拟头节点
}
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}

template<typename Predicate>
bool remove_first(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current;
}
next_lk.unlock();
return true;
}

lk.unlock();
current = next;
lk = std::move(next_lk);
}
return false;
}

template<typename Predicate>
void insert_if(Predicate p, T const & value)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while(node_d * const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*(next->data))) // 如果找到满足条件的节点
{
std::unique_ptr<node_d> new_node(new node_d(value));
// 将新节点的 next 指向当前节点的下一个节点
auto old_next = std::move(current->next);

// 将当前节点的 next 更新为新节点
new_node->next = std::move(old_next);
//当前节点的下一个节点更新为新节点
current->next = std::move(new_node);
return;
}
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}

void remove_last() {
if (last_node_ptr == &head) {
std::cerr << "Error: Attempt to remove from an empty list!" << std::endl;
return;
}

node_d* current = &head;
std::unique_lock<std::mutex> current_lock(current->m); // 锁住虚拟头节点
// 遍历到最后一个节点的前一个节点
while (current->next.get() != last_node_ptr) {
current_lock.unlock();
current = current->next.get();
current_lock = std::unique_lock<std::mutex>(current->m);
}
// 删除最后一个节点
std::unique_ptr<node_d> old_next = std::move(current->next);
last_node_ptr = current; // 更新尾指针
}
};
  1. 前插 push_front
1
2
3
4
5
6
7
8
9
10
11
void push_front(T const& value)
{
std::unique_ptr<node_d> new_node(new node_d(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
// 更新最后一个节点
if (head.next->next == nullptr) {
last_node_ptr = head.next.get();
}
}
  • 和 2.1 版本的 push_front 相比,该版本增加了一个”更新最后一个节点“的步骤
  • 只有当第一次增加新节点时,我们才需要更新尾指针,因为尾指针被更新后就一直指向最后一个节点,而前插元素只会从链表头开始插入,并不会修改最后一个节点。所以我们只需要判断当前插入是否为第一次插入即可,如果是,更新尾指针至新节点。
  1. 后插push_back
1
2
3
4
5
6
void push_back(T const& value) {
std::unique_ptr<node_d> new_node(new node_d(value));
std::unique_lock<std::mutex> lk(last_node_ptr->m);
last_node_ptr->next = std::move(new_node);
last_node_ptr = last_node_ptr->next.get(); // 更新最后节点
}
  • 该函数用于在链表尾部插入新节点
  • 创建一个新节点,并将尾指针指向的节点内部互斥给锁住(因为要修改last_node_ptr)
  • 更新尾指针

注意,因为在单向链表中我们是按从头到尾的顺序依次加锁的,所以我们这里先通过尾指针直接调用其指向节点内部的锁,也可以保护尾指针的修改,因为抵达最后一个节点(未更新前的最后一个节点)的其他线程均会被阻挡在外,直至最后一个节点(未更新前的最后一个节点)修改完毕。

  1. 析构函数

析构函数调用的 remove_if 函数也有一些改变,需要判断删除的节点是否为最后一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename Predicate>
void remove_if(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current; // 将尾指针 `last_node_ptr` 指向虚拟头节点
}
next_lk.unlock();
}
else
{
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
  • 执行逻辑和 2.1 版本的一样,只不过在删除最后一个节点时(如果满足谓词函数的条件),需要将尾指针 last_node_ptr 指向虚拟头节点
  1. remove_first
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename Predicate>
bool remove_first(Predicate p)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while (node_d* const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if (p(*next->data))
{
std::unique_ptr<node_d> old_next = std::move(current->next);
current->next = std::move(next->next);
//判断删除的是否为最后一个节点
if (current->next == nullptr) {
last_node_ptr = current;
}
next_lk.unlock();
return true;
}

lk.unlock();
current = next;
lk = std::move(next_lk);
}
return false;
}
  • 该函数用于删除链表中第一个满足谓词函数的节点
  • 执行流程和 remove_if 相同,只不过如何找到满足谓词函数的节点,该函数会返回 true,否则返回 false
  1. insert_if
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename Predicate>
void insert_if(Predicate p, T const & value)
{
node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m);
while(node_d * const next = current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*(next->data))) // 如果找到满足条件的节点
{
std::unique_ptr<node_d> new_node(new node_d(value));
// 将新节点的 next 指向当前节点的下一个节点
auto old_next = std::move(current->next);

// 将当前节点的 next 更新为新节点
new_node->next = std::move(old_next);
//当前节点的下一个节点更新为新节点
current->next = std::move(new_node);
return;
}
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
  • 该函数用于在链表中找到满足条件的第一个节点之前插入一个新节点
  • 首先是惯例,创建一个局部指针变量 current,设置current为虚拟头节点并加锁
  • 然后进入循环遍历链表,获取current的下一个节点next,并对其内部互斥加锁:
    • 检查 p(*(next->data)) 是否满足条件,如果满足条件:
      • 创建一个新节点,赋值为 value
      • 新节点的 next 指针指向当前节点的下一个节点(旧的 next)。
      • 当前节点的 next 更新为新节点。
    • 如果当前节点未满足条件,则解锁当前节点的锁并移动到下一个节点继续遍历。

其实这里还有一个小问题需要我们思考一下:

虽然我们在 2.2 的开头说了尾部和头部不可能竞争的情形(只适用于单向链表且加锁从头到尾依次加的情况),但我们仍需要举一个例子说明一下。

假如我们现在有两个线程 t1 和 t2,线程 t1 用于执行 push_back 函数在链表的尾部新加一个节点,而线程 t2 想要删除最后一个节点。那么,有没有可能发生以下情况:

假设线程2先执行删除操作,节点更新并且更新last_node_ptr的值,而此时线程1因为之前无法抢占最后一个节点(last_node_ptr)自带的互斥量所以挂起,当线程2执行完后,线程1才开始继续执行,但是此时last_node_ptr已经变化了,而线程1可能还用的是旧的last_node_ptr的值,导致插入数据失败(很可能崩溃或者插入到一个分叉的链表)。

其实如果加锁顺序是一致的,这种情况不可能发生,因为尾部指针指向的节点内部锁已经足够使用,不可能发生线程t1修改后,线程t2还持有更新前值的情况,比如下述代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void push_back(T const& value) {
std::unique_ptr<node_d> new_node(new node_d(value));
std::unique_lock<std::mutex> lk(last_node_ptr->m); // 锁住当前尾节点
last_node_ptr->next = std::move(new_node);
last_node_ptr = last_node_ptr->next.get(); // 更新最后节点
}
// 错误版本
void remove_last() {
std::unique_lock<std::mutex> tail_lock(last_node_ptr->m); // 锁住当前尾节点

node_d* current = &head;
std::unique_lock<std::mutex> lk(head.m); // 错误的,不能已经对尾节点加锁的情况下从后往前加锁
while (current->next.get() != last_node_ptr) {
std::unique_lock<std::mutex> current_lock(current->m); // 锁住当前节点
current = current->next.get();
}
current->next.reset();
last_node_ptr = current;
}
// 正确版本
void remove_last() {
if (last_node_ptr == &head) {
std::cerr << "Error: Attempt to remove from an empty list!" << std::endl;
return;
}

node_d* current = &head;
std::unique_lock<std::mutex> current_lock(current->m); // 锁住虚拟头节点
// 遍历到最后一个节点的前一个节点
while (current->next.get() != last_node_ptr) {
current_lock.unlock();
current = current->next.get();
current_lock = std::unique_lock<std::mutex>(current->m);
}
// 删除最后一个节点
std::unique_ptr<node_d> old_next = std::move(current->next);
last_node_ptr = current; // 更新尾指针
}

在push_back函数中,直接将尾指针指向节点的互斥锁住;在remove_last函数中,先锁住虚拟头节点的互斥,然后依次往后逐个加解锁,直至找到最后一个节点的上一个节点。这两个函数的加锁流程其实都遵循了从前往后依次加锁,确保尾部状态的一致性。

请注意,在 push_back 函数中,我们如果直接将尾指针指向节点的互斥锁住,那么后续就不能对其他节点的互斥进行加解锁,否则就会出错;我们要么从头到尾依次加解锁,要么直接从中间某一个部分加锁,但是不能对之前的部分进行加锁,然后继续往后加。总之,对于单向链表的加锁,要保证从前往后的顺序。

last_node_ptr 的更新也在锁的保护范围内,不存在线程1使用“旧值”的情况。无论是 push_back 还是删除尾部节点,last_node_ptr 的修改在锁保护下完成,保证它指向的是当前尾部状态。

简单对该例子做个测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void TestTailPushAndDelete() {
double_push_list<int> list;
// 创建线程执行 push_back 和 remove_last
std::thread t1([&list]() {
for (int i = 1; i <= 10; ++i) {
list.push_back(i);
std::cout << "push back " << i << " success" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});

std::thread t2([&list]() {
for (int i = 0; i < 10; ++i) {
list.remove_last();
std::cout << "remove last success" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(15));
}
});

t1.join();
t2.join();

// 打印最终链表状态
std::cout << "Final list: ";
list.print_list();
}

void print_list() {
node_d* current = head.next.get();
while (current) {
std::cout << current->data << " -> ";
current = current->next.get();
}
std::cout << "NULL\n";
}

函数 print_list 是链表类的成员函数,方便观察数据变化,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
push back 1 success
remove last success
Error: Attempt to remove from an empty list!push back 2 success

remove last success
push back 3 success
remove last successpush back 4 success

push back 5 success
push back 6 success
remove last success
push back 7 success
push back remove last success
8 success
push back 9 success
push back 10 success
remove last success
remove last success
remove last success
remove last success
remove last success
Final list: 0000021DAC5F26C0 -> NULL

因为我们没有加内存序保证顺序执行,所以输出是乱序的,但是可以看出来,只要按顺序加锁,就不会发生数据竞争问题。

2.3 测试

创建三个线程进行测试

  • 线程1执行push_front将0到20000放入链表
  • 线程2执行push_back将20000到40000的数据放入链表
  • 线程3执行删除操作,将数据从0到40000删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void MultiThreadPush()
{
double_push_list<MyClass> thread_safe_list;
std::thread t1([&]()
{
for (int i = 0; i < 20000; i++)
{
MyClass mc(i);
thread_safe_list.push_front(mc);
std::cout << "push front " << i << " success" << std::endl;
}
});
std::thread t2([&]()
{
for (int i = 20000; i < 40000; i++)
{
MyClass mc(i);
thread_safe_list.push_back(mc);
std::cout << "push back " << i << " success" << std::endl;
}
});
std::thread t3([&]()
{
for (int i = 0; i < 40000; )
{
bool rmv_res = thread_safe_list.remove_first([&](const MyClass& mc)
{
return mc.GetData() == i;
});
if (!rmv_res)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
i++;
}
});
t1.join();
t2.join();
t3.join();
std::cout << "begin for each print...." << std::endl;
thread_safe_list.for_each([](const MyClass& mc)
{
std::cout << "for each print " << mc << std::endl;
});
std::cout << "end for each print...." << std::endl;
}

输出结果为:

e70908420fc71c6e92485d21f90955c

0369fe3f41f15c5068cb94a0cf79713

因为没有使用内存序保证顺序执行,所以输出是乱序的,而且因为不同线程会竞争资源,t2和t3线程明显提前结束,而t1线程最后结束。但足以保证该模板类的有效性。