六、day6 在上午学习完如何通过c++11特性模拟伪闭包实现连接的安全回收之后,下午学习如何封装一个发送接口,该接口能保证发送的时序性 (异步发送 时TCP底层缓冲区可能无法将所有数据一次发出去,如果这时候再次调用异步发送,就可能造成数据错乱 ,比如发送两个Hello World,如果是同步,那么数据是有序的,但如果是异步,那么可能会导致在TCp存储的数据是Hello Hello World World,这时候需要我们构造一个消息队列,将需要发送的数据插入队列,下一个消息按序排列到上一个消息后面,只有当上一个消息发送完毕,下一个消息才会从消息队列中pop出发送)。实现的关键 在于:多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的(队列)。
文章开始前将前面文章中提到的Server和Session类分成CServer.h和CSession.h两个文件,注意两个文件的依赖关系:
CSession.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #pragma once #include <iostream> #include <boost/asio.hpp> #include <map> #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> using boost::asio::ip::tcp;using std::cout;using std::cin;using std::endl;class CServer ;class MsgNode {public : int _total_len; int _cur_len; char * _msg; MsgNode (const char * msg, int total_len) :_total_len(total_len), _cur_len(0 ) { _msg = new char [total_len]; memcpy (_msg, msg, total_len); } MsgNode (int total_len) : _total_len(total_len), _cur_len(0 ) { _msg = new char [total_len]; } ~MsgNode () { delete [] _msg; } }; class CSession :public std::enable_shared_from_this<CSession>{ private : tcp::socket _socket; enum { max_length = 1024 }; char _data[max_length]; void headle_read (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) ; void haddle_write (const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) ; std::string _uuid; CServer* _server; public : CSession (boost::asio::io_context& ioc, CServer* server) : _socket(ioc), _server(server){ boost::uuids::uuid a_uuid = boost::uuids::random_generator ()(); _uuid = boost::uuids::to_string (a_uuid); } tcp::socket& Socket () { return _socket; } const std::string& GetUuid () const { return _uuid; } void Start () ; };
CSession.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include "CSession.h" #include "CServer.h" void CSession::Start () { memset (_data, 0 , max_length); _socket.async_read_some (boost::asio::buffer (_data, max_length), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, shared_from_this ())); } void CSession::headle_read (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) { if (!error) { cout << "server receive data is " << _data << endl; boost::asio::async_write (_socket, boost::asio::buffer (_data, bytes_transferred), std::bind (&CSession::haddle_write, this , std::placeholders::_1, _self_shared)); } else { cout << "read error" << endl; _server->ClearSession (_uuid); } } void CSession::haddle_write (const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) { if (!error) { memset (_data, 0 , max_length); _socket.async_read_some (boost::asio::buffer (_data, max_length), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); } else { cout << "write error" << error.value () << endl; _server->ClearSession (_uuid); } }
CServer.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #pragma once #include "CSession.h" class CServer { private : void start_accept () ; void handle_accept (std::shared_ptr<CSession> new_session, const boost::system::error_code& error) ; boost::asio::io_context& _ioc; tcp::acceptor _acceptor; std::map<std::string, std::shared_ptr<CSession>> _sessions; public : CServer (boost::asio::io_context& ioc, short port); void ClearSession (std::string uuid) ; };
CServer.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include "CServer.h" CServer::CServer (boost::asio::io_context& ioc, short port) : _ioc(ioc), _acceptor(ioc, tcp::endpoint (tcp::v4 (), port)) { cout << "Server start success, on port: " << port << endl; start_accept (); } void CServer::ClearSession (std::string uuid) { _sessions.erase (uuid); } void CServer::start_accept () { std::shared_ptr<CSession> new_session = std::make_shared <CSession>(_ioc, this ); _acceptor.async_accept (new_session->Socket (), std::bind (&CServer::handle_accept, this , new_session, std::placeholders::_1)); } void CServer::handle_accept (std::shared_ptr<CSession> new_session, const boost::system::error_code& error) { if (!error) { new_session->Start (); _sessions.insert (std::make_pair (new_session->GetUuid (), new_session)); } else cout << "session accept failed, error is " << error.what () << endl; start_accept (); }
1)数据节点设计 首先,使用网络编程(3) 中的数据节点,作为异步服务器数据的存储节点,放在CSession.h文件中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class MsgNode {public : int _total_len; int _cur_len; char * _msg; MsgNode (const char * msg, int total_len) :_total_len(total_len), _cur_len(0 ) { _msg = new char [total_len]; memcpy (_msg, msg, total_len); } MsgNode (int total_len) : _total_len(total_len), _cur_len(0 ) { _msg = new char [total_len]; } ~MsgNode () { delete [] _msg; } };
2)封装发送接口 服务器的发送接口一般是在逻辑线程调用,所以调用发送线程的接口和asio回调的网络线程 不在一个线程,这个发送队列就存在两个线程的共同访问,所以需增加一个锁 保证发送队列 的安全性,同时新增一个发送接口Send 。
1 2 3 void Send (char * msg, int max_length) ;std::queue<shared_ptr<MsgNode> > _send_que; std::mutex _send_lock;
以及send的实现:
发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。
这个函数确保了在多线程环境下,发送操作的有序性与安全性。通过锁来保护发送队列,通过队列来缓存多个待发送的消息,并使用异步写操作 async_write 进行非阻塞的发送。函数还确保了只有一个异步写操作会在某一时刻进行,避免同时多次发送操作对同一套接字的竞争访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void CSession::Send (char * msg, int max_length) { bool pending = false ; std::lock_guard<std::mutex> lock (_send_lock) ; if (_send_que.size () > 0 ) { pending = true ; } _send_que.push (std::make_shared <MsgNode>(msg, max_length)); if (pending) { return ; } boost::asio::async_write (_socket, boost::asio::buffer (msg, max_length), std::bind (&CSession::haddle_write, this , std::placeholders::_1, shared_from_this ())); }
3)修改读和写回调 写回调 (实现了异步写操作完成后的处理逻辑,在写入操作成功时从发送队列中移除已发送的数据,并继续处理队列中的下一个数据包;如果写入操作失败,则处理错误并清除会话):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void CSession::haddle_write (const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) { if (!error) { std::lock_guard<std::mutex> lock (_send_lock) ; _send_que.pop (); if (!_send_que.empty ()) { auto & msgnode = _send_que.front (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_msg, msgnode->_total_len), std::bind (&CSession::haddle_write, this , std::placeholders::_1, _self_shared)); } } else { std::cout << "handle write failed, error is " << error.what () << endl; _server->ClearSession (_uuid); } }
读回调:
因为服务器一般是全双工通信 ,所以要一直监听对端发送的数据,在每次收到数据后继续绑定监听事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void CSession::headle_read (const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) { if (!error) { cout << "server receive data is " << _data << endl; Send (_data, bytes_transferred); memset (_data, 0 , max_length); _socket.async_read_some (boost::asio::buffer (_data, max_length), std::bind (&CSession::headle_read, this , std::placeholders::_1, std::placeholders::_2, _self_shared)); } else { std::cout << "handle read failed, error is " << error.what () << endl; _server->ClearSession (_uuid); } }
1.服务器的发送接口一般是在逻辑线程调用,调用发送线程的接口和asio回调的网络线程不在一个线程,发送队列存在两个线程的共同访问,如何解释这句话?
1)逻辑线程
服务器程序可能有多个线程执行不同的任务,其中有一个或多个线程专门负责业务逻辑(通常称为逻辑线程 )。这些线程负责处理如游戏逻辑、业务处理等高层次的操作。
发送数据的请求通常由这些逻辑线程发起,也就是说,逻辑线程会调用服务器的发送接口来准备或触发向客户端发送数据。
2)网络线程
使用Boost.Asio这样的异步I/O库时,实际处理网络通信 的部分是由网络线程 (通常是由Boost.Asio提供的线程)来负责。这些线程处理所有的网络事件 和I/O操作,比如读写操作完成时的回调函数。
当逻辑线程调用发送接口时,实际的数据发送操作是交由网络线程处理的,因此存在逻辑线程 和网络线程 之间的协作。这两个线程不是同一个线程,存在并发访问的问题。
3)发送队列
为了实现异步发送,服务器通常会有一个发送队列 ,用于暂存即将发送的数据包。逻辑线程将数据放入这个队列中,而网络线程则从队列中取出数据并通过网络发送出去。
因为这个发送队列是由两个不同的线程(逻辑线程和网络线程)共同访问的,因此会有并发问题。如果没有进行适当的同步控制(如加锁或使用无锁队列),可能会导致数据竞争(data race)、不一致或崩溃等问题。
2.std::lock_guard<std::mutex> lock(_send_lock)
是如何保护发送队列的,什么时候解除保护?
std::lock_guard<std::mutex>
是一个类模板,它会在创建时锁住传递的互斥量(mutex) ,并在离开作用域时自动解锁 。锁的保护通过以下机制实现:
1 std::lock_guard<std::mutex> lock (_send_lock) ;
这行代码创建了一个 std::lock_guard 对象 lock,并将 _send_lock 传递给它,表示要锁定 _send_lock 互斥量。
一旦 lock 对象被创建,构造函数会立即锁定 _send_lock,从而确保在该作用域内,其他线程无法同时访问受该锁保护的资源。
锁定后,直到当前代码块结束前,其他线程无法获取 _send_lock,从而保证了临界区(即锁定代码之后的代码块) 的线程安全。
std::lock_guard<std::mutex
> 的锁定持续到该对象的生命周期结束 。当 lock 对象超出其作用域时(即代码块结束时),它会自动调用其析构函数 ,从而释放互斥锁 _send_lock
。
3.锁的机制
锁是用于协调对共享资源 (比如发送队列)的访问,确保在多线程环境中只有一个线程能够在某一时刻访问该资源,当一个线程在访问发送队列时:
1 2 std::lock_guard<std::mutex> lock (_send_lock) ;_send_que.push (...);
通过 lock_guard 锁定 _send_lock,只有当前线程能进入这段代码,并操作 _send_que 。
如果其他线程也想访问队列,它们会在获取 _send_lock 时被阻塞,直到当前线程释放锁。这就防止了多个线程同时修改队列的可能性。
举例:
1 2 3 4 5 void CSession::Send (char * msg, int max_length) { std::lock_guard<std::mutex> lock (_send_lock) ; _send_que.push (std::make_shared <MsgNode>(msg, max_length)); }
当线程A调用 Send() 函数并进入这段代码时,它加锁 _send_lock,防止其他线程B同时修改 _send_que。
线程B调用 Send() 函数时,会发现 _send_lock 被线程A持有,线程B必须等待线程A释放锁后,才能获得锁并访问队列。
参考:
恋恋风辰官方博客
visual studio配置C++ boost库_哔哩哔哩_bilibili