网络编程(6)——发送的时序性,全双工通信
2025-04-30 12:24:32 # C++ # 网络编程

六、day6

在上午学习完如何通过c++11特性模拟伪闭包实现连接的安全回收之后,下午学习如何封装一个发送接口,该接口能保证发送的时序性异步发送时TCP底层缓冲区可能无法将所有数据一次发出去,如果这时候再次调用异步发送,就可能造成数据错乱,比如发送两个Hello World,如果是同步,那么数据是有序的,但如果是异步,那么可能会导致在TCp存储的数据是Hello Hello World World,这时候需要我们构造一个消息队列,将需要发送的数据插入队列,下一个消息按序排列到上一个消息后面,只有当上一个消息发送完毕,下一个消息才会从消息队列中pop出发送)。实现的关键在于:多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的(队列)。

文章开始前将前面文章中提到的Server和Session类分成CServer.h和CSession.h两个文件,注意两个文件的依赖关系:

CSession.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#pragma once
#include <iostream>
#include <boost/asio.hpp>
#include <map>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

using boost::asio::ip::tcp;
using std::cout;
using std::cin;
using std::endl;

class CServer;

class MsgNode {
public:
int _total_len; // 数据的总长度
int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)
char* _msg; // 数据域首地址

MsgNode(const char* msg, int total_len) :_total_len(total_len), _cur_len(0) { // 构造写节点
_msg = new char[total_len];
memcpy(_msg, msg, total_len);
}
MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点
_msg = new char[total_len];
}
~MsgNode() {
delete[] _msg;
}
};

class CSession:public std::enable_shared_from_this<CSession>
{
private:
tcp::socket _socket; // 处理客户端读写的套接字
enum { max_length = 1024 };
char _data[max_length];

// headle回调函数
void headle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared);
void haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared);
std::string _uuid;
CServer* _server;
public:
CSession(boost::asio::io_context& ioc, CServer* server) : _socket(ioc), _server(server){
// random_generator是函数对象,加()就是函数,再加一个()就是调用该函数
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
}
tcp::socket& Socket() { return _socket; }
const std::string& GetUuid() const { return _uuid; }
void Start();
};

CSession.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include "CSession.h"
#include "CServer.h"

void CSession::Start() {
memset(_data, 0, max_length); // 缓冲区清零
// 从套接字中读取数据,并绑定回调函数headle_read
_socket.async_read_some(boost::asio::buffer(_data, max_length),
// 这里可以将shared_ptr<Session>(this)给bind绑定吗?
// 不可以,会造成多个智能指针绑定同一块内存的问题
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,
shared_from_this()));
}

//
void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared) {
if (!error) {
cout << "server receive data is " << _data << endl;
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred),
std::bind(&CSession::haddle_write, this, std::placeholders::_1, _self_shared));
}
else {
cout << "read error" << endl;
_server->ClearSession(_uuid);
}
}

void CSession::haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) {
if (!error) {
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
}
else {
cout << "write error" << error.value() << endl;
_server->ClearSession(_uuid);
}
}

CServer.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#pragma once
#include "CSession.h"

class CServer
{
private:
void start_accept(); // 启动一个acceptor
// 当acceptor接收到连接后启动该函数
void handle_accept(std::shared_ptr<CSession> new_session, const boost::system::error_code& error);
boost::asio::io_context& _ioc;
tcp::acceptor _acceptor;
std::map<std::string, std::shared_ptr<CSession>> _sessions;
public:
CServer(boost::asio::io_context& ioc, short port);
void ClearSession(std::string uuid);
};

CServer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "CServer.h"


// 初始化服务器对象,绑定 I/O 上下文和监听的端口,并启动服务器
CServer::CServer(boost::asio::io_context& ioc, short port) : _ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
cout << "Server start success, on port: " << port << endl;
// 开始异步地接受客户端连接请求。服务器启动后就进入等待客户端连接的状态
start_accept();
}

void CServer::ClearSession(std::string uuid) {
_sessions.erase(uuid);
}

void CServer::start_accept() {
// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数
std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this);
// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept
// 为什么new_session在右括号结束后仍不结束,而是bind后计数加一?
// new_session通过bind绑定时,new_session的计数就会加一,所以在bind后,new_session的生命周期和
// 新构造函数的生命周期相同,因为新生成的函数对象引用了new_session(new_session通过值传递的方式被复制构造函数使用)。
// 所以只要新构造的bind回调函数没有被调用、移除,new_session的声明周期就始终存在,所以new_session不会随着'}'的结束而释放。
_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,
std::placeholders::_1));
}

// 当handle_accept触发时,也就是start_accept的回调函数被触发,当该回调函数结束后从队列中移除后,new_session的引用计数减一
void CServer::handle_accept(std::shared_ptr<CSession> new_session, const boost::system::error_code& error) {
// 如果没有错误(error 为 false),调用 new_session->Start() 来启动与旧客户端的会话
if (!error) {
new_session->Start();
_sessions.insert(std::make_pair(new_session->GetUuid(), new_session));
}
else cout << "session accept failed, error is " << error.what() << endl;
// 无论当前连接是否成功,都重新调用 start_accept(),以便服务器能够继续接受下一个新客户端的连接请求。
// 服务器始终保持在监听状态,随时准备接受新连接
start_accept();
}

1)数据节点设计

首先,使用网络编程(3)中的数据节点,作为异步服务器数据的存储节点,放在CSession.h文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MsgNode {
public:
int _total_len; // 数据的总长度
int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)
char* _msg; // 数据域首地址

MsgNode(const char* msg, int total_len) :_total_len(total_len), _cur_len(0) { // 构造写节点
_msg = new char[total_len];
memcpy(_msg, msg, total_len);
}
MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点
_msg = new char[total_len];
}
~MsgNode() {
delete[] _msg;
}
};

2)封装发送接口

服务器的发送接口一般是在逻辑线程调用,所以调用发送线程的接口和asio回调的网络线程不在一个线程,这个发送队列就存在两个线程的共同访问,所以需增加一个保证发送队列的安全性,同时新增一个发送接口Send

1
2
3
void Send(char* msg,  int max_length);
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;

以及send的实现:

发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。

这个函数确保了在多线程环境下,发送操作的有序性与安全性。通过锁来保护发送队列,通过队列来缓存多个待发送的消息,并使用异步写操作 async_write 进行非阻塞的发送。函数还确保了只有一个异步写操作会在某一时刻进行,避免同时多次发送操作对同一套接字的竞争访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void CSession::Send(char* msg, int max_length) {
bool pending = false; // 发送标志,true时有未完成的发送操作,false为空
// 使用lock_guard锁住_send_lock,确保_send_lock(发送队列)的访问的线程安全的
// 锁的存在确保了多个线程不会同时修改发送队列
std::lock_guard<std::mutex> lock(_send_lock);
// 判断队列是否有未完成的发送操作
if (_send_que.size() > 0) {
pending = true;
}
_send_que.push(std::make_shared<MsgNode>(msg, max_length)); // 将发送消息存储至队列
if (pending) { // 如果有未完成的发送,直接返回
return;
}
// 异步发送
boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),
std::bind(&CSession::haddle_write, this, std::placeholders::_1, shared_from_this()));
} // 当'}'结束后,_send_lock解锁,发送队列解锁

3)修改读和写回调

写回调(实现了异步写操作完成后的处理逻辑,在写入操作成功时从发送队列中移除已发送的数据,并继续处理队列中的下一个数据包;如果写入操作失败,则处理错误并清除会话):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 异步写操作完成后的回调处理函数
void CSession::haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) {
if (!error) { // 检查异步写是否成功
std::lock_guard<std::mutex> lock(_send_lock); // 加锁保护发送队列
_send_que.pop(); // 移除上一个已发送的消息(send函数中的异步发)
if (!_send_que.empty()) { // 若队列不为空,处理下一个消息
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_msg, msgnode->_total_len),
std::bind(&CSession::haddle_write, this, std::placeholders::_1, _self_shared));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
_server->ClearSession(_uuid);
}
}

读回调:

因为服务器一般是全双工通信,所以要一直监听对端发送的数据,在每次收到数据后继续绑定监听事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared) {
if (!error) {
cout << "server receive data is " << _data << endl;
Send(_data, bytes_transferred); // 将收到的消息回传
memset(_data, 0, max_length); // 缓冲区清零
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::headle_read, this,
std::placeholders::_1, std::placeholders::_2, _self_shared));
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
_server->ClearSession(_uuid);
}
}

1.服务器的发送接口一般是在逻辑线程调用,调用发送线程的接口和asio回调的网络线程不在一个线程,发送队列存在两个线程的共同访问,如何解释这句话?

1)逻辑线程

  • 服务器程序可能有多个线程执行不同的任务,其中有一个或多个线程专门负责业务逻辑(通常称为逻辑线程)。这些线程负责处理如游戏逻辑、业务处理等高层次的操作。
  • 发送数据的请求通常由这些逻辑线程发起,也就是说,逻辑线程会调用服务器的发送接口来准备或触发向客户端发送数据。

2)网络线程

  • 使用Boost.Asio这样的异步I/O库时,实际处理网络通信的部分是由网络线程(通常是由Boost.Asio提供的线程)来负责。这些线程处理所有的网络事件和I/O操作,比如读写操作完成时的回调函数。
  • 当逻辑线程调用发送接口时,实际的数据发送操作是交由网络线程处理的,因此存在逻辑线程网络线程之间的协作。这两个线程不是同一个线程,存在并发访问的问题。

3)发送队列

  • 为了实现异步发送,服务器通常会有一个发送队列,用于暂存即将发送的数据包。逻辑线程将数据放入这个队列中,而网络线程则从队列中取出数据并通过网络发送出去。
  • 因为这个发送队列是由两个不同的线程(逻辑线程和网络线程)共同访问的,因此会有并发问题。如果没有进行适当的同步控制(如加锁或使用无锁队列),可能会导致数据竞争(data race)、不一致或崩溃等问题。

2.std::lock_guard<std::mutex> lock(_send_lock)是如何保护发送队列的,什么时候解除保护?

std::lock_guard<std::mutex> 是一个类模板,它会在创建时锁住传递的互斥量(mutex),并在离开作用域时自动解锁。锁的保护通过以下机制实现:

1
std::lock_guard<std::mutex> lock(_send_lock);
  • 这行代码创建了一个 std::lock_guard 对象 lock,并将 _send_lock 传递给它,表示要锁定 _send_lock 互斥量。
  • 一旦 lock 对象被创建,构造函数会立即锁定 _send_lock,从而确保在该作用域内,其他线程无法同时访问受该锁保护的资源。
  • 锁定后,直到当前代码块结束前,其他线程无法获取 _send_lock,从而保证了临界区(即锁定代码之后的代码块)的线程安全。

std::lock_guard<std::mutex> 的锁定持续到该对象的生命周期结束。当 lock 对象超出其作用域时(即代码块结束时),它会自动调用其析构函数,从而释放互斥锁 _send_lock

3.锁的机制

锁是用于协调对共享资源(比如发送队列)的访问,确保在多线程环境中只有一个线程能够在某一时刻访问该资源,当一个线程在访问发送队列时:

1
2
std::lock_guard<std::mutex> lock(_send_lock);
_send_que.push(...); // 或者 _send_que.pop()
  • 通过 lock_guard 锁定 _send_lock,只有当前线程能进入这段代码,并操作 _send_que
  • 如果其他线程也想访问队列,它们会在获取 _send_lock 时被阻塞,直到当前线程释放锁。这就防止了多个线程同时修改队列的可能性。

举例:

1
2
3
4
5
void CSession::Send(char* msg, int max_length) {
std::lock_guard<std::mutex> lock(_send_lock); // 锁定互斥锁
_send_que.push(std::make_shared<MsgNode>(msg, max_length)); // 对共享队列的操作
// 其他代码
}
  • 当线程A调用 Send() 函数并进入这段代码时,它加锁 _send_lock,防止其他线程B同时修改 _send_que。
  • 线程B调用 Send() 函数时,会发现 _send_lock 被线程A持有,线程B必须等待线程A释放锁后,才能获得锁并访问队列。

参考:

恋恋风辰官方博客

visual studio配置C++ boost库_哔哩哔哩_bilibili