在应用中Socket所表达只是通讯对象它不能反映更多的业务信息,由于职责问题也很少会在Socket加上更多的业务扩展功能,这时候需要针对连接进行一个会话封装;作为会话它不仅包括基础的Socket信息,更重要是满足其他信息存储和业务状态的需求,在行为上还需要扩展出针对消息传输和更方便的流处理能力。
以下是组件针对连接会话的定义:

  1. public interface ISession : IDisposable
  2. {
  3. //会话超时间,组件内部处理
  4. double TimeOut
  5. { get; set; }
  6. //会话来源于那个监听的host
  7. string Host { get; set; }
  8. //会话来源于那个监听的port
  9. int Port { get; set; }
  10. //会话最大等待消息数
  11. int MaxWaitMessages { get; set; }
  12. //会话使用的接收缓冲池
  13. Buffers.IBufferPool ReceiveBufferPool { get; set; }
  14. //会话使用的发送缓冲池
  15. Buffers.IBufferPool SendBufferPool { get; set; }
  16. //会话来源于那个监听对象
  17. ListenHandler ListenHandler { get; set; }
  18. //初始化会话方法
  19. void Initialization(IServer server, Action<ISession> setting);
  20. //描述数值写入流的字序
  21. bool LittleEndian
  22. {
  23. get; set;
  24. }
  25. //最后一次Socket操作状态
  26. SocketError LastSocketError { get; set; }
  27. //用于发送的异步对象
  28. Buffers.SocketAsyncEventArgsX SendEventArgs { get; set; }
  29. //用于接收的异步对象
  30. Buffers.SocketAsyncEventArgsX ReceiveEventArgs { get; set; }
  31. //Socket处理过程结果方法,可以自定义跟踪收发完成情况
  32. ISessionSocketProcessHandler SocketProcessHandler
  33. {
  34. get; set;
  35. }
  36. //会话ID,当前服务唯一
  37. long ID { get; }
  38. //自定义属性存储
  39. object this[string key] { get; set; }
  40. //发送消息队列数量
  41. int Count { get; }
  42. //当前会话关联的Socket对象
  43. System.Net.Sockets.Socket Socket
  44. { get; }
  45. //会话名称,可根据需求设置
  46. string Name { get; set; }
  47. //返回对应的服务对象
  48. IServer Server { get; }
  49. //当前会话理否已释放
  50. bool IsDisposed { get; }
  51. //导入接收的缓冲
  52. void Receive(Buffers.IBuffer buffer);
  53. //发送一个消息对象
  54. bool Send(object data);
  55. //会话对应的标签对象,根据情况设置
  56. object Tag { get; set; }
  57. //返回远程的地址
  58. System.Net.EndPoint RemoteEndPoint
  59. { get; }
  60. //返回当前会话的协议分析器
  61. IPacket Packet
  62. { get; }
  63. //返回当前会话的数据流,可读写非线程安全
  64. System.IO.Stream Stream
  65. {
  66. get;
  67. }
  68. //当前会话是否SSL
  69. bool SSL { get; }
  70. //当前会话安全状态
  71. AuthenticationType Authentication
  72. { get; set; }
  73. }

(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/ISession.cs)

基础信息

会话提供了一些基础的信息,主要包括:服务监听信息,使用怎样的协议分析器和流信息等。会话的大部分信息都是由组件内部所指定的,所以大部分信息只用于读取处理。

  • ID

这属性是会话中在服务中的唯一性,可以通过这个ID直接查询对应的会话对象。

  • Count

获取当前会话等待发送的消息数量

  • Authentication

这个属性主要用于定义会话状态,结合这个状态属性可以判断那些消息来源是否符合连接当前状态的要求。

  1. public enum AuthenticationType : int
  2. {
  3. None = 1,
  4. Connected = 2,
  5. User = 4,
  6. Admin = 8,
  7. Security = 16
  8. }

(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/TcpSession.cs)
这个属性在业务处理上非常有用,毕竟很多时候消息的处理都是有限制的;如没有登陆的情况不能处理数据更新消息,在这场景可以在登陆消息成功后设置一下这属性,后面就可以判断是否处理了。

信息存储

在扩展业务的时候往往需要在会话上存储很多信息,如当前连接的用户信息等。会话提供以下两个属性来给用户存储扩展的业务信息。

  • 索引器

可以通过索引器来添加和获取相关键值的信息,这些信息会在释放时自动清除。具体定义代码:

  1. public object this[string key]
  2. {
  3. get
  4. {
  5. object value = null;
  6. mProperties.TryGetValue(key, out value);
  7. return value;
  8. }
  9. set
  10. {
  11. mProperties[key] = value;
  12. }
  13. }

(https://github.com/IKende/BeetleX/blob/master/src/BeetleX/TcpSession.cs)

  • Tag属性

用户可以定义自己的会话信息存储到这属性上,这属性的访问就比索引器来得更方便。FastHttpApi是用这属性来存储对应会话的HttpToken信息。

消息发送

会话提供了一个Send方法来发送消息,这个方法的参数是Object类型;为什么设计成这样呢,主要原因是在会话上层屏蔽基于bytes原始数发送行为,这种行为并不友好和协议缓冲不可控的情况。在后面《数据发送》章节会详细描述组件为什么这样设计,这样设计的情况数据是如何转换等。

数据流

会话提供了一个基础的数据流Stream属性来进行网络数据的读写操作;这个流在不同情况下有所不同,默认情况下是PipeStream,如果开启了SSL的则对应是SslStreamX;不过通过ToPipeStream方法都可以转换成PipeStream进行操作。

  1. public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
  2. {
  3. base.SessionReceive(server, e);
  4. var pipestream = e.Stream.ToPipeStream();
  5. pipestream.ReadFree((int)pipestream.Length);
  6. pipestream.Write(mData, 0, mData.Length);
  7. e.Stream.Flush();
  8. }

可以直接在事件中通过ToPipeStream()方法把流对象转换成PipeStream,转换后就可以进行读写数据,最终通过Flush()方法直接把数据返回给客户端。关于这个数据流的设计和功能会在《数据流》章节详细讲述。

释放连接

会话提供Disposed方法来释放会话,使用者可以在任意地方调用;会话释放后会把相对应的Socket关资源也进行释放。
以下是释放方法的详细代码:

  1. public void Dispose()
  2. {
  3. lock (this)
  4. {
  5. if (!mIsDisposed)
  6. {
  7. mIsDisposed = true;
  8. OnDispose();
  9. }
  10. }
  11. }
  12. protected virtual void OnDispose()
  13. {
  14. try
  15. {
  16. object data = DequeueSendMessage();
  17. //回收在发送队列中的缓冲区对象
  18. while (data != null)
  19. {
  20. if (data is IBuffer buffer)
  21. {
  22. BeetleX.Buffers.Buffer.Free(buffer);
  23. }
  24. data = DequeueSendMessage();
  25. }
  26. SendEventArgs?.Dispose();
  27. SendEventArgs?.Clear();
  28. ReceiveEventArgs?.Dispose();
  29. ReceiveEventArgs?.Clear();
  30. mReceiveArgs.Server = null;
  31. mReceiveArgs.Session = null;
  32. mBaseNetStream.Dispose();
  33. if (mSslStream != null)
  34. mSslStream.Dispose();
  35. //释放Socket资源
  36. Server.CloseSession(this);
  37. ListenHandler = null;
  38. ReceiveDispatcher = null;
  39. if (Packet != null)
  40. Packet.Dispose();
  41. mProperties.Clear();
  42. }
  43. catch
  44. {
  45. }
  46. }