网络编程(7)——粘包问题
2025-04-30 12:24:32 # C++ # 网络编程

七、day7

今天学习如何处理服务器粘包问题。

参考:

恋恋风辰官方博客

visual studio配置C++ boost库_哔哩哔哩_bilibili

粘包问题

当客户端连续发送多个数据包时,服务器底层tcp接受缓冲区收到的数据是连粘在一起的,比如

img

连粘数据

粘包原因

1)socket收发原理

发送数据并不是直接发送给对端, 而是应用程序将数据发送到本机操作系统的缓存里边, 当数据量小, 发送的时间间隔短, 操作系统就会在缓存区先攒够一个TCP段再通过网卡一起发送, 接收数据也是一样的, 先在操作系统的缓存存着, 然后应用程序再从操作系统中取出数据。

img

socket收发原理

2)粘包原因

  • 客户端的发送频率远高于服务器的接收频率,接收方没有及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)recv会产生黏包(如果recv接受的数据量(1024)小于发送的数据量,第一次只能接收规定的数据量1024,第二次接收剩余的数据量)
  • 发送端需要等缓冲区满才发送出去,造成粘包(tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,发送数据时间间隔很短,数据也很小,会合到一起,产生粘包)send 也可能发生粘包现象。(连续send少量的数据发到输出缓冲区,由于缓冲区的机制,也可能在缓冲区中不断积压,多次写入的数据被一次性发送到网络

3)如何处理粘包

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容),如下图

img

本节对其做了简化,消息格式只保留消息长度和消息内容,后续加消息id。


1)完善消息节点

新增两个变量表示数据最大长度和数据包头部的大小,消息长度用两个字节存储,最大存储长度为1024*2字节

1
2
const size_t MAX_LENGTH = 1024 * 2;
const size_t HEAD_LENGTH = 2;
  • 两个参数的构造函数做了完善,之前的构造函数通过消息首地址和长度构造节点数据,现在需要在构造节点的同时把长度信息也写入节点,该构造函数主要用来发送数据时构造发送信息的节点。
  • 一个参数的构造函数为较上次新增的,主要根据消息的长度构造消息节点,该构造函数主要是接收对端数据时构造接收节点调用的。
  • 新增一个Clear函数清除消息节点的数据,主要是避免多次构造节点造成开销。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MsgNode {
public:
int _total_len; // 数据的总长度
int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)
char* _msg; // 数据域首地址
// 用HEAD_LENGTH两个字节的内存存储消息长度,消息总长度为HEAD_LENGTH + total_len
MsgNode(const char* msg, int total_len) :_total_len(total_len + HEAD_LENGTH), _cur_len(0) { // 构造写节点
_msg = new char[total_len + 1]; // c风格字符串得留一字节保存'\0'
// 将消息体的长度存储至_msg中,共占两个字节,&total_len是消息体长度的地址,比如total_len=32,就将32存储至_msg中
memcpy(_msg, &total_len, HEAD_LENGTH);
memcpy(_msg + HEAD_LENGTH, msg, total_len); // 消息内容从数组首地址偏移两个字节开始存储,前两个字节是消息长度
_msg[_total_len] = '\0';
}
MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点
_msg = new char[total_len + 1];
}
~MsgNode() {
delete[] _msg;
}
void Clear() {
::memset(_msg, 0, _total_len);
_cur_len = 0;
}
};

2)CSession类完善

为能够对收到的数据切包处理,需要定义一个消息接收节点,一个bool类型的变量表示头部是否解析完成,以及将处理好的头部先缓存起来的结构。

1
2
3
std::shared_ptr<MsgNode> _recv_msg_node; // 存储接受的消息体信息
bool _b_head_parse; // 表示是否处理完头部信息
std::shared_ptr<MsgNode> _recv_head_node; // 存储接收的头部信息

3)完善接收逻辑

处理头部节点

copy_len 是用来追踪已经写入缓冲区_data字符的处理情况,如果读入_data的字符长度bytes_transferred小于头结点的长度或者大于头结点的长度但是小于头结点中包含的消息总长度,那么这种情况下,_data缓冲区的数据都会被写入头结点或者头结点和消息结点(当填满头结点后,copy_len 需要更新,追踪缓冲区剩余的数据位置,如果剩余的长度小于头结点带的消息总长度数据,那么下一步填充消息结点时无需更新copy_len )。此时,无需更新copy_len ,因为消息未读全的情况下,会继续执行异步读操作,在新的异步读中,copy_len 会被初始化为0,而剩余未被读入的数据会被逐步读入并写入缓冲区,缓冲区的数据从头开始写入对应的头结点或者消息结点,无需copy_len 追踪剩余情况。

如果写入缓冲区的长度bytes_transferred大于头结点,那么需要更新copy_len ,此时_data的数据会被指向排除头结点(2字节)后的数据位置,并将从此开始的消息传入消息结点_recv_msg_node

如果读入缓冲区的数据长度大于头节点携带的消息总长度,那么更新copy_len,重新进入while循环。此时,会将处理完上一个消息头和消息体之后剩余的数据存入新的消息头和新的消息体,此时,需要copy_len指定再_data缓冲区中剩余数据的位置。

总结,仅有当收到的消息(存入缓冲区_data)的长度小于头结点长度(2字节)时,或者收到的消息长度大于头结点,此时填充头结点并更新copy_len,让copy_len指向2字节之后的内容,如果剩余的长度小于头结点要求的消息总长度,那么不更新copy_len,清空_data,继续执行异步读。只有执行异步读时不需要更新copy_len(说明读的数据长度不满足要求,需要继续读,不需要copy_len指向缓冲区的位置),如果重新进入循环,比如continue,那么说明缓冲区中的数据未被消耗完,此时需要更新copy_len。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int copy_len = 0; // 已经处理的字符数
while (bytes_transferred > 0) { // 只要读取到数据就对其处理
if (!_b_head_parse) { // 判断消息头部是否已处理,_b_head_parse默认为false
// 异步读取到的字节数 + 已接收到的头部长度 < 头部总长度
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) { // 收到的数据长度小于头部长度,说明头部还未全部读取
// 如果未完全接收消息头,则将接收到的数据复制到头部缓冲区
// _recv_head_node->_msg,更新当前头部的接收长度,并继续异步读取剩余数据。
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
// 缓冲区清零,无需更新copy_len追踪已处理的字符数,因为之前读取的数据已经全部写入头部节点,下一个
// 读入的消息从头开始(copy_len=0)往头节点写
::memset(_data, 0, MAX_LENGTH);
// 继续读消息
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::headle_read, this,
std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}

// 如果接收到的数据量足够处理消息头部,则计算头部剩余的未接收字节,
// 并将其从 _data 缓冲区复制到头部消息缓冲区 _recv_head_node->_msg
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; // 头部剩余未复制的长度
// 填充头部节点
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
copy_len += head_remain; // 更新已处理的data长度
bytes_transferred -= head_remain; // 更新剩余未处理的长度

处理消息体

_b_head_parse不能在上面处理完头结点后直接置为true,因为如果直接为true,那么当读入缓冲区的数据多余消息体的总长度时,仍然跳不出循环,需要继续处理头结点,但此时之前的消息体已经全部填充完毕,需要获取下一个头节点数据并填充消息体,但此时_b_head_parse为true,无法填充头节点。

所以必须当读入_data缓冲区的长度小于头结点要求的消息总长度时,才能置true,此时,重新进入循环时,不用再次读取头结点,而是直接处理消息体剩余未读完的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
short data_len = 0; // 获取头部数据(消息长度)
memcpy(&data_len, _recv_head_node->_msg, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;

if (data_len > MAX_LENGTH) { // 判断头部长度是否非法
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid);
return;
}

_recv_msg_node = std::make_shared<MsgNode>(data_len); // 已知数据长度data_len,构建消息内容载体
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
// copy_len不用更新,缓冲区会清零,下一个读入data的数据从头开始写入,copy_len也会被初始化为0
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));

_b_head_parse = true; //头部处理完成
return;
}

// 接收的长度多于消息内容长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_msg << endl;

Send(_recv_msg_node->_msg, _recv_msg_node->_total_len); // 回传
// 清理已处理的头部消息并重置,准备解析下一条消息
_b_head_parse = false;
_recv_head_node->Clear();

// 如果当前数据已经全部处理完,重置缓冲区 _data,并继续异步读取新的数据
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}

continue; // 异步读取的消息未处理完,继续填充头节点乃至新的消息节点
}

处理读入缓冲区填充消息节点后仍剩余的数据

如果读入_data的数据在填充完一个消息头节点和消息体后,仍有剩余,那么此时需要构造下一个新的消息头节点和新的消息体存储这部分剩余的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//已经处理完头部,处理上次未接受完的消息数据
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) { //接收的数据仍不足剩余未处理的
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
// 接收的数据多于剩余未处理的长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_msg << endl;

//此处可以调用Send发送测试
Send(_recv_msg_node->_msg, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
continue;

完整的接收逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared) {
if (!error) {
// 每触发一次handale_read,它会返回实际读取的字节数bytes_transferred,copy_len表示已处理的长度,每处理一字节,copy_len便加一
int copy_len = 0; // 已经处理的字符数
while (bytes_transferred > 0) { // 只要读取到数据就对其处理
if (!_b_head_parse) { // 判断消息头部是否已处理,_b_head_parse默认为false
// 异步读取到的字节数 + 已接收到的头部长度 < 头部总长度
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) { // 收到的数据长度小于头部长度,说明头部还未全部读取
// 如果未完全接收消息头,则将接收到的数据复制到头部缓冲区
// _recv_head_node->_msg,更新当前头部的接收长度,并继续异步读取剩余数据。
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
// 缓冲区清零,无需更新copy_len追踪已处理的字符数,因为之前读取的数据已经全部写入头部节点,下一个
// 读入的消息从头开始(copy_len=0)往头节点写
::memset(_data, 0, MAX_LENGTH);
// 继续读消息
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::headle_read, this,
std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}

// 如果接收到的数据量足够处理消息头部,则计算头部剩余的未接收字节,
// 并将其从 _data 缓冲区复制到头部消息缓冲区 _recv_head_node->_msg
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; // 头部剩余未复制的长度
// 填充头部节点
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
copy_len += head_remain; // 更新已处理的data长度
bytes_transferred -= head_remain; // 更新剩余未处理的长度

short data_len = 0; // 获取头部数据(消息长度)
memcpy(&data_len, _recv_head_node->_msg, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;

if (data_len > MAX_LENGTH) { // 判断头部长度是否非法
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid);
return;
}

_recv_msg_node = std::make_shared<MsgNode>(data_len); // 已知数据长度data_len,构建消息内容载体
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
// copy_len不用更新,缓冲区会清零,下一个读入data的数据从头开始写入,copy_len也会被初始化为0
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));

_b_head_parse = true; //头部处理完成
return;
}

// 接收的长度多于消息内容长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_msg << endl;

Send(_recv_msg_node->_msg, _recv_msg_node->_total_len); // 回传
// 清理已处理的头部消息并重置,准备解析下一条消息
_b_head_parse = false;
_recv_head_node->Clear();

// 如果当前数据已经全部处理完,重置缓冲区 _data,并继续异步读取新的数据
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}

continue; // 异步读取的消息未处理完,继续填充头节点乃至新的消息节点
}

//已经处理完头部,处理上次未接受完的消息数据
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) { //接收的数据仍不足剩余未处理的
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
// 接收的数据多于剩余未处理的长度
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_msg << endl;

//此处可以调用Send发送测试
Send(_recv_msg_node->_msg, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}

步骤:

  1. copy_len记录的是已经处理过数据的长度,因为存在一次接收多个包的情况,所以copy_len用来做已经处理的数据长度的。
  2. 首先判断_b_head_parse是否为false,如果为false则说明头部未处理,先判断接收的数据是否小于头部, 如果小于头部大小则将接收到的数据放入_recv_head_node节点保存,然后继续调用读取函数监听对端发送数据。否则进入步骤3.
  3. 如果收到的数据比头部多,可能是多个逻辑包,所以要做切包处理。根据之前保留在_recv_head_node的长度,计算出剩余未取出的头部长度,然后取出剩余的头部长度保存在_recv_head_node节点,然后通过memcpy方式从节点拷贝出数据写入short类型的data_len里,进而获取消息的长度。接下来继续处理包体,也就是消息体,判断接收到的数据未处理部分的长度和总共要接收的数据长度大小,如果小于总共要接受的长度,说明消息体没接收完,则将未处理部分先写入_recv_msg_node里,并且继续监听读事件。否则说明消息体接收完全,进入步骤4
  4. 将消息体数据接收到_recv_msg_node中,接受完全后返回给对端。当然存在多个逻辑包粘连,此时要判断bytes_transferred是否小于等于0,如果是说明只有一个逻辑包,我们处理完了,继续监听读事件,就直接返回即可。否则说明有多个数据包粘连,就继续执行上述操作。
  5. 因为存在_b_head_parse为true,也就是包头接收并处理完的情况,但是包体未接受完,再次触发HandleRead,此时要继续处理上次未接受完的消息体,大体逻辑和3,4一样。

img

切包流程

4)修改客户端

客户端先发送两个字节的数据长度,再发送消息体;同理,接收时,首先接收两个字节的消息头获得数据长度,再根据长度创建消息载体,读取消息内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio::ip;
using std::cout;
using std::endl;
const int MAX_LENGTH = 1024 * 2; // 发送和接收的长度为1024 * 2字节
const int HEAD_LENGTH = 2;

int main()
{
try {
boost::asio::io_context ioc; // 创建上下文服务
// 127.0.0.1是本机的回路地址,也就是服务器和客户端在一个机器上
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086); // 构造endpoint
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found; // 错误:主机未找到
sock.connect(remote_ep, error);
if (error) {
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}

cout << "Enter message: "; // 连接成功,请输入发送的信息
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH); // 提取发送的数据
size_t request_length = strlen(request); // 发送的数据长度
char send_data[MAX_LENGTH] = { 0 };
memcpy(send_data, &request_length, 2); // 将消息总长度放入存储区前两个字节
memcpy(send_data + 2, request, request_length); // 将消息体放入存储区首地址两字节偏移之后
// 一次性发送数据,数据长度为消息总长度(2字节)+消息体(发送内容)
boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));

char reply_head[HEAD_LENGTH]; // 首先读取对端发送消息的总长度
size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
short msglen = 0; // 消息总长度
memcpy(&msglen, reply_head, HEAD_LENGTH); // 将消息总长度赋值给msglen
char msg[MAX_LENGTH] = { 0 }; // 构建消息体(不含消息总长度)
size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

std::cout << "Reply is: ";
std::cout.write(msg, msglen) << endl;
std::cout << "Reply len is " << msglen;
std::cout << "\n";
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
return 0;
}

5)粘包测试

客户端修改

这里需要修改客户端,使其一直处于收和发的状态。因为在客户端收发的同步、阻塞的,如果读操作一直没收到那就会一直挂起,而发操作一直在发,会抢占cpu资源,而读操作在挂起后又有新的读申请,但是cpu分配不出来资源给它,可能会造成客户端一直发但是读不到的情况,这里每个收发线程每进行一次操作就休眠2ms。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024 * 2;
const int HEAD_LENGTH = 2;
int main()
{
try {
//创建上下文服务
boost::asio::io_context ioc;
//构造endpoint
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found; ;
sock.connect(remote_ep, error);
if (error) {
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}

// 发送线程,不断发送hello world
thread send_thread([&sock] {
for (;;) {
this_thread::sleep_for(std::chrono::milliseconds(2));
const char* request = "hello world!";
size_t request_length = strlen(request);
char send_data[MAX_LENGTH] = { 0 };
memcpy(send_data, &request_length, 2);
memcpy(send_data + 2, request, request_length);
boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
}
});

// 接收线程,不断接收
thread recv_thread([&sock] {
for (;;) {
this_thread::sleep_for(std::chrono::milliseconds(2));
cout << "begin to receive..." << endl;
char reply_head[HEAD_LENGTH];
size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
short msglen = 0;
memcpy(&msglen, reply_head, HEAD_LENGTH);
char msg[MAX_LENGTH] = { 0 };
size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

std::cout << "Reply is: ";
std::cout.write(msg, msglen) << endl;
std::cout << "Reply len is " << msglen;
std::cout << "\n";
}
});

send_thread.join();
recv_thread.join();
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
return 0;
}

服务求修改

为了测试粘包,需要制造粘包产生的现象,可以让客户端发送的频率高一些,服务器接收的频率低一些,这样造成前后端收发数据不一致导致多个数据包在服务器tcp缓冲区滞留产生粘包现象。

测试粘包之前,在服务器的CSession类里添加打印二进制数据的函数,便于查看缓冲区的数据

1
2
3
4
5
6
7
8
9
10
11
void CSession::PrintRecvData(char* data, int length) {
stringstream ss;
string result = "0x";
for (int i = 0; i < length; i++) {
string hexstr;
ss << hex << std::setw(2) << std::setfill('0') << int(data[i]) << endl;
ss >> hexstr;
result += hexstr;
}
std::cout << "receive raw data is : " << result << endl;;
}

加入回调读函数

1
2
3
4
5
6
7
void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,
std::shared_ptr<CSession> _self_shared) {
if (!error) {

PrintRecvData(_data, bytes_transferred);
std::chrono::milliseconds dura(2000);
std::this_thread::sleep_for(dura);

测试结果

img

服务器收发结果

img

文客户端收发结果

可以看到服务器每隔两秒接收一次数据,数据中有多个数据包,服务器首先将缓存区_data中收到的数据打印为十六进制格式,可看出数据均粘连在一起,在经过切包处理后,数据被成功显示。


1. ::memset和memset的区别?

  • memset 是标准库函数,用于将内存块设置为指定的字节值,通常位于 cstring(C++)或者 string.h(C)头文件中。在代码中直接使用 memset,编译器会查找当前作用域和全局命名空间中的定义。
  • ::memset 是带有全局命名空间解析符的调用,明确告诉编译器调用全局命名空间中的 memset 函数。这种形式常用于防止命名冲突。比如,如果某个类或命名空间中有一个与 memset 同名的函数或变量,使用 ::memset 可以确保调用标准库中的 memset。

2. copy_len的作用, copy_len的更新规则?

偏移量 copy_len 用于跟踪当前已经处理了多少数据,这样即使数据分多次接收,程序也能从正确的地方继续处理未处理的数据。

例子:

  • 假设期望接收 100 字节的数据;
  • 第一次接收到 40 字节,程序处理完这 40 字节,但消息还没有接收完整;
  • 需要继续等待下一次接收更多的数据来补全剩余的部分;
  • 当第二次接收到 60 字节时,程序从第 41 字节开始拼接到之前的数据中,这就是为什么要用偏移量来确保数据是按正确的顺序拼接的。

copy_len 是用来追踪已经写入缓冲区_data字符的处理情况,如果读入_data的字符长度bytes_transferred小于头结点的长度或者大于头结点的长度但是小于头结点中包含的消息总长度,那么这种情况下,_data缓冲区的数据都会被写入头结点或者头结点和消息结点(当填满头结点后,copy_len 需要更新,追踪缓冲区剩余的数据位置,如果剩余的长度小于头结点带的消息总长度数据,那么下一步填充消息结点时无需更新copy_len )。此时,无需更新copy_len ,因为消息未读全的情况下,会继续执行异步读操作,在新的异步读中,copy_len 会被初始化为0,而剩余未被读入的数据会被逐步读入并写入缓冲区,缓冲区的数据从头开始写入对应的头结点或者消息结点,无需copy_len 追踪剩余情况。

如果写入缓冲区的长度bytes_transferred大于头结点,那么需要更新copy_len ,此时_data的数据会被指向头结点(2字节)之后的数据位置,并将从此开始的消息传入消息结点_recv_msg_node

如果读入缓冲区的数据长度大于头节点携带的消息总长度,那么更新copy_len,重新进入while循环。此时,会将处理完上一个消息头和消息体之后剩余的数据存入新的消息头和新的消息体,此时,需要copy_len指定再_data缓冲区中剩余数据的位置。

总结,仅有当收到的消息(存入缓冲区_data)的长度小于头结点长度(2字节)时,或者收到的消息长度大于头结点,此时填充头结点并更新copy_len,让copy_len指向2字节之后的内容,如果剩余的长度小于头结点要求的消息总长度,那么不更新copy_len,清空_data,继续执行异步读。只有执行异步读时不需要更新copy_len(说明读的数据长度不满足要求,需要继续读,不需要copy_len指向缓冲区的位置),如果重新进入循环,比如continue,那么说明缓冲区中的数据未被消耗完,此时需要更新copy_len。