(请移步到)
https://mp.weixin.qq.com/mp/appmsgalbum?action=getalbum&album_id=1539935825956798467&__biz=MzU5NzcwMjI2Mw==#wechat_redirect
前面两章已经介绍了Socket和SocketAsyncEventArgs两个关于通讯操作的基础对象,接下来要做的工作是如何实现服务;在做之前都需要总结一下要做什么功能,然后针对功能进行相应接口行为定义。而在这里的设计更多是依据于数看在这方面的工作经验,而接下来要详细讲述的组件的服务端行为接口是如何设计。
在做之前要归纳服务端总体上需要些什么,然后再针对功能制定接口和类行为信息。作为一个服务端组件它应该具备以下基础功能: 服务配置, 会话管理, 协议分析器, 日志,和基础数据统计.针对以上的功能制定服务基础接口规范:
public interface IServer : IDisposable
{
//当前在线数
int Count
{ get; }
//服务配置
ServerOptions Options
{ get; }
//接收数据的缓冲池
Buffers.BufferPoolGroup ReceiveBufferPool { get; }
//发送数据的缓冲池
Buffers.BufferPoolGroup SendBufferPool { get; }
//配置的委托
IServer Setting(Action<ServerOptions> handler);
//版本号
long Version { get; }
//获取当前在线的所有会话
ISession[] GetOnlines();
//打开服务
bool Open();
//启动时输出的Logo
Action WriteLogo { get; set; }
//暂停
bool Pause();
//恢复服务
void Resume();
//获取服务运行时长(毫秒)
long GetRunTime();
//服务名称
string Name { get; set; }
//服务处理相关事件行为
IServerHandler Handler { get; set;}
//协议分析器
IPacket Packet { get; set;}
//判断日志等级
bool EnableLog(EventArgs.LogType logType);
//获取指定ID的会话
ISession GetSession(long id);
//获取服务状态
ServerStatus Status { get; set; }
//更新会话活跃时间
void UpdateSession(ISession session);
//记录日志
void Log(EventArgs.LogType type, ISession session, string message);
//记录日志
void Log(EventArgs.LogType type, ISession session, string message, params object[] parameters);
//记录错误
void Error(Exception error, ISession session, string message);
//记录错误
void Error(Exception error, ISession session, string message, params object[] parameters);
//触发会话接收数据事件
void SessionReceive(EventArgs.SessionReceiveEventArgs e);
//关闭会话
void CloseSession(ISession session);
//向指定会话发送消息
bool Send(object message, ISession session);
//向指定会话发送消息
bool[] Send(object message, params ISession[] sessions);
//向指定会话发送消息
bool[] Send(object message, System.ArraySegment<ISession> sessions);
//发送IO总量
long SendQuantity { get; }
//接收IO总量
long ReceiveQuantity { get; }
//接收数据总量
long ReceivBytes {get;}
//发送数据总量
long SendBytes { get; }
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/IServer.cs)
接下来详细介绍每个功能涉及到具体设计和定义
服务配置
配置是服务的基础信息,它主要包括有:服务地址,缓冲区大小了,可接管的最大连接数,超时时间,字符编码和加密方式等等。组件设计了一个ServerOptions类结构用于描述这些信息:
public class ServerOptions
{
//会话发送队列最大等待值,超过则关闭会话
public int MaxWaitMessages { get; set; } = 0;
//队列数,主要用于发送和接收消息处理
public int IOQueues { get; set; }
//是否同步Accept连接
public bool SyncAccept { get; set; } = true;
//会话活跃超时时间,为零是不处理,当大于零的情况会检会话活跃时间是否超时,超时则关闭
public int SessionTimeOut { get; set; }
//是否启用IPv6
public bool UseIPv6 { get; set; }
//日志输出级别
public EventArgs.LogType LogLevel { get; set; }
//是否记录统计信息
public bool Statistical { get; set; }
//是否合并序列化消息,为0不处理;当同时发送消息给多个会话时进行一次序列化处理。
public int Combined{ get; set;}
//返回第一个监听信息配置
public ListenHandler DefaultListen => Listens[0];
//配置多个监听列表
public IList<ListenHandler> Listens { get; private set; }
//添加监听
public ServerOptions AddListen(int port)
{
return AddListen(null, port);
}
//添加监听
public ServerOptions AddListen(string host, int port)
{
ListenHandler listenOptions = new ListenHandler
{
Host = host,
Port = port
};
Listens.Add(listenOptions);
return this;
}
//添加SSL监听
public ServerOptions AddListenSSL(string certificateFile, string password, int port = 443)
{
return AddListenSSL(certificateFile, password, null, port);
}
//添加SSL监听
public ServerOptions AddListenSSL(string certificateFile, string password, string host, int port = 443)
{
ListenHandler listenOptions = new ListenHandler
{
Host = host,
Port = port,
SSL = true,
CertificateFile = certificateFile,
CertificatePassword = password
};
Listens.Add(listenOptions);
return this;
}
//配置最大可用缓冲区大小(单位MB)
public int BufferPoolMaxMemory { get; set; }
//单个缓冲池大小
public int BufferPoolSize { get; set; }
//配置有多少个缓冲池
public int BufferPoolGroups { get; set; }
//配置大小节序,默认是小
public bool LittleEndian{ get; set;}
//配置字符编码
public System.Text.Encoding Encoding{get; set;}
//是否启动单独的队列来进行接收消息处理
public bool IOQueueEnabled { get; set; }
//单个缓存大小
public int BufferSize { get; set; }
//最大支持在线会话数
public int MaxConnections { get; set; }
//接入队列最大数量,超过直接拒绝
public int MaxAcceptQueue { get; set; }
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/ServerOptions.cs)
以上是组件的基础配置信息,在这里需要进行一说明的是监听列表属性Listens,组件的设计是允许服务同时监听多个地址,每个地址都可以开启不同的TLS选项,具体会在后面的《监听》章节详细说明。
会话管理
作为服务端它需要同时应用对不同的请求处理,针对这些管理同样也非常重要。服务端一般提供会话管理功能有:会话创建,释放,发送消息,接收消息和简单查询功能。在上面描述的服务接口中有几个成员用于提供这些功能的支持。
基础属性方法
- Handler
设置服务相关处理事件对象,对象实现IServerHandler接口用于接管处理服务相关事件逻辑处理。
- Send
向指定的会话发送消息,并返回是否已经写入到相应的会话队列
bool Send(object message, ISession session);
bool[] Send(object message, params ISession[] sessions);
bool[] Send(object message, System.ArraySegment<ISession> sessions);
- Log
记录日志方法
void Log(EventArgs.LogType type, ISession session, string message);
void Log(EventArgs.LogType type, ISession session, string message, params object[] parameters);
大部分日志信息都需要用于字符串拼接,字符拼接对服务性能有比较大的影响,在记录日志的前最好判断当前日志级别。
if (EnableLog(LogType.Debug))
{
Log(LogType.Debug, session, "{0} update active time", session.RemoteEndPoint);
}
- GetOnlines
获取当前服务在线的所有会话
ISession[] GetOnlines();
- GetSession
获取指定ID的会话信息
ISession GetSession(long id);
- UpdateSession
更新会话最新活跃时间,它是配合Options.SessionTimeOut来使用,当SessionTimeOut设置了值后服务会定时检测会话的最新活跃时间,如果超出配置的值则会关闭会话。
void UpdateSession(ISession session);
- Version
当前服务在线会话变更版本号,由于服务存储会话用于Dictionary,如果每次获取列表都从新获取会比较损耗资源;服务引入了一个版本号只要缓存中的版本号一致即直接从已有缓存列表中获取所有会话。
public ISession[] GetOnlines()
{
if (mOnlines.Version != this.Version)
{
//元子状态操作判断
if (System.Threading.Interlocked.CompareExchange(ref mGetOnlinesStatus, 1, 0) == 0)
{
try
{
if (mOnlines.Version != this.Version)
{
mOnlines.Arrays = mSessions.Values.ToArray();
mOnlines.Version = this.Version;
}
}
finally
{
System.Threading.Interlocked.Exchange(ref mGetOnlinesStatus, 0);
}
}
}
return mOnlines.Arrays;
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/TcpServer.cs)
会话事件
组件通过IServerHandler接口用于描述针对会话的一些事件行为处理,通过实现这接口可以定义会话创建,释放和消息接等环节的逻辑处理。
public interface IServerHandler
{
//对应的服务信息
IServer Server { get; set; }
//会话创建前事件,可以根据情况取消会话
void Connecting(IServer server, EventArgs.ConnectingEventArgs e);
//会话创建完成事件
void Connected(IServer server, EventArgs.ConnectedEventArgs e);
//记录日志方法
void Log(IServer server, EventArgs.ServerLogEventArgs e);
//记录错误日志方法
void Error(IServer server, EventArgs.ServerErrorEventArgs e);
//会话接收网络流事件
void SessionReceive(IServer server, SessionReceiveEventArgs e);
//会话接收消息,前提是服务定义了IPacket协议分析器
void SessionPacketDecodeCompleted(IServer server, EventArgs.PacketDecodeCompletedEventArgs e);
//会话断开事件
void Disconnect(IServer server, EventArgs.SessionEventArgs e);
//检测会话超时事件,只有设置了SessionTimeOut才会触发
void SessionDetection(IServer server, SessionDetectionEventArgs e);
//初始化服务完成事件
void Opened(IServer server);
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/IServerHandler.cs)
以上接口行为必须由使用者实现并设置到对应的服务中,服务会根据情况进行调用。在这里需要关注的是SessionReceive和SessionPacketDecodeCompleted,前者是会话接收到数据时候触发处理,而后者则是协议分析后转换成对象触处理;当服务配置了协议分析器后IPacket后才会触,一旦配置了IPacket那对应的SessionReceive则无效。
日志
网络服务涉及到很多异步或多线程处理的环节,这些应用在问题排查上非常困难;还有就是在高并发吞吐在处理日志上也要有所注意,处理不当还是对性能影响比较大的。服务提供了Log和Error方法来记录日志,为了在不同情况下对性能和输出带来的影响还制定了LogLevel用于定义不同情况的输出。
LogLevel定义
public enum LogType : int
{
All = 0,
Trace = 1,
Debug = 2,
Info = 4,
Warring = 8,
Error = 16,
Fatal = 32,
Off = 64
}
对于值的定义为什么是2的次方呢,主要是方便合混定义合用如:Trace|Info|Error,以上定义就是可以同时输出以上几种日志内容。
日志输出
组件在很多过程处理都添加了日志记录,但记录日志涉及到一个问题就是当吞吐比较大的情况记录这些日志都会带来极大的损耗问题;所以在记录日志的时候时还是有所不同,代码如下:
if (Server.EnableLog(EventArgs.LogType.Warring))
{
Server.Log(EventArgs.LogType.Warring, null,
$"{RemoteEndPoint} sync SslStream date error {error.Message}@{error.StackTrace}");
}
为什么每次记录日志都需要判断一下呢,统一在下层过虑不更方便?主要原因是日志信息过程存在大量合并字符串,而这个过程又非常损耗资源;如果上层不判断下层过虑那就意味着不过日志记不记录都存在合并字符串的损耗,而这种损耗一旦在高吞吐的情况下会给GC带的了巨大的压力(毕竟在日志全开高压下可以达到每秒数十万条或更多)!
创建服务
由于服务是基于接口的方式提供使用,使用者无法直接创建;需要通过SocketFactory对象相关静态方法来创实服务实例。
public class SocketFactory
{
//创建一个TCP服务并设置处理的事件和协议分析器
public static IServer CreateTcpServer(IServerHandler handler, IPacket packet, ServerOptions options = null)
{
TcpServer server = new TcpServer(options);
server.Handler = handler;
server.Packet = packet;
return server;
}
//创建一个TCP服务并设置处理的事件和协议分析器
public static IServer CreateTcpServer<HANDLER, IPACKET>(ServerOptions options = null)
where HANDLER : IServerHandler, new()
where IPACKET : IPacket, new()
{
return CreateTcpServer(new HANDLER(), new IPACKET(), options);
}
//创建一个TCP服务并设置处理的事件,无协议分析器
public static IServer CreateTcpServer<HANDLER>(ServerOptions options = null) where HANDLER : IServerHandler, new()
{
return CreateTcpServer(new HANDLER(), null, options);
}
//创建一个指向服务地址的Client,host可以是域名
public static CLIENT CreateClient<CLIENT>(string host, int port)
where CLIENT : IClient, new()
{
CLIENT client = new CLIENT();
client.Init(host, port, null);
return client;
}
//创建一个指向服务地址的Client并指定协议分析器,host可以是域名
public static CLIENT CreateClient<CLIENT, PACKET>(string host, int port)
where PACKET : Clients.IClientPacket, new()
where CLIENT : IClient, new()
{
CLIENT client = new CLIENT();
client.Init(host, port, new PACKET());
return client;
}
//创建一个指向服务地址的Client并指定协议分析器,host可以是域名
public static CLIENT CreateClient<CLIENT>(IClientPacket packet, string host, int port) where CLIENT : IClient, new()
{
CLIENT client = new CLIENT();
client.Init(host, port, packet);
return client;
}
//创建一个指向服务地址的Ssl Client,host可以是域名,serviceName为证书的服务名称
public static CLIENT CreateSslClient<CLIENT>(string host, int port, string serviceName)
where CLIENT : IClient, new()
{
CLIENT client = new CLIENT();
client.Init(host, port, null);
client.SSL = true;
client.SslServiceName = serviceName;
return client;
}
//创建一个指向服务地址的Ssl Client并指定协议分析器,host可以是域名,serviceName为证书的服务名称
public static CLIENT CreateSslClient<CLIENT, PACKET>(string host, int port, string serviceName)
where PACKET : Clients.IClientPacket, new()
where CLIENT : IClient, new()
{
CLIENT client = new CLIENT();
client.Init(host, port, new PACKET());
client.SSL = true;
client.SslServiceName = serviceName;
return client;
}
//创建一个指向服务地址的Ssl Client并指定协议分析器,host可以是域名,serviceName为证书的服务名称
public static CLIENT CreateSslClient<CLIENT>(IClientPacket packet, string host, int port, string serviceName) where CLIENT : IClient, new()
{
CLIENT client = new CLIENT();
client.Init(host, port, packet);
client.SSL = true;
client.SslServiceName = serviceName;
return client;
}
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/ServerFactory.cs)
以上是服务最基础的功能,实际还有很多细节的东西需要实现,如连接监听,接收,发送信息等;这些都会在后面的章节来讲述如何实现。
启动
IServer提供了一个Open方法来启动服务,这个方法的过程主要是对配置进行一个初始化,和调用相关的ListenHandler(《监听》章节会讲述)。
public bool Open()
{
bool result = false;
try
{
//初始化
ToInitialize();
Status = ServerStatus.Start;
//运行相关地址监听
foreach (ListenHandler item in this.Options.Listens)
{
item.SyncAccept = Options.SyncAccept;
item.Run(this, OnListenAcceptCallBack);
}
//打印当前GC是否ServerGC,在多核发CPU中开启这个配置可以更好地提升GC处理能力
if (!GCSettings.IsServerGC)
{
if (EnableLog(LogType.Warring))
Log(LogType.Warring, null, "no serverGC mode,please enable serverGC mode!");
}
if (Handler != null)
Handler.Server = this;
if (WriteLogo != null)
WriteLogo();
else
OnWriteLogo();
Handler?.Opened(this);
//获取监听是所有成功
foreach (ListenHandler item in this.Options.Listens)
{
result |= (item.Error == null);
}
}
catch (Exception e_)
{
Status = ServerStatus.Error;
if (EnableLog(LogType.Error))
Error(e_, null, "server start error!");
result = false;
}
if (result)
{
Status = ServerStatus.Start;
}
else
{
Status = ServerStatus.Error;
}
return result;
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/TcpServer.cs)
在启动过程中有个比较重要的环节是内部初始化,这个工作主要把服务的缓冲区和线程队列等对象过行实例化。
private void ToInitialize()
{
if (!mInitialized)
{
//初始化接收线程队列
mReceiveDispatchCenter = new Dispatchs.DispatchCenter<SocketAsyncEventArgsX>(ProcessReceiveArgs, Options.IOQueues);
int maxBufferSize;
if (Options.BufferPoolMaxMemory == 0)
{
Options.BufferPoolMaxMemory = 500;
}
//计算最大可用缓冲区内存
maxBufferSize = (int)(((long)Options.BufferPoolMaxMemory * 1024 * 1024) / Options.BufferSize / Options.BufferPoolGroups);
if (maxBufferSize < Options.BufferPoolSize)
maxBufferSize = Options.BufferPoolSize;
//初始化接收缓冲池
mReceiveBufferPoolGroup = new BufferPoolGroup(Options.BufferSize, Options.BufferPoolSize, maxBufferSize, Options.BufferPoolGroups);
//初始化发送缓冲池
mSendBufferPoolGroup = new BufferPoolGroup(Options.BufferSize, Options.BufferPoolSize, maxBufferSize, Options.BufferPoolGroups);
//创建会话列表
mSessions = new ConcurrentDictionary<long, ISession>();
mInitialized = true;
//初始化接入处理线程队列
mAcceptDispatcher = new Dispatchs.DispatchCenter<AcceptSocketInfo>(AcceptProcess, Math.Min(Environment.ProcessorCount, 16));
//配置会话超时
if (Options.SessionTimeOut > 0)
{
//计算设置超时时间
if (Options.SessionTimeOut * 1000 < mTimeOutCheckTime)
mTimeOutCheckTime = Options.SessionTimeOut * 1000;
else
mTimeOutCheckTime = Options.SessionTimeOut * 1000;
if (mDetectionTimer != null)
mDetectionTimer.Dispose();
//启动定时器
mDetectionTimer = new System.Threading.Timer(OnDetectionHandler, null,
mTimeOutCheckTime, mTimeOutCheckTime);
if (EnableLog(LogType.Info))
Log(LogType.Info, null, "detection sessions timeout with {0}s", Options.SessionTimeOut);
}
}
}
(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/TcpServer.cs)