十九、day19 上一节学习了如果通过asio协程实现一个简单的并发服务器demo(官方案例),今天学习如何通过asio协程搭建一个比较完整的并发服务器。
主要实现了AsioIOServicePool线程池、逻辑层LogicSystem、粘包处理、接收协程、发送队列、网络序列化、json序列化、通过两种方式实现单例模式(第一种方式实现线程池AsioIOServicePool,第二种方式实现逻辑层LogicSystem)。
Const文件主要用于定义各种宏
CServer主要用于监听客户端的连接,并生成并分配Session会话
AsioIOServicePool线程池用于多线程收发
LogicSystem逻辑层用于处理逻辑队列中的数据
CSession用于服务器与客户端的收发操作
视频资料参考up恋恋风辰:
C++ 网络编程(19) 利用协程实现并发服务器(上)_哔哩哔哩_bilibili
在此之前,回顾一下协程的概念:
协程的操作过程不同于普通函数,普通函数的返回点仅有返回的功能,而协程的返回点不仅可以返回还可以暂停。
举例:
1 2 3 4 5 6 def func () : print("a" ) return print("b" ) return print("c" )
普通函数在遇到return
后会直接返回,而不会继续执行return
下面的代码,这意味着b和c永远不会被打印。而协程在返回后可以继续调用该协程,而且是从上一个返回点后继续执行,简单来说就是带存档功能的函数。
在python中,暂停和返回通过yield
实现,改造上面代码如下
1 2 3 4 5 6 def func(): print("a") yield print("b") yield print("c")
定义调用函数foo:
1 2 3 4 5 def foo(): co = func() next(co) print("in foo") next(co)
输出:
而普通函数的输出为
如下图:
普通函数在被调用时会从头开始往后运行,直至遇到返回点,第二次运行时仍然是从头开始,普通函数的返回点没有暂停功能。
而协程在第二次运行时会从第一个返回点开始运行,不会重新开始,如下图。
b. 线程
1. Const.h 1 2 3 4 5 6 7 8 9 10 11 #pragma once #define MAX_LENGTH 1024*2 #define HEAD_TOTAL_LEN 4 #define HEAD_ID_LEN 2 #define HEAD_DATA_LEN 2 #define MAX_RECVQUE 1000 #define MAX_SENDQUE 1000 enum MSG_ID { MSG_HELLO_WORD = 1001 };
该头文件用于定义消息收发中需要用到的宏
MAX_LENGTH
:一次最多接收2048字节的内容
HEAD_TOTAL_LEN
:消息头结点总长度为4字节,包括消息id和消息体长度
HEAD_ID_LEN
:消息id,2字节
HEAD_DATA_LEN
:消息长度,2字节
MAX_RECVQUE
:接收队列最多容纳1000个处理请求
MAX_SENDQUE
:发送队列最多容纳1000个处理请求
MSG_ID
:用于存储消息id,通过id在逻辑队列中查找对应的回调函数
2. 消息存储节点MsgNode a. MsgNode.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #pragma once #include <string> #include "Const.h" #include <iostream> #include <boost/asio.hpp> class MsgNode { public : short _cur_len; short _total_len; char * _data; MsgNode (short max_len) : _total_len(max_len), _cur_len(0 ) { _data = new char [_total_len + 1 ](); _data[_total_len] = '\0' ; } ~MsgNode () { std::cout << "destruct MsgNode" << std::endl; delete [] _data; } void Clear () { ::memset (_data, 0 , _total_len); _cur_len = 0 ; } }; class RecvNode : public MsgNode {private : short _msg_id; public : RecvNode (short max_len, short msg_id); const short & GetMsgID () { return _msg_id; } }; class SendNode : public MsgNode {private : short _msg_id; public : SendNode (const char * msg, short max_len, short msg_id); };
MsgNode
类主要用于构建消息接收节点和消息发送节点
RecvNode
节点作为接收节点,构造函数仅需最大长度和消息id,同时定义一个GetMsgID()
函数用于返回消息id(在逻辑层根据消息id寻找对应已注册回调函数时需要用到)
SendNode
节点作为发送节点,不仅需要知道最大长度和消息id,还需传入消息首地址msg
b. 成员函数实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include "MsgNode.h" RecvNode::RecvNode (short max_len, short msg_id) : MsgNode (max_len), _msg_id(msg_id){ } SendNode::SendNode (const char * msg, short max_len, short msg_id) : MsgNode (max_len + HEAD_TOTAL_LEN), _msg_id(msg_id) { short msg_id_net = boost::asio::detail::socket_ops::host_to_network_short (msg_id); memcpy (_data, &msg_id_net, HEAD_ID_LEN); short max_len_net = boost::asio::detail::socket_ops::host_to_network_short (max_len); memcpy (_data + HEAD_ID_LEN, &max_len_net, HEAD_DATA_LEN); memcpy (_data + HEAD_TOTAL_LEN, msg, max_len); }
RecvNode
节点的构造函数比较简单,只需要调用基类MsgNode
的构造函数并将消息id传入_msg_id
SendNode
不仅需要上述两步(基类MsgNode
构造函数传入的最大长度不是传入的值,还需要再传入值基础上加4字节,4字节用于存储消息id和消息体长度),还需进行以下操作:
将传入的消息id转换为网络字节序并存储至_data
(MsgNode类成员变量),长度2字节
将传入的消息长度转换为网络字节序并存储至_data
(MsgNode类成员变量),长度2字节
_data
偏移4字节后存储消息体内容,长度为传入值max_len
因为不同设备的字节序存在大端序和小端序的问题,必须将消息id和消息长度转换为网络序,否则消息内容可能会紊乱,网络字节序的内容可参考
爱吃土豆:网络编程(8)+字节序处理0 赞同 · 0 评论文章
3. CServer a. CServer.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #pragma once #include <memory> #include <map> #include <mutex> #include "boost/asio.hpp" #include "CSession.h" #include "AsioIOServicePool.h" #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> #include <boost/asio/co_composed.hpp> #include <boost/asio/detached.hpp> #include "Const.h" #include <queue> #include <mutex> #include <memory> #include "MsgNode.h" using boost::asio::ip::tcp;using boost::asio::awaitable;using boost::asio::co_spawn;using boost::asio::detached;using boost::asio::use_awaitable;namespace this_coro = boost::asio::this_coro;class CServer { private : void HandleAccept (std::shared_ptr<CSession>, const boost::system::error_code& error) ; void StartAccept () ; boost::asio::io_context& _io_context; short _port; boost::asio::ip::tcp::acceptor _acceptor; std::map<std::string, std::shared_ptr<CSession>> _sessions; std::mutex _mutex; public : CServer (boost::asio::io_context& io_context, short port); ~CServer (); void ClearSession (std::string) ; };
包含要求的头文件并using需要使用的命名空间和函数(协程 )
成员函数在定义的时候详细解释,这里介绍以下成员变量:
io_context& _io_context
:声明一个_io_context
的引用,注意,和 boost::asio::io_context _io_context
不同,io_context& _io_context
表示对现有io_context
对象(其他地方创建的ioc对象)的引用 ,这里并不是声明一个ioc
实例,而是表示对外界传入ioc
的引用(主函数需要将监听错误信号signal的ioc传入server用于监听客户端的连接请求accept)
_port
:服务器端口,客户端与服务器的指定端口进行连接
_acceptor
:用于绑定服务器指定ip和端口号,并进行监听客户端连接信号
_sessions
:存储会话任务的容器,类似python的字典,键是一个字符串(不同会话有不同的uuid ),值是session
,为每个session
会话映射对应的uuid ,通过uuid
可实现会话的擦除、插入和寻找
_mutex
:锁,保证线程安全
b. 成员函数实现 1 2 3 4 5 CServer::CServer (boost::asio::io_context& io_context, short port) : _io_context(io_context), _port(port), _acceptor(io_context, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4 (), port)) { std::cout << "Server start success, on port: " << port << std::endl; StartAccept (); }
CServer
的构造函数接收指定的上下文服务io_context
和端口号,并将acceptor
绑定服务器指定的ip和端口号,最后调用StartAccept()
启动监听服务
1 2 3 4 5 6 void CServer::StartAccept () { auto & io_context = AsioIOServicePool::GetInstance ()->GetIOService (); std::shared_ptr<CSession> new_session = std::make_shared <CSession>(io_context, this ); _acceptor.async_accept (new_session->GetSocket (), std::bind (&CServer::HandleAccept, this , new_session, std::placeholders::_1)); }
主函数(主线程)的io_context
主要用于监听错误信号和监听客户端连接请求,但在每个线程中的io_context
都是从线程池AsioIOServicePool
中取出独立运行的。总共启动n+1 个io_context
,n
是线程池中存储的ioc
,1是主函数中的ioc
。函数流程如下:
首先,从线程池中取出一个ioc
,并将线程池中ioc
的索引+1,以便轮询取出下一个ioc;
然后,定义一个新的session
会话,传入从线程池中取出的ioc
,并将server
指针传入(如果该session会话错误,通过server的Clear函数擦除该session会话)。通过智能指针shared_ptr
管理session
声明周期,通过智能指针在C++中实现伪闭包,以防session
在线程结束前被提前释放导致线程安全;
最后,使用绑定主函数传入ioc
和指定端点的acceptor
监听客户端(已经为该客户端建立了会话session
,只需客户端进行连接即可进行收发操作)连接。如果监听成功,调用回调函数HandleAccept
。
1 2 3 4 5 6 7 8 9 10 11 12 void CServer::HandleAccept (std::shared_ptr<CSession> new_session, const boost::system::error_code& error) { if (!error) { new_session->Start (); std::lock_guard<std::mutex> lock (_mutex) ; _sessions.insert (make_pair (new_session->GetUuid (), new_session)); } else { std::cerr << "session accept failed, error is " << error.what () << std::endl; } StartAccept (); }
该回调函数主要处理客户端的连接请求,如果客户端连接成功 (没有错误),那么启动会话new_session->Start()
,并加锁将该会话以及对应的uuid
插入至_sessions
,以便后续管理此session
,‘}’
结束后自动释放锁,无需手动释放。
最后调用StartAccept()
,重新从线程池中取出ioc并创建一个新的会话session ,只待主线程的ioc监听到客户端连接申请之后,便将新会话交给新连接请求进行收发。
1 2 3 4 void CServer::ClearSession (std::string uuid) { std::lock_guard<std::mutex> lock (_mutex) ; _sessions.erase (uuid); }
该函数用于将指定uuid
的会话从会话存储容器_sessions
中擦除,每个uuid
对应一个session
,uuid
通过雪花算法实现。
4. 线程池AsioIOServicePool a. AsioIOServicePool.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #pragma once #include "boost/asio.hpp" #include <iostream> #include <vector> #include <memory> class AsioIOServicePool ;class SafeDeletor {public : void operator () (AsioIOServicePool* st) ; }; class AsioIOServicePool { public : friend class SafeDeletor ; using IOService = boost::asio::io_context; using Work = boost::asio::io_context::work; using WorkPtr = std::unique_ptr<Work>; static std::shared_ptr<AsioIOServicePool> GetInstance () { static std::once_flag s_flag; std::call_once (s_flag, [&]() { _instance = std::shared_ptr <AsioIOServicePool>(new AsioIOServicePool, SafeDeletor ()); }); return _instance; } boost::asio::io_context& GetIOService () ; void Stop () ; private : AsioIOServicePool (std::size_t size = std::thread::hardware_concurrency ()); ~AsioIOServicePool (); AsioIOServicePool (const AsioIOServicePool&) = delete ; AsioIOServicePool& operator = (const AsioIOServicePool&) = delete ; std::vector<IOService> _ioServices; std::vector<WorkPtr> _works; std::vector<std::thread> _threads; std::size_t _nextIOService; static std::shared_ptr<AsioIOServicePool> _instance; };
建立线程池AsioIOServicePool
,用于多线程收发。该服务器有两个单例类,分别是AsioIOServicePool
(线程池)和LogicSystem
(逻辑层),我使用了两种方式实现单例模式,AsioIOServicePool
的单例实现比较详细,LogicSystem
的实现比较简单,但代码更少更清晰。
1 2 3 4 class SafeDeletor {public : void operator () (AsioIOServicePool* st) ; };
SafeDeletor
是指定的删除器,我们将析构函数设置为私有,防止析构函数被主动调用,析构函数只能通过删除器SafeDeletor
被调用,这里重载了()
,将SafeDeletor
类作为仿函数使用。
AsioIOServicePool
的成员函数在后面定义的时候详细解释,这里介绍AsioIOServicePool
以下成员变量:
IOService
:将boost::asio::io_context
重命名为IOService,以便使用
Work
:将boost::asio::io_context::work
重命名为Work,Work用来绑定ioc,以防ioc.run()提前返回,详细解释可参考
爱吃土豆:网络编程(16)——asio多线程模型IOServicePool5 赞同 · 0 评论文章
WorkPtr
:使用unique_ptr管理work,希望该work不会被拷贝,只能移动或者从头用到尾不被改变
_ioServices
:io_context存储容器,用于存储指定数量的io_context
_works
:存储与ioc数量对应的work
_threads
:存储指定数量的线程
_nextIOService
:记录ioc在vector的下标,通过轮询返回ioc时,需要记录当前ioc的下标,累加,当超过vector的size时就归零,然后继续按轮询的方式返回
_instance
:声明一个静态成员变量,用于存储单例类AsioIOServicePool唯一实例,通过懒汉方式实现,单例模式的详细介绍可参考:
爱吃土豆:网络编程(13)——单例模式0 赞同 · 0 评论文章
b. 成员函数实现 1 2 3 4 void SafeDeletor::operator () (AsioIOServicePool* st) { std::cout << "this is safe deleter operator() of AsioIOServicePool" << std::endl; delete st; }
不能直接调用单例类的析构函数,只能通过删除器间接调用,单例类的构造函数和析构函数都私有化,并将复制构造函数和赋值运算符delete,防止单例类被拷贝和赋值。
1 2 3 4 5 6 7 8 9 10 11 12 13 AsioIOServicePool::AsioIOServicePool (std::size_t size) : _ioServices(size), _works(size), _nextIOService(0 ) { for (std::size_t i = 0 ; i < size; i++) { _works[i] = std::unique_ptr <Work>(new Work (_ioServices[i])); } for (std::size_t i = 0 ; i < _ioServices.size (); i++) { _threads.emplace_back ([this , i]() { _ioServices[i].run (); }); } }
线程池AsioIOServicePool
的构造函数需接收ioc
的指定数量size
,如果不显式指定,那么size
会使用默认值 std::thread::hardware_concurrency()
,该函数用于获取CPU核数。
work
的数量和ioc
的数量相同,索引_nextIOService
默认值为0,上限为ioc
的数量size
还有两个for循环,第一个for循环 使用Work()
函数将指定ioc
与work
绑定并通过右值 移动给对应位置的_works
容器,此时_works
容器中对应索引便存储了与对应ioc
绑定的work
,只有将work
给reset
之后,ioc.run()
才会返回。第二个for循环 启动指定ioc数量_size
的线程 ,每个线程中有独立的ioc
,并启动ioc.run()
1 2 3 4 5 6 7 boost::asio::io_context& AsioIOServicePool::GetIOService () { auto & service = _ioServices[_nextIOService++]; if (_nextIOService == _ioServices.size ()) _nextIOService = 0 ; return service; }
以_nextIOService
为索引轮询取出线程池中的io_context
1 2 3 4 5 6 7 8 9 void AsioIOServicePool::Stop () { for (auto & work : _works) { work.reset (); } for (auto & t : _threads) { t.join (); } }
不同线程的io_context
如果想要停止返回,那么必须将已绑定的work
释放,该函数用于释放所有ioc
的work
,因为不同线程独立运行,所以必须要等待每个线程结束后才结束程序。
1 2 3 4 5 6 7 static std::shared_ptr<AsioIOServicePool> GetInstance () { static std::once_flag s_flag; std::call_once (s_flag, [&]() { _instance = std::shared_ptr <AsioIOServicePool>(new AsioIOServicePool, SafeDeletor ()); }); return _instance; }
定义静态成员函数GetInstance()
,通过C++11新特性once_flag
和call_once
实现线程安全,保证多线程中AsioIOServicePool
只会被创建一次,有且仅有唯一实例。
once_flag
和call_once
的详细介绍可参考:
爱吃土豆:C++11新特性0 赞同 · 0 评论文章
最后注意,非模板类 的静态成员变量和函数需要在实现文件cpp中初始化,不能在头文件中初始化。模板类 的静态成员变量必须在头文件中定义,模板类中的静态变量可以在头文件中定义,是因为模板类在编译时不会像普通类那样生成具体的类型和符号。相反,模板类的代码只会在模板被实例化时才生成具体的类型。因此,模板类的静态成员变量不会像普通类的静态成员那样在多个编译单元中重复定义。
1 2 3 AsioIOServicePool.cpp std::shared_ptr<AsioIOServicePool> AsioIOServicePool::_instance = nullptr ;
5. 逻辑层LogicSystem 一般在解析完对端发送的数据之后,还要对该请求做更进一步地处理,比如根据不同的消息id执行不同的逻辑层函数或不同的操作,比如读数据库、写数据库,还比如游戏中可能需要给玩家叠加不同的buff、增加积分等等,这些都需要交给逻辑层 处理,而不仅仅是把消息发给对端。
服务器架构
上图所示的是一个完成的服务器架构,一般需要将逻辑层独立出来,因为如果在解析完对端数据后需要执行一些复杂的操作,比如玩家需要叠加各自buff或者技能,此时可能会耗时1s甚至更多,如果没有独立的逻辑层进行操作,那么系统会一直停留在执行回调函数那一步,造成阻塞 ,直至操作结束。
而逻辑层是独立的,回调函数只需将数据投递给逻辑队列(回调函数将数据放入队列中之后系统会运行下一步,便不会被阻塞),逻辑系统会自动从队列中取数据并做相应操作,如果需要在执行完操作之后做相应回复,那么逻辑系统会调用写事件并注册写回调给asio网络层,网络层就是asio底层通信的网络层步骤。
a. LogicSystem.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #pragma once #include <queue> #include <thread> #include "CSession.h" #include <map> #include <functional> #include "Const.h" #include <json/json.h> #include <json/value.h> #include <json/reader.h> class LogicNode ;class CSession ;typedef std::function<void (std::shared_ptr<CSession>, const short & msg_id, const std::string& msg_data)> FunCallBack;class LogicSystem { private : LogicSystem (); ~LogicSystem (); LogicSystem (const LogicSystem&) = delete ; LogicSystem& operator =(const LogicSystem&) = delete ; void RegisterCallBacks () ; void HelloWordCallBack (std::shared_ptr<CSession>, const short & msg_id, const std::string& msg_data) ; void DealMsg () ; std::queue<std::shared_ptr<LogicNode>> _msg_que; std::mutex _mutex; std::condition_variable _consume; std::thread _worker_thread; bool _b_stop; std::map<short , FunCallBack> _fun_callback; public : void PostMsgToQue (std::shared_ptr<LogicNode> msg) ; static LogicSystem& GetInstance () ; };
定义逻辑类的头文件,成员函数在后面定义时详细介绍,这里介绍一下逻辑类的成员变量:
FunCallBack
为要注册的回调函数类型,其参数为会话Session
类智能指针,消息id,以及消息内容。
_msg_que
为逻辑队列,其中的元素相当于RecvNode
,只不过为了实现伪闭包 ,所以创建一个LogicNode
类,包含Session
的智能指针防止被提前释放
_mutex
为保证逻辑队列安全的互斥量
_consume
表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。
_fun_callbacks
用于存储消息id和对应消息id的回调函数,根据id查找对应的逻辑处理函数。
_worker_thread
表示工作线程,用来从逻辑队列中取数据并执行回调函数。
_b_stop
表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。
_fun_callback
的定义如下:
1 2 3 typedef std::function<void (std::shared_ptr<CSession>, const short & msg_id, const std::string& msg_data)> FunCallBack;std::map<short , FunCallBack> _fun_callback;
_fun_callback
是一个map
,值和键的类型分别为short
和FunCallBack
,前者是消息id的类型,后者是回调函数。
FunCallBack
使用了C++11新特性std::function
来声明了一个函数签名,表示接受一个 shared_ptr
指向 CSession
、一个 short
类型的消息 ID 和一个 std::string
类型的消息数据的无返回对象的可调用对象
std::function
的详细介绍可参考
爱吃土豆:C++11新特性0 赞同 · 0 评论文章
因为逻辑队列存储的成员需要包含对应session
和消息内容,所以需要重新定义一个逻辑节点,用于存储会话session
中将对应消息投递至逻辑队列的内容,包括该session
和消息内容。
b. LogicNode 1 2 3 4 5 6 7 8 class LogicNode { friend class LogicSystem ; public : LogicNode (std::shared_ptr<CSession>, std::shared_ptr<RecvNode>); private : std::shared_ptr<CSession> _session; std::shared_ptr<RecvNode> _recvnode; };
LogicNode
类定义在CSession.h
中,CSession.h
在文章后面介绍
_session
:声明Session的智能指针,实现伪闭包,防止Session被提前释放
_recvnode
:接收消息节点的智能指针
构造函数如下:
1 2 LogicNode::LogicNode (std::shared_ptr<CSession> session, std::shared_ptr<RecvNode> recvnode) : _session(session), _recvnode(recvnode){}
用于接收一个session
会话和该会话投递的消息内容
c. 成员函数实现 1 2 3 4 LogicSystem::LogicSystem () :_b_stop(false ) { RegisterCallBacks (); _worker_thread = std::thread (&LogicSystem::DealMsg, this ); }
_b_stop
初始化为false,当前工作线程不会停止
RegisterCallBacks()
:调用回调注册函数:这个函数用于注册消息处理的回调函数 。将不同的消息 ID 和对应的处理函数关联起来,以便在处理消息时能够找到正确的函数。
_worker_thread = std::thread(&LogicSystem::DealMsg, this);
创建工作线程:使用 std::thread
创建一个新的线程。
&LogicSystem::DealMsg
:指定要在新线程中执行的成员函数,即 DealMsg
,该函数将负责从消息队列中提取消息并处理它。
this:传递当前对象的指针,以便 DealMsg 可以访问 LogicSystem 的成员变量和函数。
1 2 3 4 void LogicSystem::RegisterCallBacks () { _fun_callback[MSG_HELLO_WORD] = std::bind (&LogicSystem::HelloWordCallBack, this , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); }
将消息ID:MSG_HELLO_WORD
映射至回调函数HelloWordCallBack
,并存储至_fun_callback
。当接收到消息MSG_HELLO_WORD
时,系统会调用 HelloWordCallBack
方法来处理消息。
1 2 3 4 5 6 7 8 9 10 11 void LogicSystem::HelloWordCallBack (std::shared_ptr<CSession> session, const short & msg_id, const std::string& msg_data) { Json::Reader reader; Json::Value root; reader.parse (msg_data, root); std::cout << "receive msg id is " << root["id" ].asInt () << " msg data is " << root["data" ].asString () << std::endl; root["data" ] = "server has receive msg, msg data is " + root["data" ].asString (); std::string return_str = root.toStyledString (); session->Send (return_str, root["id" ].asInt ()); }
HelloWordCallBack
是对应消息id:MSG_HELLO_WORD
的回调函数
使用jsoncpp
库包装或者解析数据
调用 parse
方法将 msg_data
字符串解析为 JSON 格式,结果存储在 root 中
通过session
类的send
回传消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 void LogicSystem::DealMsg () { for (;;) { std::unique_lock<std::mutex> unique_lk (_mutex) ; while (_msg_que.empty () and !_b_stop) { _consume.wait (unique_lk); } if (_b_stop) { while (!_msg_que.empty ()) { auto msg_node = _msg_que.front (); std::cout << "recv msg id is " << msg_node->_recvnode->GetMsgID () << std::endl; auto call_back_iter = _fun_callback.find (msg_node->_recvnode->GetMsgID ()); if (call_back_iter == _fun_callback.end ()) { _msg_que.pop (); continue ; } call_back_iter->second (msg_node->_session, msg_node->_recvnode->GetMsgID (), std::string (msg_node->_recvnode->_data, msg_node->_recvnode->_total_len)); _msg_que.pop (); } break ; } auto msg_node = _msg_que.front (); std::cout << "recv msg id is " << msg_node->_recvnode->GetMsgID () << std::endl; auto call_back_iter = _fun_callback.find (msg_node->_recvnode->GetMsgID ()); if (call_back_iter == _fun_callback.end ()) { _msg_que.pop (); continue ; } call_back_iter->second (msg_node->_session, msg_node->_recvnode->GetMsgID (), std::string (msg_node->_recvnode->_data, msg_node->_recvnode->_total_len)); _msg_que.pop (); } }
当逻辑类实例化后,开始循环运行消息处理函数DealMsg()
。为了实现自由加解锁,定义一个unique_lock
类的锁unique_lk
。当逻辑队列为空并且服务器未停止时,进行等待while循环中,因为while循环也会占用资源,所以使用C++11特性条件变量 std::condition_variable
将线程挂起,该线程不会占用cpu资源,并释放锁使得其他线程可以获取共享资源,线程挂起持续至被唤醒(notify_one()
或者notify_all()
)。其实这里再开头加锁会导致锁粒度过大,我们只需要从队列取元素和加元素的时候加锁,既能保证线程安全,又可降低锁粒度。
当线程被唤醒后,如果服务器关闭,则取出逻辑队列所有数据处理并退出循环
当线程被唤醒后,如果队列不为空且服务器未关闭,那么取出队列第一个数据,并在注册map中查找对应数据id的回调函数,如果没有找到,那么释放该数据并处理下一条消息;反之,调用该回调函数。这里我们只会传入消息id:MSG_HELLO_WORD
,回调函数HelloWordCallBack
会被调用
1 2 3 4 5 6 LogicSystem::~LogicSystem () { std::cout << "逻辑层成功析构" << std::endl; _b_stop = true ; _consume.notify_one (); _worker_thread.join (); }
逻辑类的析构函数在所有工作线程运行结束后被执行,但工作线程可能会处于挂起状态,此时需要一个激活信号打断_consume
的wait
状态(在该命令前一步将_b_stop置为true)
1 2 3 4 5 6 7 8 void LogicSystem::PostMsgToQue (std::shared_ptr<LogicNode> msg) { std::unique_lock<std::mutex> unique_lk (_mutex) ; _msg_que.push (msg); if (_msg_que.size () == 1 ) { _consume.notify_one (); } }
该函数在Session会话中将对应session指针和消息内容投递至逻辑队列处理。
1 2 3 4 5 LogicSystem& LogicSystem::GetInstance () { static LogicSystem instance; return instance; }
LogicSystem
类也是一个单例模式 ,我这里使用了第二种方式来定义单例模式。我们只需要定义一个静态成员函数GetInstance()
,在该函数中返回局部静态变量instance
,instance
是LogicSystem
的实例。该方法只能在C++11及以后的平台才可以使用,因为返回局部静态变量只有在C++11及以上是线程安全的。
逻辑类的详细介绍可参考:
爱吃土豆:网络编程(12)——完善粘包处理操作+asio底层通信过程2 赞同 · 0 评论文章
爱吃土豆:网络编程(14)——基于单例模板实现的逻辑层1 赞同 · 0 评论文章
6. CSession CSession用于接收客户端的连接,并处理异步收发,在该类中,构造一个收协程 用于接收数据,但发仍然通过Send函数进行队列发送,不在构造一个发协程。 理由如下:
可以构建发送协程用于发送数据,但考虑到协程很大程度上在使用层面简化了代码,但实际上也耗费资源,如果服务器进行频繁发送操作,且处于任何时刻都可能调用的情况下,使用协程可能并不方便 ,因为可能在一个线程中,随着发送次数的上升,启动很多协程
还有一个原因是:如果发送的时候处于另一个线程,如果用发送协程的话,可能会导致其他线程声明周期的改变
所以我们这里仍然使用之前代码中用到的发送队列的方式。
a. CSession.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #pragma once #include <iostream> #include <boost/asio.hpp> #include <queue> #include "MsgNode.h" #include "LogicSystem.h" class CServer ;class CSession :public std::enable_shared_from_this<CSession>{ private : void HandleWrite (const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) ; boost::asio::io_context& _io_context; CServer* _server; boost::asio::ip::tcp::socket _socket; std::string _uuid; bool _b_close; std::mutex _send_lock; std::queue<std::shared_ptr<SendNode>> _send_que; std::shared_ptr<RecvNode> _recv_msg_node; std::shared_ptr<MsgNode> _recv_head_node; public : CSession (boost::asio::io_context& io_context, CServer* server); ~CSession (); boost::asio::ip::tcp::socket& GetSocket () ; void Start () ; std::string& GetUuid () ; void Close () ; void Send (const char * msg, short max_length, short msg_id) ; void Send (std::string msg, short msg_id) ; }; class LogicNode { friend class LogicSystem ; public : LogicNode (std::shared_ptr<CSession>, std::shared_ptr<RecvNode>); private : std::shared_ptr<CSession> _session; std::shared_ptr<RecvNode> _recvnode; };
同样,CSession类的成员函数在后面定义时详细介绍,这里仅介绍成员变量:
boost::asio::io_context& _io_context
:用于接收从线程池AsioIOServicePool
中提取的ioc
,注意,这里是引用 而不是重新声明一个io_context
新实例
CServer _server
:接收CServer
指针,当Session
收发操作出错时,方便调用Server
的Clear
函数擦除该session
会话任务
_socket
:每个session
都有一个独立的套接字,负责异步收发
_b_close
:session会话任务是否结束的标志位
_send_que
:发送队列,逻辑层会调用session的Send发送函数,Send函数会将需要发送的消息添加到发送队列,保证发送的顺序性
_recv_msg_node
:接收节点,将接收的消息存储至该节点,包括消息id和消息内容
_recv_head_node
:消息头节点,将收到消息的id和长度存储至该节点
LogicNode
:逻辑节点,负责将session和消息内容投递至逻辑队列,逻辑队列元素的格式为LogicNode
b. 成员函数实现 1 2 3 4 5 6 7 CSession::CSession (boost::asio::io_context& io_context, CServer* server) : _io_context(io_context), _server(server), _socket(io_context), _b_close(false ) { boost::uuids::uuid a_uuid = boost::uuids::random_generator ()(); _uuid = boost::uuids::to_string (a_uuid); _recv_head_node = std::make_shared <MsgNode>(HEAD_TOTAL_LEN); }
CSession
的构造函数需要接收线程池中的ioc
、server
指针,并将_b_close
置为false
,表示该会话未关闭
通过boost
的random_generator()()
函数获取一个uuid
,每个session
都有自己的uuid
,相当于一个名称,通过该名称可找出对应的session
会话,该函数通过雪花算法实现。
定义消息接收节点_recv_head_node
,总长度4字节,包括消息id和消息长度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 void CSession::Start () { auto shared_this = shared_from_this (); boost::asio::co_spawn (_io_context, [self = shared_from_this (), this ]()->boost::asio::awaitable<void > { try { for (; !_b_close;) { _recv_head_node->Clear (); std::size_t n = co_await boost::asio::async_read (_socket, boost::asio::buffer ( _recv_head_node->_data, HEAD_TOTAL_LEN), use_awaitable); if (n == 0 ) { std::cout << "receive peer closed" << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } short msg_id = 0 ; memcpy (&msg_id, _recv_head_node->_data, HEAD_ID_LEN); msg_id = boost::asio::detail::socket_ops::network_to_host_short (msg_id); std::cout << "msg id is " << msg_id << std::endl; if (msg_id > MAX_LENGTH) { std::cout << "invalid msg id is " << msg_id << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } short msg_len = 0 ; memcpy (&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN); msg_len = boost::asio::detail::socket_ops::network_to_host_short (msg_len); std::cout << "msg len is " << msg_len << std::endl; if (msg_len > MAX_LENGTH) { std::cout << "invalid msg len is " << msg_len << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } _recv_msg_node = std::make_shared <RecvNode>(msg_len, msg_id); n = co_await boost::asio::async_read (_socket, boost::asio::buffer (_recv_msg_node->_data, _recv_msg_node->_total_len),use_awaitable); if (n == 0 ) { std::cout << "receive peer closed " << std::endl; Close (); _server->ClearSession (_uuid); co_return ; } _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0' ; std::cout << "receive data is " << _recv_msg_node->_data << std::endl; LogicSystem::GetInstance ().PostMsgToQue (std::make_shared <LogicNode>(shared_from_this (), _recv_msg_node)); } } catch (std::exception& e) { std::cerr << "exception is " << e.what () << std::endl; Close (); _server->ClearSession (_uuid); } }, detached); }
Start()
函数用于接收消息内容,并通过另一种简便的粘包处理方式获得消息id、消息长度,可参考:
爱吃土豆:网络编程(11)——另一种简便的粘包处理方式1 赞同 · 0 评论文章
该函数在Server
中收到客户端连接请求后,Server
将一个新的session
分配给该客户端,并调用Start()
函数。
首先,构造一个收协程:
这里通过boost::asio::co_spawn
构造一个收协程,传入ioc
和lambda
函数,以及detached
。注意,如果用 =
捕获所有变量,但是lambda表达式并未使用智能指针则不会增加引用计数,除非在lambda内部使用这个指针或者在 []
中显式捕获该指针,我们这里通过self = shared_from_this()
, this显式增加session的引用计数,防止协程处理过程中,智能指针被意外释放,通过智能指针实现伪闭包。
显式指定返回类型为boost::asio::awaitable<void>
,awaitable<void>
类型允许函数在执行时可以被暂停和恢复,这使得它能够与 co_await
一起使用,所以函数返回类型必须是awaitable<void>
。
session
会在Start()
函数中循环执行收协程,直至该会话被关闭,每一次执行都需要将消息头结点清空,接收下一个消息。通过co_await
关键字和use_awaitable
以同步的方式调用异步读操作async_read
,并将读到的字节数(4字节)返回,只有读到4字节才会返回,否则已知挂起。这里和asynv_read_some
有一些区别然后分别读取消息id、消息长度和消息内容,且需要将消息id和消息长度从网络序列转换为本地序列。这里使用async_read
指定读取字节数,而不是像之前粘包处理使用async_read_some
函数,通过调用回调函数处理粘包问题。
最后,在接收完所有消息后,将消息投递至逻辑队列进行相应的处理。
在协程中,co_return
用来返回一个值或表示协程结束 ,它将把值传递给协程的返回对象(如果返回类型不为boost::asio::awaitable<void>
,对象是boost::asio::awaitable
)。
co_return
只能在协程中使用,而普通函数中使用的是 return。
co_return
将结果传递给协程的承诺(promise)对象,这个对象会将值交给协程的返回类型
协程关键字和函数use_awaitable、co_spawn、detached
如何使用请参考文章:
爱吃土豆:网络编程(18)——使用asio协程实现并发服务器demo(官方案例)4 赞同 · 0 评论文章
我们已经构建了一个接收协程用于接收数据,那么我们需要构建发送协程吗?
可以构建发送协程用于发送数据,但考虑到协程很大程度上在使用层面简化了代码,但实际上也耗费资源,如果服务器进行频繁发送操作,且处于任何时刻都可能调用的情况下,使用协程可能并不方便 ,因为可能在一个线程中,随着发送次数的上升,启动很多协程
还有一个原因是:如果发送的时候处于另一个线程,如果用发送协程的话,可能会导致其他线程声明周期的改变
所以我们这里仍然使用之前代码中用到的发送队列的方式,Send函数两种重载定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 void CSession::Send (const char * msg, short max_length, short msg_id) { bool pending = false ; std::unique_lock<std::mutex> lock (_send_lock) ; int send_que_size = _send_que.size (); if (send_que_size > MAX_SENDQUE) { std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl; return ; } if (_send_que.size () > 0 ) { pending = true ; } _send_que.push (std::make_shared <SendNode>(msg, max_length, msg_id)); if (pending) { return ; } auto msgnode = _send_que.front (); lock.unlock (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), std::bind (&CSession::HandleWrite, this , std::placeholders::_1, shared_from_this ())); } void CSession::Send (std::string msg, short msg_id) { bool pending = false ; std::unique_lock<std::mutex> lock (_send_lock) ; int send_que_size = _send_que.size (); if (send_que_size > MAX_SENDQUE) { std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << std::endl; return ; } if (_send_que.size () > 0 ) { pending = true ; } _send_que.push (std::make_shared <SendNode>(msg.c_str (), msg.length (), msg_id)); if (pending) { return ; } auto msgnode = _send_que.front (); lock.unlock (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), std::bind (&CSession::HandleWrite, this , std::placeholders::_1, shared_from_this ())); }
Send()
的两种重载,针对数据内容是string类型和char*
类型。
首先设置unique_lock
,方便自由加解锁;
获取发送队列元素个数,并判断元素数量是否合法,如果队列中已存在元素,那么将本条消息传入队列然后返回,等待上一条消息发送成功;如果队列为空,那么取出队列的第一个元素并解锁(我们只需要在使用队列时加锁,不使用队列是解锁,避免不相干的逻辑占用锁) ,然后同步异步写async_write发送数据(指定字节数量),这里不需要构造写协程,而是直接通过send函数调用异步写进行发送,发送成功后调用回调函数HandleWrite 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void CSession::HandleWrite (const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) { try { if (!error) { std::unique_lock<std::mutex> lock (_send_lock) ; _send_que.pop (); if (!_send_que.empty ()) { auto & msgnode = _send_que.front (); lock.unlock (); boost::asio::async_write (_socket, boost::asio::buffer (msgnode->_data, msgnode->_total_len), std::bind (&CSession::HandleWrite, this , std::placeholders::_1, shared_self)); } } else { std::cerr << "handle write failed, error is " << error.what () << std::endl; Close (); _server->ClearSession (_uuid); } } catch (std::exception& e) { std::cout << "exception is " << e.what () << std::endl; Close (); _server->ClearSession (_uuid); } }
如果发送成功,那么先加锁保护发送队列,然后移除上一个已发送的消息,并判断队列是否为空,如果不为空,取出第一个数据并解锁,并调用异步发函数发送该数据;如果发送失败,那么关闭session,并调用Server的ClearSession函数擦除对应uuid的session。
1 2 3 boost::asio::ip::tcp::socket& CSession::GetSocket () { return _socket; }
返回session的独立套接字
1 2 3 std::string& CSession::GetUuid () { return _uuid; }
返回session的uuid
1 2 3 4 void CSession::Close () { _b_close = true ; _socket.close (); }
显式关闭Session会话,首先将标志位_b_close 置1,并关闭套接字
1 2 3 4 5 6 7 8 CSession::~CSession () { try { std::cout << "~CSession destruct" << std::endl; } catch (std::exception& e) { std::cerr << "exception is " << e.what () << std::endl; } }
7. 主函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <iostream> #include "CServer.h" #include <csignal> #include <thread> #include <mutex> #include "AsioIOServicePool.h" int main () { try { auto pool = AsioIOServicePool::GetInstance (); boost::asio::io_context io_context; boost::asio::signal_set signals (io_context, SIGINT, SIGTERM) ; signals.async_wait ([&io_context, &pool](auto ,auto ) { io_context.stop (); pool->Stop (); }); CServer s (io_context, 10086 ) ; io_context.run (); } catch (std::exception& e) { std::cerr << "Exception: " << e.what () << std::endl; } }
8. 客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include <boost/asio.hpp> #include <iostream> #include <json/json.h> #include <json/value.h> #include <json/reader.h> using namespace boost::asio::ip;using std::cout;using std::endl;const int MAX_LENGTH = 1024 * 2 ; const int HEAD_LENGTH = 2 ;const int HEAD_TOTAL = 4 ;int main () { try { boost::asio::io_context ioc; tcp::endpoint remote_ep (address::from_string("127.0.0.1" ), 10086 ) ; tcp::socket sock (ioc) ; boost::system::error_code error = boost::asio::error::host_not_found; sock.connect (remote_ep, error); if (error) { cout << "connect failed, code is " << error.value () << " error msg is " << error.message (); return 0 ; } Json::Value root; root["id" ] = 1001 ; root["data" ] = "hello world" ; std::string request = root.toStyledString (); size_t request_length = request.length (); char send_data[MAX_LENGTH] = { 0 }; int msgid = 1001 ; int msgid_host = boost::asio::detail::socket_ops::host_to_network_short (msgid); memcpy (send_data, &msgid_host, 2 ); int request_host_length = boost::asio::detail::socket_ops::host_to_network_short (request_length); memcpy (send_data + 2 , &request_host_length, 2 ); memcpy (send_data + 4 , request.c_str (), request_length); boost::asio::write (sock, boost::asio::buffer (send_data, request_length + 4 )); char reply_head[HEAD_TOTAL]; size_t reply_length = boost::asio::read (sock, boost::asio::buffer (reply_head, HEAD_TOTAL)); msgid = 0 ; memcpy (&msgid, reply_head, HEAD_LENGTH); short msglen = 0 ; memcpy (&msglen, reply_head + 2 , HEAD_LENGTH); msglen = boost::asio::detail::socket_ops::network_to_host_short (msglen); char msg[MAX_LENGTH] = { 0 }; size_t msg_length = boost::asio::read (sock, boost::asio::buffer (msg, msglen)); Json::Reader reader; reader.parse (std::string (msg, msg_length), root); std::cout << "msg id is " << root["id" ] << " msg is " << root["data" ] << endl; getchar (); } catch (std::exception& e) { std::cerr << "Exception: " << e.what () << endl; } return 0 ; }
9. 测试
代码:
https://github.com/qiaobeibei/CoroutineServergithub.com/qiaobeibei/CoroutineServer