网络编程(14)——基于单例模板实现的逻辑层
2025-06-23 13:38:46 # C++ # 网络编程

十四、day14

今天学习如何通过单例模板实现逻辑层

参考:

恋恋风辰官方博客

visual studio配置C++ boost库_哔哩哔哩_bilibili

1. 利用C++11特性封装单例模板

和上一节设计的单例模板有些不同,本节设计的单例模板利用了以下四个C++11新特性,优化了代码

  • unique_lock和lock_guard
  • once_flag和call_once
  • std::function
  • condition_variable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using std::cout;
using std::cin;
using std::endl;

template <typename T>
class Singleton;

template<typename T>
class SafeDeletor
{
public:
void operator()(Singleton<T>* st)
{
cout << "this is safe deleter operator()" << endl;
delete st;
}
};

template <typename T>
class Singleton {
template<typename T>
friend class SafeDeletor;
protected:
~Singleton() {
cout << "this is singleton destruct" << endl;
}
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>&st) = delete;
private:
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag; // C++11新特性,懒汉模式在多线程下也不会生成多个实例,不用进行加锁操作
std::call_once(s_flag, [&]() { // call_once内部有锁
_instance = std::shared_ptr<T>(new T, SafeDeletor<T>());
});
return _instance;
}

void PrintAddress() {
cout << _instance->get() << endl;
}
};

// 初始化
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
  • 私有成员仅保留静态成员变量**_instance**,用于保存唯一实例,无需手动进行加锁(C++11新特性会自动实现)
  • s_flag是函数GetInstance内的局部静态变量,该变量在函数GetInstance第一次调用时被初始化。以后无论调用多少次GetInstances_flag都不会被重复初始化,而且s_flag存在静态区,会随着进程结束而自动释放。
  • call_once只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance内部还会调用call_once, 只是call_once判断s_flag已经被初始化了就不执行初始化智能指针的操作了。
  • PrintAddress()函数获取指向托管对象的原始指针

1)本节GetInstance()函数实现

1
2
3
4
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<T>(new T, SafeDeletor<T>());
});
  • 使用 std::call_once:这个方法保证了指定的 lambda 函数只会被调用一次。无论有多少线程尝试访问 GetInstance,只有第一个线程会执行初始化代码,其他线程会等待。这种方法内部处理了线程同步,不需要手动加锁。
  • 懒汉式加载:确保在第一次调用时实例化,避免了不必要的开销。

2)上一节GetInstance()函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
//额外指定删除器
single = std::shared_ptr<T>(new T, SafeDeletor_T<T>());
s_mutex.unlock();
  • 手动加锁:使用 s_mutex.lock() 和 s_mutex.unlock(),需要手动管理锁。

以上四个C++11新特性的介绍请参考

C++11新特性 - 知乎 (zhihu.com)

2. 逻辑类LogicSystem

2.1 逻辑类的声明

首先定义一个逻辑类LogicSystem,并继承单例模板**Singleton<LogicSystem>**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#pragma once
#include "Singleton.h"
#include <queue>
#include <thread>
#include "CSession.h"
#include <map>
#include <functional>
#include "Const.h"
#include <json/json.h>
#include <json/reader.h>

typedef std::function<void(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data)> FunCallBack;

class LogicSystem:public Singleton<LogicSystem>
{
// 因为Singleton<LogicSystem>中实例化make_ptr时,使用了new,需要用到LogicSystem的构造函数,但已被私有化,所以需要友元
friend class Singleton<LogicSystem>;
private:
LogicSystem();
void RegisterCallBacks();
void HelloWordCallBack(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data);
void DealMsg();

std::queue<std::shared_ptr<LogicNode>> _msg_que;
std::mutex _mutex;
std::condition_variable _consume;
std::thread _worker_thread;
bool _b_stop;
std::map<short, FunCallBack> _fun_callback;
public:
~LogicSystem();
void PostMsgToQue(std::shared_ptr<LogicNode> msg);
};
  • FunCallBack为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。
  • _msg_que为逻辑队列,其中的元素相当于RecvNode,只不过为了实现伪闭包,所以创建一个LogicNode类,包含Session的智能指针防止被提前释放
  • _mutex 为保证逻辑队列安全的互斥量
  • _consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。
  • _fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。
  • _worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。
  • _b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。

2.2 LogicNode类

LogicNode类定义在CSession.h中,详细代码可以查看之前的文章

1
2
3
4
5
6
7
8
class LogicNode {
friend class LogicSystem;
public:
LogicNode(std::shared_ptr<CSession>, std::shared_ptr<RecvNode>);
private:
std::shared_ptr<CSession> _session;
std::shared_ptr<RecvNode> _recvnode;
};
  • _session:声明Session的智能指针,实现伪闭包,防止Session被提前释放
  • _recvnode:接收消息节点的智能指针

构造函数如下:

1
2
LogicNode::LogicNode(std::shared_ptr<CSession> session, std::shared_ptr<RecvNode> recvnode) :
_session(session), _recvnode(recvnode){}

2.3 逻辑类的实现

a. 逻辑类的构造函数

1
2
3
4
LogicSystem::LogicSystem() :_b_stop(false) {
RegisterCallBacks();
_worker_thread = std::thread(&LogicSystem::DealMsg, this);
}
  • _b_stop初始化为false,当前工作线程不会停止
  • RegisterCallBacks():调用回调注册函数:这个函数用于注册消息处理的回调函数。将不同的消息 ID 和对应的处理函数关联起来,以便在处理消息时能够找到正确的函数。
  • _worker_thread = std::thread(&LogicSystem::DealMsg, this);
    • 创建工作线程:使用 std::thread 创建一个新的线程。
    • &LogicSystem::DealMsg:指定要在新线程中执行的成员函数,即 DealMsg,该函数将负责从消息队列中提取消息并处理它。
    • this:传递当前对象的指针,以便 DealMsg 可以访问 LogicSystem 的成员变量和函数。

b. RegisterCallBacks(), 注册消息处理的回调函数

1
2
3
4
void LogicSystem::RegisterCallBacks() {
_fun_callback[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}

消息ID:MSG_HELLO_WORD映射至回调函数HelloWordCallBack,并存储至_fun_callback。当接收到消息MSG_HELLO_WORD时,系统会调用 HelloWordCallBack 方法来处理消息。

_fun_callback的定义如下:

1
2
3
typedef std::function<void(std::shared_ptr<CSession>, const short& msg_id, const std::string& msg_data)> FunCallBack;

std::map<short, FunCallBack> _fun_callback;

_fun_callback是一个map,值和键的类型分别为short和FunCallBack,前者是消息id的类型,后者是回调函数。

FunCallBack使用了C++11新特性std::function来声明了一个函数签名,表示接受一个 shared_ptr 指向 CSession、一个 short 类型的消息 ID 和一个 std::string 类型的消息数据的可调用对象

c. HelloWordCallBack, 处理对应消息类型的回调函数

1
2
3
4
5
6
7
8
9
10
11
void LogicSystem::HelloWordCallBack(std::shared_ptr<CSession> session, const short& msg_id, const std::string& msg_data) {
Json::Reader reader;
Json::Value root;
reader.parse(msg_data, root);
std::cout << "receive msg id is " << root["id"].asInt() << "msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has receive msg, msg data is " + root["data"].asString();

std::string return_str = root.toStyledString();
session->Send(return_str, root["id"].asInt());
}
  • 使用jsoncpp库包装或者解析数据
  • 调用 parse 方法将 msg_data 字符串解析为 JSON 格式,结果存储在 root 中
  • 回传消息

d. DealMsg, 处理逻辑队列中的消息流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void LogicSystem::DealMsg() {
for (;;) {
// 和lock_guard的区别,这里是对_mutex进行加锁
std::unique_lock<std::mutex> unique_lk(_mutex);

// 若队列为空,用条件变量等待
while (_msg_que.empty() and !_b_stop) {
_consume.wait(unique_lk);
}

// 判断如果为关闭状态,取出逻辑队列所有数据处理并退出循环
if (_b_stop) {
while (!_msg_que.empty()) {
auto msg_node = _msg_que.front();
cout << "recv msg id is " << msg_node->_recvnode->GetMsgID() << endl;
auto call_back_iter = _fun_callback.find(msg_node->_recvnode->GetMsgID());
if (call_back_iter == _fun_callback.end()) {
_msg_que.pop();
continue;
}
call_back_iter->second(msg_node->_session, msg_node->_recvnode->GetMsgID(),
std::string(msg_node->_recvnode->_msg, msg_node->_recvnode->_cur_len));
_msg_que.pop();
}
break;
}

// 如果没有停服,且队列不为空
auto msg_node = _msg_que.front();
cout << "recv msg id is " << msg_node->_recvnode->GetMsgID() << endl;

auto call_back_iter = _fun_callback.find(msg_node->_recvnode->GetMsgID());
// 注册_fun_callback时,将消息id以及匹配的回调函数绑定在一起
// 若队列中没有该消息对应的回调函数,则释放该消息并处理下一条
if (call_back_iter == _fun_callback.end()) {
_msg_que.pop();
continue;
}
// 执行消息类型对应的回调函数
call_back_iter->second(msg_node->_session, msg_node->_recvnode->GetMsgID(),
std::string(msg_node->_recvnode->_msg, msg_node->_recvnode->_cur_len));
_msg_que.pop();
}
}
  • DealMsg 逻辑中初始化了一个unique_lock,主要用于控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。
  • 若队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。

e. 析构函数

1
2
3
4
5
LogicSystem::~LogicSystem() {
_b_stop = true;
_consume.notify_one();
_worker_thread.join();
}

逻辑类的析构函数在所有工作线程运行结束后被执行,但工作线程可能会处于挂起状态,此时需要一个激活信号打断**_consumewait**状态(在该命令前一步将_b_stop置为true)。

f. PostMsgToQue, 添加消息至逻辑队列

1
2
3
4
5
6
7
8
void LogicSystem::PostMsgToQue(std::shared_ptr<LogicNode> msg) {
std::unique_lock<std::mutex> unique_lk(_mutex); // 锁定互斥量以保护共享数据
_msg_que.push(msg); // 将消息推入队列

if (_msg_que.size() == 1) { // 如果这是队列中的第一个消息
_consume.notify_one(); // 通知消费者线程,结束wait状态
}
}

3. 修改CSession类的headle_read函数

将函数中原本对消息的处理过程(cout读到的消息并回传)删去,改为将消息投至逻辑队列

1
LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

完整函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
void CSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared) {
if (!error) {

// 打印缓存区的数据并将该线程暂停2s
//PrintRecvData(_data, bytes_transferred);
//std::chrono::milliseconds dura(2000);
//std::this_thread::sleep_for(dura);

// 每触发一次handale_read,它会返回实际读取的字节数bytes_transferred,copy_len表示已处理的长度,每处理一字节,copy_len便加一
int copy_len = 0; // 已经处理的字符数
while (bytes_transferred > 0) { // 只要读取到数据就对其处理
if (!_b_head_parse) { // 判断消息头部是否已处理,_b_head_parse默认为false
// 异步读取到的字节数 + 已接收到的头部长度 < 头部总长度
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) { // 收到的数据长度小于头部长度,说明头部还未全部读取
// 如果未完全接收消息头,则将接收到的数据复制到头部缓冲区
// _recv_head_node->_msg,更新当前头部的接收长度,并继续异步读取剩余数据。
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
// 缓冲区清零,无需更新copy_len追踪已处理的字符数,因为之前读取的数据已经全部写入头部节点,下一个
// 读入的消息从头开始(copy_len=0)往头节点写
::memset(_data, 0, MAX_LENGTH);
// 继续读消息
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::headle_read, this,
std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}

// 如果接收到的数据量足够处理消息头部,则计算头部剩余的未接收字节,
// 并将其从 _data 缓冲区复制到头部消息缓冲区 _recv_head_node->_msg
int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len; // 头部剩余未复制的长度
// 填充头部节点
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
copy_len += head_remain; // 更新已处理的data长度
bytes_transferred -= head_remain; // 更新剩余未处理的长度

short msg_id = 0; // 获取消息id
memcpy(&msg_id, _recv_head_node->_msg, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
cout << "msg_id is " << msg_id << endl;
// 判断id是否合法
if (msg_id > MAX_LENGTH) {
std::cout << "invaild msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}

short msg_len = 0; // 获取头部数据(消息长度)
memcpy(&msg_len, _recv_head_node->_msg + HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
cout << "msg_len is " << msg_len << endl;

if (msg_len > MAX_LENGTH) { // 判断头部长度是否非法
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}

_recv_msg_node = std::make_shared<RecvNode>(msg_len, msg_id); // 已知数据长度msg_len,构建消息内容载体
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < msg_len) {
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
// copy_len不用更新,缓冲区会清零,下一个读入data的数据从头开始写入,copy_len也会被初始化为0
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));

_b_head_parse = true; //头部处理完成
return;
}

// 接收的长度多于消息内容长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
_recv_msg_node->_cur_len += msg_len;
copy_len += msg_len;
bytes_transferred -= msg_len;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
// cout << "receive data is " << _recv_msg_node->_msg << endl;

// protobuf序列化
//MsgData msgdata;
//std::string receive_data;
//msgdata.ParseFromString(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len));
//std::cout << "receive msg id is " << msgdata.id () << " msg data is " << msgdata.data() << endl;
//std::string return_str = "Server has received msg, msg data is " + msgdata.data();
//MsgData msgreturn;
//msgreturn.set_id(msgdata.id());
//msgreturn.set_data(return_str);
//msgreturn.SerializeToString(&return_str);
//Send(return_str);

// jsoncpp序列化
//Json::Reader reader;
//Json::Value root;
//reader.parse(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len), root);
//std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
// << root["data"].asString() << endl;
//root["data"] = "Server has received msg, msg data is " + root["data"].asString();
//std::string return_str = root.toStyledString();
//Send(return_str, root["id"].asInt());

LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

//Send(_recv_msg_node->_msg, _recv_msg_node->_total_len); // 回传
// 清理已处理的头部消息并重置,准备解析下一条消息
_b_head_parse = false;
_recv_head_node->Clear();

// 如果当前数据已经全部处理完,重置缓冲区 _data,并继续异步读取新的数据
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}

continue; // 异步读取的消息未处理完,继续填充头节点乃至新的消息节点
}

//已经处理完头部,处理上次未接受完的消息数据
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) { //接收的数据仍不足剩余未处理的
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
// 接收的数据多于剩余未处理的长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_msg << endl;

// protobuf序列化
//MsgData msgdata;
//std::string receive_data;
//msgdata.ParseFromString(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len));
//std::cout << "receive msg id is " << msgdata.id() << " msg data is " << msgdata.data() << endl;
//std::string return_str = "Server has received msg, msg data is " + msgdata.data();
//MsgData msgreturn;
//msgreturn.set_id(msgdata.id());
//msgreturn.set_data(return_str);
//msgreturn.SerializeToString(&return_str);
//Send(return_str);

//jsoncpp序列化
//Json::Reader reader;
//Json::Value root;
//reader.parse(std::string(_recv_msg_node->_msg, _recv_msg_node->_total_len), root);
//std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
// << root["data"].asString() << endl;
//root["data"] = "Server has received msg, msg data is " + root["data"].asString();
//std::string return_str = root.toStyledString();
//Send(return_str, root["id"].asInt());

LogicSystem::GetInstance()->PostMsgToQue(std::make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

//此处可以调用Send发送测试
//Send(_recv_msg_node->_msg, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}

4. 测试

运行服务器和客户端,测试结果如下

img

img