七、day7 今天学习如何处理服务器粘包 问题。
参考:
恋恋风辰官方博客
visual studio配置C++ boost库_哔哩哔哩_bilibili
粘包问题 当客户端连续发送多个数据包时,服务器底层tcp接受缓冲区收到的数据是连粘在一起的,比如
连粘数据
粘包原因 1)socket收发原理 发送数据并不是直接发送给对端, 而是应用程序将数据发送到本机操作系统的缓存里边, 当数据量小, 发送的时间间隔短, 操作系统就会在缓存区先攒够一个TCP段 再通过网卡一起发送, 接收数据也是一样的, 先在操作系统的缓存存着, 然后应用程序再从操作系统中取出数据。
socket收发原理
2)粘包原因
客户端的发送频率远高于服务器的接收频率,接收方没有及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)recv会产生黏包(如果recv接受的数据量(1024)小于发送的数据量,第一次只能接收规定的数据量1024,第二次接收剩余的数据量)
发送端需要等缓冲区满才发送出去,造成粘包(tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,发送数据时间间隔很短,数据也很小,会合到一起,产生粘包)send 也可能发生粘包现象。(连续send少量的数据发到输出缓冲区 ,由于缓冲区的机制,也可能在缓冲区中不断积压,多次写入的数据被一次性发送到网络
3)如何处理粘包 处理粘包的方式主要采用应用层定义收发包格式 的方式,这个过程俗称切包处理,常用的协议被称为tlv协议 (消息id+消息长度+消息内容),如下图
本节对其做了简化,消息格式只保留消息长度和消息内容,后续加消息id。
1)完善消息节点 新增两个变量表示数据最大长度和数据包头部的大小,消息长度用两个字节存储,最大存储长度为1024*2字节
1 2 const size_t MAX_LENGTH = 1024 * 2 ;const size_t HEAD_LENGTH = 2 ;
两个参数的构造函数做了完善,之前的构造函数通过消息首地址和长度构造节点数据,现在需要在构造节点的同时把长度信息也写入节点,该构造函数主要用来发送数据时构造发送信息的节点。
一个参数的构造函数为较上次新增的,主要根据消息的长度构造消息节点,该构造函数主要是接收对端数据时构造接收节点调用的。
新增一个Clear函数清除消息节点的数据,主要是避免多次构造节点造成开销。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class MsgNode {public : int _total_len; int _cur_len; char * _msg; MsgNode (const char * msg, int total_len) :_total_len(total_len + HEAD_LENGTH), _cur_len(0 ) { _msg = new char [total_len + 1 ]; memcpy (_msg, &total_len, HEAD_LENGTH); memcpy (_msg + HEAD_LENGTH, msg, total_len); _msg[_total_len] = '\0' ; } MsgNode (int total_len) : _total_len(total_len), _cur_len(0 ) { _msg = new char [total_len + 1 ]; } ~MsgNode () { delete [] _msg; } void Clear () { ::memset (_msg, 0 , _total_len); _cur_len = 0 ; } };
2)CSession类完善 为能够对收到的数据切包处理,需要定义一个消息接收节点,一个bool类型的变量表示头部是否解析完成,以及将处理好的头部先缓存起来的结构。
1 2 3 std::shared_ptr<MsgNode> _recv_msg_node; bool _b_head_parse; std::shared_ptr<MsgNode> _recv_head_node;
3)完善接收逻辑 处理头部节点
copy_len 是用来追踪已经写入缓冲区_data字符的处理情况,如果读入_data的字符长度bytes_transferred小于头结点 的长度或者大于头结点的长度但是小于头结点中包含的消息总长度,那么这种情况下,_data缓冲区的数据都会被写入头结点或者头结点和消息结点(当填满头结点后,copy_len 需要更新,追踪缓冲区剩余的数据位置,如果剩余的长度小于头结点带的消息总长度数据,那么下一步填充消息结点时无需更新copy_len )。此时,无需更新copy_len ,因为消息未读全的情况下,会继续执行异步读操作,在新的异步读中,copy_len 会被初始化为0,而剩余未被读入的数据会被逐步读入并写入缓冲区,缓冲区的数据从头开始写入对应的头结点或者消息结点,无需copy_len 追踪剩余情况。
如果写入缓冲区的长度bytes_transferred大于头结点,那么需要更新copy_len
,此时_data
的数据会被指向排除头结点(2字节)后的数据位置,并将从此开始的消息传入消息结点_recv_msg_node
。
如果读入缓冲区的数据长度大于头节点携带的消息总长度,那么更新copy_len,重新进入while循环。此时,会将处理完上一个消息头和消息体之后剩余的数据存入新的消息头和新的消息体,此时,需要copy_len指定再_data缓冲区中剩余数据的位置。
总结,仅有当收到的消息(存入缓冲区_data
)的长度小于头结点长度(2字节)时,或者收到的消息长度大于头结点,此时填充头结点并更新copy_len,让copy_len指向2字节之后的内容,如果剩余的长度小于头结点要求的消息总长度,那么不更新copy_len,清空_data
,继续执行异步读。只有执行异步读时不需要更新copy_len(说明读的数据长度不满足要求,需要继续读,不需要copy_len指向缓冲区的位置),如果重新进入循环,比如continue,那么说明缓冲区中的数据未被消耗完,此时需要更新copy_len。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 int copy_len = 0 ; while (bytes_transferred > 0 ) { if (!_b_head_parse) { if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) { memcpy (_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred); _recv_head_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ; } int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; memcpy (_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);copy_len += head_remain; bytes_transferred -= head_remain;
处理消息体
_b_head_parse
不能在上面处理完头结点后直接置为true,因为如果直接为true,那么当读入缓冲区的数据多余消息体的总长度时,仍然跳不出循环,需要继续处理头结点,但此时之前的消息体已经全部填充完毕,需要获取下一个头节点数据并填充消息体,但此时_b_head_parse
为true,无法填充头节点。
所以必须当读入_data缓冲区的长度小于头结点要求的消息总长度时,才能置true,此时,重新进入循环时,不用再次读取头结点,而是直接处理消息体剩余未读完的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 short data_len = 0 ; memcpy (&data_len, _recv_head_node->_msg, HEAD_LENGTH);cout << "data_len is " << data_len << endl; if (data_len > MAX_LENGTH) { std::cout << "invalid data length is " << data_len << endl; _server->ClearSession (_uuid); return ; } _recv_msg_node = std::make_shared <MsgNode>(data_len); if (bytes_transferred < data_len) { memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); _b_head_parse = true ; return ; } memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);_recv_msg_node->_cur_len += data_len; copy_len += data_len; bytes_transferred -= data_len; _recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0' ; cout << "receive data is " << _recv_msg_node->_msg << endl; Send (_recv_msg_node->_msg, _recv_msg_node->_total_len); _b_head_parse = false ; _recv_head_node->Clear (); if (bytes_transferred <= 0 ) { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ; } continue ; }
处理读入缓冲区填充消息节点后仍剩余的数据
如果读入_data的数据在填充完一个消息头节点和消息体后,仍有剩余,那么此时需要构造下一个新的消息头节点和新的消息体存储这部分剩余的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;if (bytes_transferred < remain_msg) { memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ; } memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);_recv_msg_node->_cur_len += remain_msg; bytes_transferred -= remain_msg; copy_len += remain_msg; _recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0' ; cout << "receive data is " << _recv_msg_node->_msg << endl; Send (_recv_msg_node->_msg, _recv_msg_node->_total_len);_b_head_parse = false ; _recv_head_node->Clear (); if (bytes_transferred <= 0 ) { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ; } continue ;
完整的接收逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 void CSession::headle_read (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) {if (!error) {int copy_len = 0 ; while (bytes_transferred > 0 ) { if (!_b_head_parse) { if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) { memcpy (_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred); _recv_head_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ; } int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; memcpy (_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);copy_len += head_remain; bytes_transferred -= head_remain; short data_len = 0 ; memcpy (&data_len, _recv_head_node->_msg, HEAD_LENGTH);cout << "data_len is " << data_len << endl; if (data_len > MAX_LENGTH) { std::cout << "invalid data length is " << data_len << endl; _server->ClearSession (_uuid); return ; } _recv_msg_node = std::make_shared <MsgNode>(data_len); if (bytes_transferred < data_len) { memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); _b_head_parse = true ; return ; } memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);_recv_msg_node->_cur_len += data_len; copy_len += data_len; bytes_transferred -= data_len; _recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0' ; cout << "receive data is " << _recv_msg_node->_msg << endl; Send (_recv_msg_node->_msg, _recv_msg_node->_total_len); _b_head_parse = false ; _recv_head_node->Clear (); if (bytes_transferred <= 0 ) { ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ; } continue ; } int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;if (bytes_transferred < remain_msg) { memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred; ::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ;} memcpy (_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);_recv_msg_node->_cur_len += remain_msg; bytes_transferred -= remain_msg; copy_len += remain_msg; _recv_msg_node->_msg[_recv_msg_node->_total_len] = '\0' ; cout << "receive data is " << _recv_msg_node->_msg << endl; Send (_recv_msg_node->_msg, _recv_msg_node->_total_len);_b_head_parse = false ; _recv_head_node->Clear (); if (bytes_transferred <= 0 ) {::memset (_data, 0 , MAX_LENGTH); _socket.async_read_some (boost::asio::buffer (_data, MAX_LENGTH), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); return ;} continue ;} } else {std::cout << "handle read failed, error is " << error.what () << endl; Close ();_server->ClearSession (_uuid); } }
步骤:
copy_len记录的是已经处理过数据的长度,因为存在一次接收多个包的情况,所以copy_len用来做已经处理的数据长度的。
首先判断_b_head_parse
是否为false,如果为false则说明头部未处理,先判断接收的数据是否小于头部, 如果小于头部大小则将接收到的数据放入_recv_head_node
节点保存,然后继续调用读取函数监听对端发送数据。否则进入步骤3.
如果收到的数据比头部多,可能是多个逻辑包,所以要做切包处理。根据之前保留在_recv_head_node
的长度,计算出剩余未取出的头部长度,然后取出剩余的头部长度保存在_recv_head_node
节点,然后通过memcpy方式从节点拷贝出数据写入short类型的data_len里,进而获取消息的长度。接下来继续处理包体,也就是消息体,判断接收到的数据未处理部分的长度和总共要接收的数据长度大小,如果小于总共要接受的长度,说明消息体没接收完,则将未处理部分先写入_recv_msg_node
里,并且继续监听读事件。否则说明消息体接收完全,进入步骤4
将消息体数据接收到_recv_msg_node
中,接受完全后返回给对端。当然存在多个逻辑包粘连,此时要判断bytes_transferred是否小于等于0,如果是说明只有一个逻辑包,我们处理完了,继续监听读事件,就直接返回即可。否则说明有多个数据包粘连,就继续执行上述操作。
因为存在_b_head_parse
为true,也就是包头接收并处理完的情况,但是包体未接受完,再次触发HandleRead,此时要继续处理上次未接受完的消息体,大体逻辑和3,4一样。
切包流程
4)修改客户端 客户端先发送两个字节的数据长度,再发送消息体;同理,接收时,首先接收两个字节的消息头获得数据长度,再根据长度创建消息载体,读取消息内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <boost/asio.hpp> #include <iostream> using namespace boost::asio::ip;using std::cout;using std::endl;const int MAX_LENGTH = 1024 * 2 ; const int HEAD_LENGTH = 2 ;int main () { try { boost::asio::io_context ioc; tcp::endpoint remote_ep (address::from_string("127.0.0.1" ), 10086 ) ; tcp::socket sock (ioc) ; boost::system::error_code error = boost::asio::error::host_not_found; sock.connect (remote_ep, error); if (error) { cout << "connect failed, code is " << error.value () << " error msg is " << error.message (); return 0 ; } cout << "Enter message: " ; char request[MAX_LENGTH]; std::cin.getline (request, MAX_LENGTH); size_t request_length = strlen (request); char send_data[MAX_LENGTH] = { 0 }; memcpy (send_data, &request_length, 2 ); memcpy (send_data + 2 , request, request_length); boost::asio::write (sock, boost::asio::buffer (send_data, request_length + 2 )); char reply_head[HEAD_LENGTH]; size_t reply_length = boost::asio::read (sock, boost::asio::buffer (reply_head, HEAD_LENGTH)); short msglen = 0 ; memcpy (&msglen, reply_head, HEAD_LENGTH); char msg[MAX_LENGTH] = { 0 }; size_t msg_length = boost::asio::read (sock, boost::asio::buffer (msg, msglen)); std::cout << "Reply is: " ; std::cout.write (msg, msglen) << endl; std::cout << "Reply len is " << msglen; std::cout << "\n" ; } catch (std::exception& e) { std::cerr << "Exception: " << e.what () << endl; } return 0 ; }
5)粘包测试 客户端修改
这里需要修改客户端,使其一直处于收和发的状态。因为在客户端收发的同步、阻塞的,如果读操作一直没收到那就会一直挂起,而发操作一直在发,会抢占cpu资源,而读操作在挂起后又有新的读申请,但是cpu分配不出来资源给它,可能会造成客户端一直发但是读不到的情况,这里每个收发线程每进行一次操作就休眠2ms。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 #include <iostream> #include <boost/asio.hpp> #include <thread> using namespace std;using namespace boost::asio::ip;const int MAX_LENGTH = 1024 * 2 ;const int HEAD_LENGTH = 2 ;int main () { try { boost::asio::io_context ioc; tcp::endpoint remote_ep (address::from_string("127.0.0.1" ), 10086 ) ; tcp::socket sock (ioc) ; boost::system::error_code error = boost::asio::error::host_not_found; ; sock.connect (remote_ep, error); if (error) { cout << "connect failed, code is " << error.value () << " error msg is " << error.message (); return 0 ; } thread send_thread ([&sock] { for (;;) { this_thread::sleep_for(std::chrono::milliseconds(2 )); const char * request = "hello world!" ; size_t request_length = strlen(request); char send_data[MAX_LENGTH] = { 0 }; memcpy(send_data, &request_length, 2 ); memcpy(send_data + 2 , request, request_length); boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2 )); } }) ; thread recv_thread ([&sock] { for (;;) { this_thread::sleep_for(std::chrono::milliseconds(2 )); cout << "begin to receive..." << endl; char reply_head[HEAD_LENGTH]; size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH)); short msglen = 0 ; memcpy(&msglen, reply_head, HEAD_LENGTH); char msg[MAX_LENGTH] = { 0 }; size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen)); std::cout << "Reply is: " ; std::cout.write(msg, msglen) << endl; std::cout << "Reply len is " << msglen; std::cout << "\n" ; } }) ; send_thread.join (); recv_thread.join (); } catch (std::exception& e) { std::cerr << "Exception: " << e.what () << endl; } return 0 ; }
服务求修改
为了测试粘包,需要制造粘包产生的现象,可以让客户端发送的频率高一些,服务器接收的频率低一些,这样造成前后端收发数据不一致导致多个数据包在服务器tcp缓冲区滞留产生粘包现象。
测试粘包之前,在服务器的CSession类里添加打印二进制数据的函数,便于查看缓冲区的数据
1 2 3 4 5 6 7 8 9 10 11 void CSession::PrintRecvData (char * data, int length) { stringstream ss; string result = "0x" ; for (int i = 0 ; i < length; i++) { string hexstr; ss << hex << std::setw (2 ) << std::setfill ('0' ) << int (data[i]) << endl; ss >> hexstr; result += hexstr; } std::cout << "receive raw data is : " << result << endl;; }
加入回调读函数
1 2 3 4 5 6 7 void CSession::headle_read (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) { if (!error) { PrintRecvData (_data, bytes_transferred); std::chrono::milliseconds dura (2000 ) ; std::this_thread::sleep_for (dura);
测试结果
服务器收发结果
文客户端收发结果
可以看到服务器每隔两秒接收一次数据,数据中有多个数据包,服务器首先将缓存区_data中收到的数据打印为十六进制 格式,可看出数据均粘连在一起,在经过切包处理后,数据被成功显示。
1. ::memset和memset的区别?
memset 是标准库函数,用于将内存块设置为指定的字节值,通常位于 cstring(C++)或者 string.h(C)头文件 中。在代码中直接使用 memset,编译器会查找当前作用域和全局命名空间 中的定义。
::memset 是带有全局命名空间解析符的调用,明确告诉编译器调用全局命名空间中的 memset 函数。这种形式常用于防止命名冲突。比如,如果某个类或命名空间中有一个与 memset 同名的函数或变量,使用 ::memset 可以确保调用标准库中的 memset。
2. copy_len的作用, copy_len的更新规则?
偏移量 copy_len 用于跟踪当前已经处理了多少数据 ,这样即使数据分多次接收,程序也能从正确的地方继续处理未处理的数据。
例子:
假设期望接收 100 字节的数据;
第一次接收到 40 字节,程序处理完这 40 字节,但消息还没有接收完整;
需要继续等待下一次接收更多的数据来补全剩余的部分;
当第二次接收到 60 字节时,程序从第 41 字节开始拼接到之前的数据中,这就是为什么要用偏移量来确保数据是按正确的顺序拼接的。
copy_len 是用来追踪已经写入缓冲区_data字符的处理情况,如果读入_data的字符长度bytes_transferred小于头结点的长度或者大于头结点的长度但是小于头结点中包含的消息总长度,那么这种情况下,_data缓冲区的数据都会被写入头结点或者头结点和消息结点(当填满头结点后,copy_len 需要更新,追踪缓冲区剩余的数据位置,如果剩余的长度小于头结点带的消息总长度数据,那么下一步填充消息结点时无需更新copy_len )。此时,无需更新copy_len ,因为消息未读全的情况下,会继续执行异步读操作,在新的异步读中,copy_len 会被初始化为0,而剩余未被读入的数据会被逐步读入并写入缓冲区,缓冲区的数据从头开始写入对应的头结点或者消息结点,无需copy_len 追踪剩余情况。
如果写入缓冲区的长度bytes_transferred大于头结点,那么需要更新copy_len ,此时_data
的数据会被指向头结点(2字节)之后的数据位置,并将从此开始的消息传入消息结点_recv_msg_node
。
如果读入缓冲区的数据长度大于头节点携带的消息总长度,那么更新copy_len,重新进入while循环。此时,会将处理完上一个消息头和消息体之后剩余的数据存入新的消息头和新的消息体,此时,需要copy_len指定再_data缓冲区中剩余数据的位置。
总结,仅有当收到的消息(存入缓冲区_data
)的长度小于头结点长度(2字节)时,或者收到的消息长度大于头结点,此时填充头结点并更新copy_len,让copy_len指向2字节之后的内容,如果剩余的长度小于头结点要求的消息总长度,那么不更新copy_len,清空_data
,继续执行异步读。只有执行异步读时不需要更新copy_len(说明读的数据长度不满足要求,需要继续读,不需要copy_len指向缓冲区的位置),如果重新进入循环,比如continue,那么说明缓冲区中的数据未被消耗完,此时需要更新copy_len。