优秀图网站,中国市场营销培训网,wordpress 搜索设置,助君网络科技目录 前言介绍目的IO线程初始化IO线程Proactor启动Procator线程轮询处理socketIOObject总结前言 介绍 [NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤#xff08;订阅#xf… 目录 前言介绍目的IO线程初始化IO线程Proactor启动Procator线程轮询处理socketIOObject总结 前言 介绍 [NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤订阅,对多种传输协议的无缝访问。
当前有2个版本正在维护版本3最新版为3.3.4版本4最新版本为4.0.0-rc5。本文档是对4.0.0-rc5分支代码进行分析。 zeromq的英文文档NetMQ的英文文档 目的 对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下 消息队列NetMQ 原理分析1-Context和ZObject消息队列NetMQ 原理分析2-IO线程和完成端口消息队列NetMQ 原理分析3-命令产生/处理和回收线程消息队列NetMQ 原理分析4-Session和Pipe消息队列NetMQ 原理分析5-Engine消息队列NetMQ 原理分析6-TCP和Inpoc实现消息队列NetMQ 原理分析7-Device消息队列NetMQ 原理分析8-不同类型的Socket消息队列NetMQ 原理分析9-实战 友情提示: 看本系列文章时最好获取源码,更有助于理解。 IO线程 NetMQ 4.0.0底层使用的是IOCP(即完成端口)模式进行通信的(3.3.4使用的是select模型),通过异步IO绑定到完成端口,来最大限度的提高性能。这里不对同步/异步socket进行详细介绍。稍微解释下完成端口,为了解决每个socket客户端使用一个线程进行通信的性能问题完成端口它充分利用内核对象的调度只使用少量的几个线程来处理和客户端的所有通信消除了无谓的线程上下文切换最大限度的提高了网络通信的性能。 想详细了解完成端口的请看完成端口(Completion Port)详解,讲解的比较详细,同时对各种网络编程模型做了简单的介绍。 因此NetMQ通过几个(默认1个)IO线程处理通信,上一片文章介绍了ZObejct对象,在该对象中存在许多命令的处理,实际对命令的发送,分配都是IO线程的工作。 初始化IO线程 IO线程初始化时会初始化Proactor和IOThreadMailbox var name iothread- threadId;
m_proactor new Proactor(name);
m_mailbox new IOThreadMailbox(name, m_proactor, this); Proactor对象就是用来绑定或处理完成端口用的,后面再做作详细介绍。IOThreadMailbox是IO线程处理的信箱,每当有命令需要处理时,都会向当前Socket对象所在的IO线程信箱发送命令。 让我们看一眼IOThread对象和IOThreadMailbox的定义 internal sealed class IOThread : ZObject, IMailboxEvent
{
} IOThread对象继承自ZObject对象,记得上一节想到ZObject对象知道如何处理各种命令吗因此IOThread对象也继承了他父亲的技能。同时IOThread对象实现了IMailboxEvent接口,这个接口之定义了一个方法。 internal interface IMailboxEvent
{void Ready();
} 当IO信箱接受到命令时表示当前有命令准备好了,可以进行 处理,IO信箱则会调用IO线程的Ready方法处理命令,那么IO信息如何调用IO线程的Ready方法呢来看下IOThreadMailbox的构造函数。 internal class IOThreadMailbox : IMailbox
{...public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent){m_proactor proactor;m_mailboxEvent mailboxEvent;Command cmd;bool ok m_commandPipe.TryRead(out cmd);}...
} 在IOThreadMailbox初始化时,传入了IMailboxEvent。 m_commandPipe是NetMQ的管道(Pipe),后面我们会对其做介绍,这里只要知道该管道用于存放命令即可,可以__暂时__理解为管道队列。 Proactor 每个IOThread会有一个Proactor,Proactor的工作就是将Socket对象绑定到完成端口,然后定时去扫描完成端口是否有需要处理的Socket对象。 internal class Proactor : PollerBase
{...public Proactor([NotNull] string name){m_name name;m_stopping false;m_stopped false;m_completionPort CompletionPort.Create();m_sockets new DictionaryAsyncSocket, Item();}...
} Proactor对象继承自PollerBase,那么PollerBase又是什么呢?从命名可以看这是一个轮询基类,即该对象需要长时间不断循环处理某件事情。PollerBase对象是一个抽象类,它有2个功能 负载均衡 还记的Context中选择IO线程时有这个一段代码吗? IO线程的负载均衡功能就是PollBase对象提供的 每次选择IO线程时会将m_load字段值1protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } } 在IOThread取PollBase对象(Proactor)的Load属性时候会特殊处理保证拿到的是最新的值。定时任务 PollBase第二个功能就是支持定时任务即定时触发某事件。 private readonly SortedListlong, ListTimerInfo m_timers; PollBase内部有一个SortedList,key为任务执行的时间,value为TimeInfo。TimeInfo对象包含2个信息id和ITimerEvent接口,id用来辨别当前任务的类型,ITimerEvent接口就包含了TimerEvent方法,即如何执行。 如TcpConnection连接失败会重新连接时会重连,下面时TcpConnection开始连接方法 private void StartConnecting()
{Debug.Assert(m_s null);// Create the socket.try{m_s AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);}catch (SocketException){AddReconnectTimer();return;}...
}
private void AddReconnectTimer()
{//获取重连时间间隔int rcIvl GetNewReconnectIvl();//IO线程的Proactor中,TcpConnection的ReconnectTimerId 1 m_ioObject.AddTimer(rcIvl, ReconnectTimerId);...
} IO线程会被封装到IOObject中,调用IOObject的AddTimer方法实际就是调用IO线程中Proactor对象的AddTimer方法,其方法定义如下 public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id)
{long expiration Clock.NowMs() timeout;var info new TimerInfo(sink, id);if (!m_timers.ContainsKey(expiration))m_timers.Add(expiration, new ListTimerInfo());m_timers[expiration].Add(info);
} 第一行会获取当前的毫秒时间加上时间间隔。然后加入到m_timers中。m_completionPort CompletionPort.Create();
m_sockets new DictionaryAsyncSocket, Item(); 初始化时会创建完成端口当有socket需要处理时会和完成端口绑定。 初始化时还会初始化一个存放异步AsyncSocket和item的字典。 有关于AsyncSocket和CompletionPort可以去Git上看AsyncIO的源码,这里不做分析。Item结构如下 private class Item
{public Item([NotNull] IProactorEvents proactorEvents){ProactorEvents proactorEvents;Cancelled false;}[NotNull] public IProactorEvents ProactorEvents { get; }public bool Cancelled { get; set; }
} 它包含了IProactorEvents接口的信息和当前Socket操作是否被取消标志。 internal interface IProactorEvents : ITimerEvent
{void InCompleted(SocketError socketError, int bytesTransferred);void OutCompleted(SocketError socketError, int bytesTransferred);
} IProactorEvents继承自ITimerEvent。同时它还声明了InCompleted和OutCompleted方法即发送或接收完成时如何处理因此当需要处理Socket时会将当前Socket处理方式保存到这个字典中。当当前对象发送消息完成,则会调用OutCompleted方法,接收完成时则会调用InCompleted方法。 当有Socket需要绑定时会调用Proactor的AddSocket方法 public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents)
{var item new Item(proactorEvents);m_sockets.Add(socket, item);m_completionPort.AssociateSocket(socket, item);AdjustLoad(1);
} 它包含2个参数,一个时异步Socket对象和IProactorEvents。然后加把他们加入到字段中并将他们绑定到完成端口上。第四段AdjustLoad方法即把当前IO线程处理数量1用于负载均衡用。 当Socket操作完成时会调用Proactor的RemoveSocket移除绑定 public void RemoveSocket(AsyncSocket socket)
{AdjustLoad(-1);var item m_sockets[socket];m_sockets.Remove(socket);item.Cancelled true;
} 移除时会将item的Cancelled字段设置为true。所以当Proactor轮询处理Socket时发现该Socket操作被取消(移除),就会跳过处理。 启动Procator线程轮询 在IO线程启动时实际就是启动Procator的work线程 public void Start()
{m_proactor.Start();
} public void Start()
{m_worker new Thread(Loop) { IsBackground true, Name m_name };m_worker.Start();
} 处理socket 完整的Loop方法如下 private void Loop()
{var completionStatuses new CompletionStatus[CompletionStatusArraySize];while (!m_stopping){// Execute any due timers.int timeout ExecuteTimers();int removed;if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout ! 0 ? timeout : -1, completionStatuses, out removed))continue;for (int i 0; i removed; i){try{if (completionStatuses[i].OperationType OperationType.Signal){var mailbox (IOThreadMailbox)completionStatuses[i].State;mailbox.RaiseEvent();}// if the state is null we just ignore the completion statuselse if (completionStatuses[i].State ! null){var item (Item)completionStatuses[i].State;if (!item.Cancelled){switch (completionStatuses[i].OperationType){case OperationType.Accept:case OperationType.Receive:item.ProactorEvents.InCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;case OperationType.Connect:case OperationType.Disconnect:case OperationType.Send:item.ProactorEvents.OutCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;default:throw new ArgumentOutOfRangeException();}}}}catch (TerminatingException){ }}}
} var completionStatuses new CompletionStatus[CompletionStatusArraySize]; 第一行初始化了CompletionStatus数组,CompletionStatusArraySize值为100。CompletionStatus作用是用来保存socket的信息或状态。 获取超时时间 int timeout ExecuteTimers(); protected int ExecuteTimers()
{if (m_timers.Count 0)return 0;long current Clock.NowMs();var keys m_timers.Keys;for (int i 0; i keys.Count; i){var key keys[i];if (key current){return (int)(key - current);}var timers m_timers[key];foreach (var timer in timers){timer.Sink.TimerEvent(timer.Id);}timers.Clear();m_timers.Remove(key);i--;}return 0;
} ExecuteTimers会计算之前加入到m_timers需要等待的超时时间,若没有对象则直接返回0,否则获取若获取到key时间在当前时间之前,则需要调用TimerEvent方法调用完成后移除。 若获取到的key时间比当前时间大,则返回他们的差即为需要等待的超时时间。 从完成端口获取处理完的状态 int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout ! 0 ? timeout : -1, completionStatuses, out removed))continue; GetMultipleQueuedCompletionStatus方法传入一个超时时间,若前面获取的超时时间为0,则这边会设置为-1,表示阻断直到有要处理的才返回。CompletionPort内部维护了一个状态队列removed即为处理完成返回的状态个数。 若获取成功则会返回true,后面就开始遍历completionStatuses数组处理完成Socket。 开始处理待处理的状态 public struct CompletionStatus
{internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : this(){AsyncSocket asyncSocket;State state;OperationType operationType;SocketError socketError;BytesTransferred bytesTransferred;}public AsyncSocket AsyncSocket { get; private set; }public object State { get; internal set; }public OperationType OperationType { get; internal set; }public SocketError SocketError { get; internal set; }public int BytesTransferred { get; internal set; }
} CompletionStatus是个结构体,它包含的信息如上。其中OperationType是当前Socket的处理方式。 public enum OperationType
{Send, Receive, Accept, Connect, Disconnect, Signal
} 在for循环的一开始先会判断当前状态的OperationType,若是Signal,则说明当前是个信号状态,说明有命令需要处理,则会调用IO信箱的RaiseEvent方法,实际为IO线程的Ready方法。 public void Ready()
{Command command;while (m_mailbox.TryRecv(out command))command.Destination.ProcessCommand(command);
} IOThread会将当前信箱的所有命令进行处理。 若不是Signal则会将CompletionStatus保存的状态信息转换为Item对象,并判断当前Socket是否移除(取消)。若没有则对其进行处理。判断OperationType,若为Accept或Receive则表示需要接收,则调用InCompleted方法。若为Connect,Disconnect或Send则表示有消息向外发送,则调用OutCompleted方法。 至此IOThread代码分析完毕。 IOObject internal class IOObject : IProactorEvents
{public IOObject([CanBeNull] IOThread ioThread){if (ioThread ! null)Plug(ioThread);}public void Plug([NotNull] IOThread ioThread){Debug.Assert(ioThread ! null);m_ioThread ioThread;}
} IOObject实际就是保存了IOThread的信息和Socket处理完成时如何执行,以及向外暴露了一些接口。 再次说明,如果向简单了解完成端口如何使用,则看《完成端口使用》如果想详细了解完成端口则看下《完成端口详细介绍》,如果想直到NetMQ的AsyncIO和完成端口的源码请看AsyncIO。 总结 该篇介绍了IO线程和完成端口的处理方式,若哪里分析的不到位或有误希望支出。 本文地址https://www.cnblogs.com/Jack-Blog/p/6347163.html 作者博客杰哥很忙 欢迎转载请在明显位置给出出处及链接) 转载于:https://www.cnblogs.com/Jack-Blog/p/6347163.html