二、day2 今天学习如何管理线程,包括:
1)线程的归属权如何进行转移
2)joining_thread
3)jthread
4)如何使用容器对线程进行管理,并简单举了一个关于多线程并发的例子
5)如何辨识线程(通过线程id)
参考:
恋恋风辰官方博客
C++ 并发编程(2) 线程所属权管理_哔哩哔哩_bilibili
ModernCpp-ConcurrentProgramming-Tutorial/md/02使用线程.md at main · Mq-b/ModernCpp-ConcurrentProgramming-Tutorial
1. 线程管控 在上一节中我们详细解析了 thread 的源码,知道了 thread 只有一个私有数据成员_Thr
,类型是一个结构体,包含两个数据成员:_Hnd
和 _Id
,前者指向线程的句柄,后者保有线程的 ID。并且,thread禁用了拷贝构造函数和赋值运算符,所以只能通过移动赋值运算符、移动构造函数和局部变量返回 的方式将线程变量管理的线程所有权转移给其他变量管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 thread (thread&& _Other) noexcept : _Thr(_STD exchange (_Other._Thr, {})) {}thread& operator =(thread&& _Other) noexcept { if (joinable ()) { _STD terminate (); } _Thr = _STD exchange (_Other._Thr, {}); return *this ; } thread (const thread&) = delete ;thread& operator =(const thread&) = delete ;
可以参考上一节的文章:
并发编程(1)——线程、thread源码解析 - 知乎
1.1 归属权转移 线程可以通过detach在后台运行或者让开辟这个线程的父线程等待该线程完成。但每个线程都应该有其归属权,也就是归属给某个变量管理:
1 2 3 void some_function () {} std::thread t1 (some_function) ;
在该段代码中,t1管理一个运行 some_function 函数的线程。
我们可以通过下面几种方式将线程归属权进行转移:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void some_function () { while (true ) { std::this_thread::sleep_for (std::chrono::seconds (1 )); } } void some_other_function () { while (true ) { std::this_thread::sleep_for (std::chrono::seconds (1 )); } } std::thread t1 (some_function) ; std::thread t2 = std::move (t1); t1 = std::thread (some_other_function); std::thread t3; t3 = std::move (t2); std::thread t4 (std::move(t3)) ;t1 = std::move (t4); std::this_thread::sleep_for (std::chrono::seconds (2000 ));
首先,创建一个线程运行 some_function函数并交给线程变量 t1 进行管理,我们可以通过移动构造函数或者移动赋值运算符进行归属权转移。我们这里使用移动赋值运算符将t1的线程归属权转移给t2,此后,t2正式管理t1之前管理的线程,而t1无效;
1 2 3 std::thread t1 (some_function) ; std::thread t2 = std::move (t1); t1 = std::thread (some_other_function);
t1 无效后可继续绑定其他线程,注意std::thread(some_other_function)
返回的是一个thread类型的右值引用 ,然后通过移动赋值运算符将线程管理权转移给t1(赋值运算符被delete,所以只能右值赋值);
1 2 std::thread t4 (std::move(t3)) ;
我们也可以使用移动构造函数将t3的所有权转移给t4.
但是我们不能将线程管理权转移给一个已经有另一个线程管理权的线程变量,比如
该段代码会造成程序崩溃。所以,不能将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate函数引发崩溃。
此外,我们也可以通过返回局部变量的方式转移所属权:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 std::thread f () { return std::thread (some_function); } void param_function (int a) { while (true ) { std::this_thread::sleep_for (std::chrono::seconds (1 )); } } std::thread g () { std::thread t (param_function, 43 ) ; return t; } int main () { std::thread t1 = f (); if (t1. joinable ()) { t1. join (); } std::thread t2 = g (); if (t2. joinable ()) { t2. join (); } return 0 ; }
我们可以在函数内返回创建好的thread对象(右值,因为是临时对象),C++ 在返回局部变量时,会优先寻找这个类的拷贝构造函数,如果没有就会使用这个类的移动构造函数,
返回的类型不能是引用,临时对象会在函数结束时被销毁,而返回的引用会指向一个已经被销毁的对象。
1 2 3 std::thread& f () { return std::thread (some_function); }
上段代码是错误的,返回局部变量时不能返回引用。
1.2 joining_thread 在上节中我们学习了线程守卫(RAII技术),即主线程出现异常时,希望子线程能够运行结束后才退出主程序。其实 joining_thread 就是一个自带RAII技术的 thread类,它和 std::thread 的区别就是析构函数会自动 join 。
joining_thread 是 C++17标准的备选提案,但是并没有被引进,直至它改名为 std::jthread 后,进入了C++20标准的议程(现已被正式纳入C++20标准)。
boost库中有 joining_thread 类的实现。
我们存储一个 std::thread
作为底层数据成员,稍微注意一下构造函数、赋值运算符和析构函数的实现即可,其他实现和thread
的相似。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 class joining_thread { std::thread t; public : joining_thread ()noexcept = default ; template <typename Callable, typename ... Args> explicit joining_thread (Callable&& func, Args&&...args) : t{ std::forward<Callable>(func), std::forward<Args>(args)... } {} explicit joining_thread (std::thread t_) noexcept : t{ std::move (t_) } {} joining_thread (joining_thread&& other)noexcept : t{ std::move (other.t) } {} joining_thread& operator =(std::thread&& other)noexcept { if (joinable ()) { join (); } t = std::move (other); return *this ; } joining_thread& operator =(joining_thread other) noexcept { if (joinable ()) { join (); } _t = std::move (other._t ); return *this ; } ~joining_thread () { if (joinable ()) { join (); } } void swap (joining_thread& other) noexcept { t.swap (other.t); } std::thread::id get_id () const noexcept { return t.get_id (); } bool joinable () const noexcept { return t.joinable (); } void join () { t.join (); } void detach () { t.detach (); } std::thread& data () noexcept { return t; } const std::thread& data () const noexcept { return t; } };
注意,如果使用移动赋值运算符和赋值运算符时,joining_thread
的私有成员 _t
有任务在运行,那么必须等待_t
的任务运行结束后才能将 other
的线程所属权转移给_t
,否则会造成崩溃。因为不能将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate
函数引发崩溃。
1 2 3 4 5 6 7 if (joinable ()) { join (); } t = std::move (other); return *this ;
1.2.1 如何使用 joining_thread 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void use_jointhread () { joining_thread j1 ([](int maxindex) { for (int i = 0 ; i < maxindex; i++) { std::cout << "in thread id " << std::this_thread::get_id() << " cur index is " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 )); } }, 10 ) ; joining_thread j2 (std::thread([](int maxindex) { for (int i = 0 ; i < maxindex; i++) { std::cout << "in thread id " << std::this_thread::get_id() << " cur index is " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 )); } }, 10 )) ; joining_thread j3 (std::thread([](int maxindex) { for (int i = 0 ; i < maxindex; i++) { std::cout << "in thread id " << std::this_thread::get_id() << " cur index is " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 )); } }, 10 )) ; j1 = std::move (j3); }
线程变量 j1 通过构造函数2构造:
1 2 3 4 5 6 7 8 9 10 11 joining_thread j1 ([](int maxindex) { for (int i = 0 ; i < maxindex; i++) { std::cout << "in thread id " << std::this_thread::get_id() << " cur index is " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 )); } }, 10 ) ;template <typename Callable, typename ... Args>explicit joining_thread (Callable&& func, Args&&...args) : t{ std::forward<Callable>(func), std::forward<Args>(args)... } {}
线程变量 j2 通过构造函数3构造:
首先通过std::thread 构造一个thread对象,然后将thread传入joining_thread 的构造函数中
1 2 3 4 5 6 7 8 9 joining_thread j2 (std::thread([](int maxindex) { for (int i = 0 ; i < maxindex; i++) { std::cout << "in thread id " << std::this_thread::get_id() << " cur index is " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 )); } }, 10 )) ;explicit joining_thread (std::thread t_) noexcept : t{ std::move (t_) } {}
同理,j3的构造方式和j2相同,如果将j3以移动语义赋值给j1,那么j1内部首先会等待j1管理的线程任务结束后,才会将j3的线程所属权转移给j1。使用了移动赋值运算符。
1 2 3 4 5 6 7 8 9 10 11 12 j1 = std::move (j3); joining_thread& operator =(std::thread&& other)noexcept { if (joinable ()) { join (); } t = std::move (other); return *this ; }
1.3 std::jthread std::jthread 相比于 C++11 引入的 std::thread ,只是多了两个功能:
RAII 管理 :在析构时自动调用 join()。
线程停止功能 :线程的取消/停止。
RAII技术已经说过很多次了,这里就不在叙述,可以参考前面的文章。这里主要说一下什么是线程停止功能。
1.3.1 零开销原则
为什么 C++20 不直接为std::thread增加这两个功能,而是创造一个新的线程类型呢?
这就是 C++ 的设计哲学,零开销原则: 你不需要为你没有用到的(特性)付出额外的开销。
std::jthread 的通常实现就是单纯的保有 std::thread
+ std::stop_source
这两个数据成员(joining_thread
只有一个成员std::thread):
1 2 thread _Impl; stop_source _Ssource;
stop_source 通常占 8 字节,先前 std::thread 源码解析详细聊过其不同标准库对其保有的成员不同(内存对齐 ),简单来说也就是 64 位环境,大小为 16 或者 8。也就是 sizeof(std::jthread) 的值相比 std::thread 会多 8 ,为 24 或 16。
引入 std::jthread 符合零开销原则,它通过创建新的类型提供了更多的功能,而没有影响到原来 std::thread 的性能和内存占用。
1.3.2 线程停止 首先要明确,C++ 的 std::jthread 提供的线程停止功能并不同于常见的 POSIX 函数 ***pthread_cancel 。 pthread_cancel * 是一种发送取消请求的函数,但并不是强制性的线程终止方式。目标线程的可取消性状态和类型决定了取消何时生效。当取消被执行时,进行清理和终止线程。
std::jthread
所谓的线程停止只是一种基于用户代码的控制机制 ,而不是一种与操作系统系统有关系的线程终止。使用 std::stop_source
和std::stop_token
提供了一种优雅地请求线程停止的方式,但实际上停止的决定和实现都由用户代码来完成 。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 using namespace std::literals::chrono_literals;void f (std::stop_token stop_token, int value) { while (!stop_token.stop_requested ()){ std::cout << value++ << ' ' << std::flush; std::this_thread::sleep_for (200 ms); } std::cout << std::endl; } int main () { std::jthread thread{ f, 1 }; std::this_thread::sleep_for (3 s); }
该段代码主要用于创建一个可以响应停止请求的线程。
当 std::jthread
对象超出作用域时,它会自动调用 request_stop()
请求停止线程,并在销毁时调用 join()
等待线程结束。
std::stop_token
允许线程检查是否接收到停止请求。在函数 f 中,循环体检查 stop_token.stop_requested()
,如果返回 false,则继续执行;否则退出循环。
在每次循环中,打印当前值并将其递增,然后线程休眠 200 毫秒。这样,每个数字的打印之间有一定的间隔。
在 main 函数中,主线程休眠 3 秒。这段时间内,f 函数将打印数字(大约会打印 15 个数字,因为 3 秒内会输出 1 到 15)。主线程结束后,jthread
会自动请求停止并等待 f 函数完成。
std::jthread
提供了三个成员函数进行所谓的线程停止:
get_stop_source
:返回与 jthread
对象关联的 std::stop_source
,允许从外部请求线程停止。
get_stop_token
:返回与 jthread
对象停止状态关联的 std::stop_token
,允许检查是否有停止请求。
request_stop
:请求线程停止。
上面那段代码中,这三个函数并没有被显式调用,不过在 jthread
的析构函数 中,会调用 request_stop
请求线程停止:
1 2 3 4 5 6 7 8 9 void _Try_cancel_and_join() noexcept { if (_Impl.joinable ()) { _Ssource.request_stop (); _Impl.join (); } } ~jthread () { _Try_cancel_and_join(); }
至于 std::jthread thread{ f, 1 }
函数 f 的 std::stop_token
的形参是谁传递的?其实就是线程对象自己调用get_token()
传递的 ,源码一眼便可发现:
1 2 3 4 5 6 7 8 template <class _Fn , class ... _Args, enable_if_t <!is_same_v<remove_cvref_t <_Fn>, jthread>, int > = 0 >_NODISCARD_CTOR_JTHREAD explicit jthread (_Fn&& _Fx, _Args&&... _Ax) { if constexpr (is_invocable_v<decay_t <_Fn>, stop_token, decay_t <_Args>...>) { _Impl._Start(_STD forward<_Fn>(_Fx), _Ssource.get_token (), _STD forward<_Args>(_Ax)...); } else { _Impl._Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...); } }
std::stop_source:
这是一个可以发出停止请求的类型。当你调用 stop_source
的 request_stop()
方法时,它会设置内部的停止状态为“已请求停止”。
任何持有与这个 stop_source
关联的 std::stop_token
对象都能检查到这个停止请求。
std::stop_token:
这是一个可以检查停止请求的类型。线程内部可以定期检查 stop_token
是否收到了停止请求。
通过调用 stop_token.stop_requested()
,线程可以检测到停止状态是否已被设置为“已请求停止”。
1.4 容器管理线程对象 之前在网络编程封装两种线程池就是通过容器管理线程对象,可以参考:
网络编程(16)——asio多线程模型IOServicePool - 知乎
网络编程(17)——asio多线程模型IOThreadPool - 知乎
容器存储线程时,比如 vector,如果用 push_back 操作势必会调用std::thread,这样会引发编译错误。
如果存在一个容器 std::vectorstd::thread ,当使用 push_back 将一个线程对象添加到这个容器时,实际上会调用 std::thread 的拷贝构造函数,但是std::thread 将拷贝构造函数delete,编译器会报错。例如
1 2 std::vector<std::thread> threads; threads.push_back (std::thread ([] { }));
一般来说,push_back会根据传入值的类型,选择使用拷贝构造函数或者移动构造函数。而 std::thread 存在移动构造函数,如果push_back不能使用thread的拷贝构造,那为什么不使用移动构造函数呢?
因为我们调用push_back 时,传递的对象通常是一个左值 (即命名的变量)。如果直接传递左值,编译器会优先尝试调用拷贝构造函数,如果不存在拷贝构造那么会直接报错,而不会调用移动构造,因为thread内部并不会将左值隐式转换为右值,从而调用移动构造。
push_back 会尝试使用传递的对象调用拷贝构造函数(左值传入 )。如果对象的拷贝构造函数不可用(例如被删除),则编译器会报错。
如果对象的拷贝构造函数可用,但移动构造函数也存在,push_back 会选择合适的构造函数:
如果传递的是一个左值,会调用拷贝构造函数。
如果传递的是一个右值,会调用移动构造函数。
而emplace_back 允许我们直接在容器中构造对象,而不需要先构造一个对象再传递给容器。这是通过完美转发实现的。当我们使用 emplace_back 时,std::thread 对象会在 std::vector 的内部直接构造,因此不会触发拷贝构造,这使得它可以接受可调用对象(如函数指针或 lambda 表达式)和参数来创建线程。
1 2 std::vector<std::thread> threads; threads.emplace_back (some_function);
我们在之前网络编程实现IOServicePool或者IOThreadPool时初始化了多个线程存储在vector中, 采用的时emplace_back方式,可以直接根据线程构造函数需要的参数构造,这样就避免了调用thread的拷贝构造函数。
std::thread 对象在被创建时会关联到一个实际的线程。如果 std::thread 对象被移动到(不是复制) std::vector 中,原始的 std::thread 对象将失去对该线程的所有权(即该对象变为无效),并且只能在 std::vector 中的元素上调用 join() 或 detach() 来管理这个线程的生命周期。
1.4.1 使用容器 使用容器管理线程对象,等待线程执行结束 :
1 2 3 4 5 6 7 8 9 10 11 12 13 void do_work (std::size_t id) { std::cout << id << '\n' ; } int main () { std::vector<std::thread>threads; for (std::size_t i = 0 ; i < 10 ; ++i){ threads.emplace_back (do_work, i); } for (auto & thread:threads){ thread.join (); } }
线程对象代表了线程,管理线程对象也就是管理线程,这个 vector 对象管理 10 个线程,保证他们的执行、退出。
使用我们这节实现的 joining_thread
则不需要最后的循环 join()
,joining_thread
对象在析构时会自动调用join()
1 2 3 4 5 6 int main () { std::vector<joining_thread>threads; for (std::size_t i = 0 ; i < 10 ; ++i){ threads.emplace_back (do_work, i); } }
注意到,这两段代码的输出都是乱序的,没有规律 ,而且重复运行的结果还不一样,这是正常现象 。多线程执行就是如此,无序且操作可能被打断。使用互斥量可以解决这些问题,也就是下一章节的内容。
1.4.2 如何选择线程运行数量 在网络编程的学习中,我们通过 std::thread::hardware_concurrency()
函数定义线程运行的数量,该函数的作用就是返回当前设备CPU的核数,以核数作为线程的运行数量,希望CPU一个核中运行一个线程。
接下来举一个例子,来说明多线程并行处理的优势。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 template <typename Iterator, typename T>struct accumulate_block { void operator () (Iterator first, Iterator last, T& result) { result = std::accumulate (first, last, result); } }; template <typename Iterator, typename T>T parallel_accumulate (Iterator first, Iterator last, T init) { unsigned long const length = std::distance (first, last); if (!length) return init; unsigned long const min_per_thread = 25 ; unsigned long const max_threads = (length + min_per_thread - 1 ) / min_per_thread; unsigned long const hardware_threads = std::thread::hardware_concurrency (); unsigned long const num_threads = std::min (hardware_threads != 0 ? hardware_threads : 2 , max_threads); unsigned long const block_size = length / num_threads; std::vector<T> results (num_threads) ; std::vector<std::thread> threads (num_threads - 1 ) ; Iterator block_start = first; for (unsigned long i = 0 ; i < (num_threads - 1 ); ++i) { Iterator block_end = block_start; std::advance (block_end, block_size); threads[i] = std::thread ( accumulate_block <Iterator, T>(), block_start, block_end, std::ref (results[i])); block_start = block_end; } accumulate_block <Iterator, T>()( block_start, last, results[num_threads - 1 ]); for (auto & entry : threads) entry.join (); return std::accumulate (results.begin (), results.end (), init); }
在上段代码中,实现了一个函数模板 parallel_accumulate
,该函数通过多线程来加速对区间[first, last)
中元素的累加。下面是对这个函数的解释:
1 2 3 4 5 6 7 8 template <typename Iterator, typename T>struct accumulate_block { void operator () (Iterator first, Iterator last, T& result) { result = std::accumulate (first, last, result); } };
accumulate_block
是一个仿函数,参数为两个迭代器,以及结果存储的对象。
1 2 template <typename Iterator, typename T>T parallel_accumulate (Iterator first, Iterator last, T init)
parallel_accumulate 是一个模板函数,接受两个迭代器 first 和 last 以及一个初始值 init。返回值类型为 T,表示累加的结果。接下来解析函数执行步骤:
①计算长度
1 2 3 unsigned long const length = std::distance (first, last);if (!length) return init;
计算区间的长度,如果长度为 0,直接返回初始值,确保区间内有元素。
②确定线程数
1 2 3 4 5 6 7 unsigned long const min_per_thread = 25 ;unsigned long const max_threads = (length + min_per_thread - 1 ) / min_per_thread; unsigned long const hardware_threads = std::thread::hardware_concurrency (); unsigned long const num_threads = std::min (hardware_threads != 0 ? hardware_threads : 2 , max_threads);
设定每个线程处理的最小元素数 min_per_thread,然后计算最大线程数 max_threads
获取当前硬件支持的最大线程数(CPU核数),并选择实际使用的线程数 num_threads(理论最大线程数和硬件支持最大线程数的最小值)
③计算区块大小
1 2 3 unsigned long const block_size = length / num_threads; std::vector<T> results (num_threads) ;std::vector<std::thread> threads (num_threads - 1 ) ;
计算每个线程应该处理的区块大小,并创建结果存储的 results 向量和线程对象的向量
results 是一个大小为实际线程数num_threads的容器,用于记录每个线程的求解值
我们也使用vector对每个线程进行管理
存储线程的容器threads的大小并不是num_threads ,而是num_threads -1,因为主线程也会帮忙计算,如果主线程需要执行其他功能,那么threads的大小需要设为num_threads
④启动线程
1 2 3 4 5 6 7 8 9 10 Iterator block_start = first; for (unsigned long i = 0 ; i < (num_threads - 1 ); ++i){ Iterator block_end = block_start; std::advance (block_end, block_size); threads[i] = std::thread ( accumulate_block <Iterator, T>(), block_start, block_end, std::ref (results[i])); block_start = block_end; }
在循环中,将区间划分为多个区块,为每个区块创建线程并开始执行 accumulate_block 函数,每个线程都负责一个区块。注意,这里启动的线程数不是 num_threads,而是num_threads-1,因为最后一个区块的长度可能不是block_size,所以我们需要单独对这个区块进行处理。我们传入的参数是区间的范围以及需要存储在哪里
1 2 3 threads[i] = std::thread ( accumulate_block <Iterator, T>(), block_start, block_end, std::ref (results[i]));
std::advance用于将迭代器每次都向前移动block_size个大小,block_size是每个线程处理的长度
1 2 3 4 5 6 7 8 9 10 std::vector<int > vec = {1 , 2 , 3 , 4 , 5 }; auto it = vec.begin ();std::advance (it, 2 ); std::cout << *it << std::endl; std::advance (it, -1 ); std::cout << *it << std::endl;
⑤处理最后一个区块
1 2 accumulate_block <Iterator, T>()( block_start, last, results[num_threads - 1 ]);
对最后一个区块单独处理,确保所有元素都被累加。
⑥等待线程结束
1 2 for (auto & entry : threads) entry.join ();
⑦返回结果
1 return std::accumulate (results.begin (), results.end (), init);
最后,我们调用这个函数进行测试
1 2 3 4 5 6 7 8 9 10 void use_parallel_acc () { std::vector <int > vec (10000 ); for (int i = 0 ; i < 10000 ; i++) { vec.push_back (i); } int sum = 0 ; sum = parallel_accumulate<std::vector<int >::iterator, int >(vec.begin (), vec.end (), sum); std::cout << "sum is " << sum << std::endl; }
输出结果为:
1.5 线程id 还记得么,thread唯一的私有成员变量是一个结构体,该结构体中存储着线程的id,我们可以根据线程id判断不同线程是否是同一个线程。
1 2 3 4 std::thread t ([](){ std::cout << "thread start" << std::endl; }) ;t.get_id ();
通过 get_id() 函数,我们可以获取指定线程变量的线程id。
但是如果我们想在线程的运行函数中区分线程,或者判断哪些是主线程或者子线程,可以通过这个方式
1 2 3 4 std::thread t ([](){ std::cout << "in thread id " << std::this_thread::get_id() << std::endl; std::cout << "thread start" << std::endl; }) ;
在传入thread的lambda函数中,通过 std::this_thread::get_id() 可以获取当前线程的唯一标识符(ID)。
std::this_thread 是 C++ 标准库中的一个命名空间,提供了一组与当前线程相关的功能。主要用于获取当前线程的信息和管理线程的执行 。