知名商城网站建设价格,网站建设与安全管理,中国空间站图片绘画,企业管理软件销售工程师前言提到WebSocket相信大家都听说过#xff0c;它的初衷是为了解决客户端浏览器与服务端进行双向通信#xff0c;是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询#xff0c;来实现服务端的变更响应到客户端…前言 提到WebSocket相信大家都听说过它的初衷是为了解决客户端浏览器与服务端进行双向通信是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询来实现服务端的变更响应到客户端现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行都属于TCP/IP四层模型中的第四层应用层。由于WebSocket握手阶段采用HTTP协议,所以也需要进行跨域处理。它的协议标识是ws或wss对应了常规标识和安全通信协议标识。本文重点并不是介绍WebSocket协议相关而是提供一种基于ASP.NET Core原生WebSocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了只是之前一直没有系统的整理出来本篇文章就来和大家分享一下由于主要是提供一种思路所以涉及到具体细节或者业务相关的可能没有体现出来还望大家理解。实现咱们的重点关键字就是两个WebSocket和集群实现的框架便是基于ASP.NET Core,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github[1],具体仓库地址可以转到文末自行跳转到示例源码[2]中查看。既然涉及到集群这里咱们就用nginx作为反向代理来搭建一个集群实例。大致的示例结构如下图所示redis在这里扮演的角色呢是用来处理Server端的消息相互传递用的主要是使用的redis的pub/sub功能来实现的这里便涉及到几个核心问题• 首先集群状态每个用户被分发到具体的哪台服务器上是不得而知的• 其次处在不同Server端的不同用户间的相互通信是需要一个传递媒介• 最后针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略这里需要考虑的是如果需要搭建实时通信服务器的话需要注意集群的隔离性主要是和核心业务进行隔离毕竟WebSocket需要保持长链接、且消息的大小需要评估。上面提到了redis的主要功能就是用来传递消息用的毕竟每个server服务器是无状态的。这当然不是必须的任何可以进行消息分发的中间件都可以比如消息队列rabbitmq、kafka、rocketmq、mqtt等甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活让大家关注点在思路上而不是使用中案件的代码上。nginx配置通过上面的图我们可以看到我们这里构建集群示例使用的nginx如果让nginx支持WebSocket的话需要额外的配置这个在网上有很多相关的文章介绍这里就来列一下咱们示例的nginx配置在配置文件nginx.conf里//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {server 127.0.0.1:5001;server 127.0.0.1:5678;
}server {listen 5000;server_name localhost;location ~/chat/{//upstream地址proxy_pass http://wsbackend;proxy_connect_timeout 60s; proxy_read_timeout 3600s;proxy_send_timeout 3600s;//记得转发避免踩坑proxy_set_header Host $host;proxy_http_version 1.1; //http升级成websocket协议的头标识proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection Upgrade;}
}这套配置呢在搜索引擎上能收到很多不过不妨碍我把使用的粘贴出来。这一套亲测有效也是我使用的配置请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式保证同一个ip的客户端用户可以分发到一台WebSocket实例中去这样的话能尽量避免使用redis的用户频道做消息传递。好了接下来准备开始展示具体实现的代码了。一对一发送首先介绍的就是一对一发送的情况也就是我把消息发给你聊天的时候私聊的情况。这里呢涉及到两种情况• 如果你需要通信的客户端和你连接在一个Server端里这样的话可以直接在链接里找到这个端的通信实例直接发送。• 如果你需要通信的客户端和你不在一个Server端里这个时候咱们就需要借助redis的pub/sub的功能把消息传递给另一个Server端。咱们通过一张图大致的展示一下它的工作方式解释一下每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息。每次发送消息的时候你会知道你要发送给谁不在当前服务器的话则发送到redis的user:用户唯一标识频道这样的话目标WebSocket就能收到消息了。首先是注入相关的依赖项这里我使用的redis客户端是freeredis主要是因为操作起来简单,具体实现代码如下var builder WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider {var logger provider.GetServiceILoggerWebSocketChannelHandler();RedisClient cli new RedisClient(127.0.0.1:6379);cli.Notice (s, e) logger?.LogInformation(e.Log);return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingletonWebSocketHandler();
builder.Services.AddControllers();var app builder.Build();var webSocketOptions new WebSocketOptions
{KeepAliveInterval TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);app.MapGet(/, () Hello World!);
app.MapControllers();app.Run();接下来我们定义一个Controller用来处理WebSocket请求public class WebSocketController : ControllerBase
{private readonly ILoggerWebSocketController _logger;private readonly WebSocketHandler _socketHandler;public WebSocketController(ILoggerWebSocketController logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler){_logger logger;_socketHandler socketHandler;}//这里的id代表当前连接的客户端唯一标识比如用户唯一标识[HttpGet(/chat/user/{id})]public async Task ChatUser(string id){//判断是否是WebSocket请求if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join);var webSocket await HttpContext.WebSockets.AcceptWebSocketAsync();//处理请求相关await _socketHandler.Handle(id, webSocket);}else{HttpContext.Response.StatusCode StatusCodes.Status400BadRequest;}}
}这里的WebSocketHandler是用来处理具体逻辑用的咱们看一下相关代码public class WebSocketHandler:IDisposable
{//存储当前服务用户的集合private readonly UserConnection UserConnection new();//redis频道前缀private readonly string userPrefix user:;//用户对应的redis频道private readonly ConcurrentDictionarystring, IDisposable _disposables new();private readonly ILoggerWebSocketHandler _logger;//redis客户端private readonly RedisClient _redisClient;public WebSocketHandler(ILoggerWebSocketHandler logger, RedisClient redisClient){_logger logger;_redisClient redisClient;}public async Task Handle(string id, WebSocket webSocket){//把当前用户连接存储起来_ UserConnection.GetOrAdd(id, webSocket);//订阅一个当前用户的频道await SubMsg(${userPrefix}{id});var buffer new byte[1024 * 4];//接收发送过来的消息这个方法是阻塞的如果没收到消息则一直阻塞var receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);//循环接收消息while (webSocket.State WebSocketState.Open){try{//因为缓冲区长度是固定的所以要获取实际长度string msg Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd(\0);//接收的到消息转换成实体MsgBody msgBody JsonConvert.DeserializeObjectMsgBody(msg);//发送到其他客户端的数据byte[] sendByte Encoding.UTF8.GetBytes($user {id} send:{msgBody.Msg});_logger.LogInformation($user {id} send:{msgBody.Msg});//判断目标客户端是否在当前当前服务如果在当前服务直接扎到目标连接直接发送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State WebSocketState.Open){await targetSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果要发送的目标端不在当前服务则发送给目标redis端的频道ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId id, ToId msgBody.Id, Msg msgBody.Msg };//目标的redis频道_redisClient.Publish(${userPrefix}{msgBody.Id}, JsonConvert.SerializeObject(channelMsgBody));}//继续阻塞循环接收消息receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//循环结束意味着当前端已经退出//从当前用户的集合移除当前用户_ UserConnection.TryRemove(id, out _);//关闭当前WebSocket连接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在当前订阅集合移除当前用户_disposables.TryRemove(${userPrefix}{id}, out var disposable);//关闭当前用户的通道disposable.Dispose();}private async Task SubMsg(string channel){//订阅当前用户频道var sub _redisClient.Subscribe(channel, async (channel, data) {//接收过来当前频道数据说明发送端不在当前服务ChannelMsgBody msgBody JsonConvert.DeserializeObjectChannelMsgBody(data.ToString());byte[] sendByte Encoding.UTF8.GetBytes($user {msgBody.FromId} send:{msgBody.Msg});//在当前服务找到目标的WebSocket连接并发送消息if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State WebSocketState.Open){await targetSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把redis订阅频道添加到集合中_disposables.TryAdd(channel, sub);}//程序退出的时候取消当前服务订阅的redis频道public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}这里涉及到几个辅助相关的类其中UserConnection类是存储注册到当前服务的连接MsgBody类用来接受客户端发送过来的消息ChannelMsgBody是用来发送redis频道的相关消息因为要把相关消息通过redis发布出去咱们列一下这几个类的相关代码//注册到当前服务的连接
public class UserConnection : IEnumerableKeyValuePairstring, WebSocket
{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionarystring, WebSocket _users new ConcurrentDictionarystring, WebSocket();//当前服务的用户数量public int Count _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumeratorKeyValuePairstring, WebSocket GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}
}//客户端消息
public class MsgBody
{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }
}//频道订阅消息
public class ChannelMsgBody
{//用户标识public string FromId { get; set; }//目标用户标识也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }
}这样的话关于一对一发送消息的相关逻辑就实现完成了启动两个Server端由于nginx默认的负载均衡策略是轮询所以注册两个用户的话会被分发到不同的服务里去用Postman连接三个连接唯一标识分别是1、2、3模拟一下消息发送效果如下,发送效果接收效果群组发送上面我们展示了一对一发送的情况接下来我们来看一下群组发送的情况。群组发送的话就是只要大家都加入一个群组只要客户端在群组里发送一条消息则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道集群中的其他WebSocket服务器通过这个redis频道接收群组消息通过一张图描述一下群组的实现方式相对于一对一要简单一点• 发送端可以不用考虑当前服务中的客户端连接一股脑的交给redis把消息发布出去• 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息获取组内的用户循环发送消息展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet(/chat/group/{groupId}/{userId})]
public async Task ChatGroup(string groupId, string userId)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join);var webSocket await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode StatusCodes.Status400BadRequest;}
}接下来看一下HandleGroup的相关逻辑还是在WebSocketHandler类中看一下代码实现public class WebSocketHandler:IDisposable
{private readonly UserConnection UserConnection new();private readonly GroupUser GroupUser new();private readonly SemaphoreSlim _lock new(1, 1);private readonly ConcurrentDictionarystring, IDisposable _disposables new();private readonly string groupPrefix group:;private readonly ILoggerWebSocketHandler _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILoggerWebSocketHandler logger, RedisClient redisClient){_logger logger;_redisClient redisClient;}public async Task HandleGroup(string groupId, string userId, WebSocket webSocket){//因为群组的集合可能会存在很多用户一起访问所以限制访问数量await _lock.WaitAsync();//初始化群组容器 群唯一标识为key 群员容器为valuevar currentGroup GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//当前用户加入当前群组_ currentGroup.GetOrAdd(userId, webSocket);//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息if (currentGroup.Count 1){//订阅redis频道await SubGroupMsg(${groupPrefix}{groupId});}_lock.Release();var buffer new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);//服务不退出的话则一直等待接收while (webSocket.State WebSocketState.Open){try{string msg Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd(\0);_logger.LogInformation($group 【{groupId}】 user 【{userId}】 send:{msg});//组装redis频道发布的消息目标为群组标识ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId userId, ToId groupId, Msg msg };//通过redis发布消息_redisClient.Publish(${groupPrefix}{groupId}, JsonConvert.SerializeObject(channelMsgBody));receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//如果客户端退出则在当前群组集合删除当前用户_ currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubGroupMsg(string channel){var sub _redisClient.Subscribe(channel, async (channel, data) {ChannelMsgBody msgBody JsonConvert.DeserializeObjectChannelMsgBody(data.ToString());byte[] sendByte Encoding.UTF8.GetBytes($group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg});//在当前WebSocket服务器找到当前群组里的用户GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);//循环当前WebSocket服务器里的用户发送消息foreach (var user in currentGroup){//不用给自己发送了if (user.Key msgBody.FromId){continue;}if (user.Value.State WebSocketState.Open){await user.Value.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把当前频道加入订阅集合_disposables.TryAdd(channel, sub);}
}这里涉及到了GroupUser类是来存储群组和群组用户的对应关系的定义如下public class GroupUser
{//key为群组的唯一标识public ConcurrentDictionarystring, UserConnection Groups new ConcurrentDictionarystring, UserConnection();
}演示一下把两个用户添加到一个群组内然后发送接收消息的场景用户u1发送用户u2接收发送所有人发送给所有用户的逻辑比较简单不用考虑到用户限制只要用户连接到了WebSocket集群则都可以接收到这个消息大致工作方式如下图所示这个比较简单咱们直接看实现代码首先是定义一个地址用于发布消息//把用户注册进去
[HttpGet(/chat/all/{id})]
public async Task ChatAll(string id)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join);var webSocket await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode StatusCodes.Status400BadRequest;}
}具体的实现逻辑还是在HandleGroup类里是HandleAll方法看一下具体实现public class WebSocketHandler:IDisposable
{private readonly UserConnection AllConnection new();private readonly ConcurrentDictionarystring, IDisposable _disposables new();private readonly string all all;private readonly ILoggerWebSocketHandler _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILoggerWebSocketHandler logger, RedisClient redisClient){_logger logger;_redisClient redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count 1){await SubAllMsg(all);}_lock.Release();var buffer new byte[1024 * 4];//阻塞接收信息var receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);while (webSocket.State WebSocketState.Open){try{string msg Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd(\0);_logger.LogInformation($user {id} send:{msg});//获取接收信息ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId id, Msg msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub _redisClient.Subscribe(channel, async (channel, data) {ChannelMsgBody msgBody JsonConvert.DeserializeObjectChannelMsgBody(data.ToString());byte[] sendByte Encoding.UTF8.GetBytes($user 【{msgBody.FromId}】 send all:{msgBody.Msg});//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){ //如果包含当前用户跳过if (user.Key msgBody.FromId){continue;}if (user.Value.State WebSocketState.Open){await user.Value.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}
}效果在这里就不展示了和群组的效果是类似的只是一个是部分用户一个是全部的用户。整合到一起上面我们分别展示了一对一、群组、所有人的场景但是实际使用的时候每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下一个用户只需要连接到WebSocket集群一次即可至于发送给谁加入什么群组接收全部消息等都是连接后通过一些标识区分的而不必每个类型的操作都注册一次就和微信和QQ一样我只要登录了即可至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果大致的思路是• 用户连接到WebSocket集群把用户和连接保存到当前WebSocket服务器的用户集合中去。• 一对一发送的时候只需要在具体的服务器中找到具体的客户端发送消息• 群组的时候先把当前用户标识加入群组集合即可接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息• 全员消息的时候直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息这样的话就保证了每个客户端用户在集群中只会绑定一个连接首先还是单独定义一个action用于让客户端用户连接上来具体实现代码如下所示public class WebSocketChannelController : ControllerBase
{private readonly ILoggerWebSocketController _logger;private readonly WebSocketChannelHandler _webSocketChannelHandler;public WebSocketChannelController(ILoggerWebSocketController logger, WebSocketChannelHandler webSocketChannelHandler){_logger logger;_webSocketChannelHandler webSocketChannelHandler;}//只需要把当前用户连接到服务即可[HttpGet(/chat/channel/{id})]public async Task Channel(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join);var webSocket await HttpContext.WebSockets.AcceptWebSocketAsync();await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{HttpContext.Response.StatusCode StatusCodes.Status400BadRequest;}}
}接下来看一下WebSocketChannelHandler类的HandleChannel方法实现用于处理不同的消息比如一对一、群组、全员消息等不同类型的消息public class WebSocketChannelHandler : IDisposable
{//用于存储当前WebSocket服务器链接上来的所有用户对应关系private readonly UserConnection UserConnection new();//用于存储群组和用户关系用户集合采用HashSet保证每个用户只加入一个群组一次private readonly ConcurrentDictionarystring, HashSetstring GroupUser new ConcurrentDictionarystring, HashSetstring();private readonly SemaphoreSlim _lock new(1, 1);//存放redis订阅实例private readonly ConcurrentDictionarystring, IDisposable _disposables new();//一对一redis频道前缀private readonly string userPrefix user:;//群组redis频道前缀private readonly string groupPrefix group:;//全员redis频道private readonly string all all;private readonly ILoggerWebSocketHandler _logger;private readonly RedisClient _redisClient;public WebSocketChannelHandler(ILoggerWebSocketHandler logger, RedisClient redisClient){_logger logger;_redisClient redisClient;}public async Task HandleChannel(string id, WebSocket webSocket){await _lock.WaitAsync();//每次连接进来就添加到用户集合_ UserConnection.GetOrAdd(id, webSocket);//每个WebSocket服务实例只需要订阅一次全员消息频道await SubMsg(${userPrefix}{id});if (UserConnection.Count 1){await SubAllMsg(all);}_lock.Release();var buffer new byte[1024 * 4];//接收客户端消息var receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);while (webSocket.State WebSocketState.Open){try{string msg Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd(\0);//读取客户端消息ChannelData channelData JsonConvert.DeserializeObjectChannelData(msg);//判断消息类型switch (channelData.Method){//一对一case One:await HandleOne(id, channelData.MsgBody, receiveResult);break;//把用户加入群组case UserGroup:await AddUserGroup(id, channelData.Group, webSocket);break;//处理群组消息case Group:await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;//处理全员消息default:await HandleAll(id, channelData.MsgBody);break;}receiveResult await webSocket.ReceiveAsync(new ArraySegmentbyte(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群组中移除当前用户foreach (var users in GroupUser.Values){lock (users){users.Remove(id);}}//当前客户端用户退出则移除连接_ UserConnection.TryRemove(id, out _);//取消用户频道订阅_disposables.Remove(${userPrefix}{id}, out var sub);sub?.Dispose();}public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}这里涉及到了ChannelData类是用于接收客户端消息的类模板具体定义如下public class ChannelData
{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }
}类中并不会包含当前用户信息因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出客户端用户连接到WebSocket实例之后先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道用于处理非当前实例注册客户端的一对一消息处理和全员消息处理然后等待接收客户端消息根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理它的工作流程入下图所示由代码和上面的流程图可知它根据不同的标识去处理不同类型的消息接下来我们可以看下每种消息类型的处理方式。一对一处理首先是一对一的消息处理情况看一下具体的处理逻辑首先是一对一发布消息private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult){MsgBody msgBody JsonConvert.DeserializeObjectMsgBody(JsonConvert.SerializeObject(msg));byte[] sendByte Encoding.UTF8.GetBytes($user {id} send:{msgBody.Msg});_logger.LogInformation($user {id} send:{msgBody.Msg});//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State WebSocketState.Open){await targetSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器则直接把消息发布到具体的用户频道去由具体用户去订阅ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId id, ToId msgBody.Id, Msg msgBody.Msg };_redisClient.Publish(${userPrefix}{msgBody.Id}, JsonConvert.SerializeObject(channelMsgBody));}
}接下来是用于处理订阅其他用户发送过来消息的逻辑这个和整合之前的逻辑是一致的在当前服务器中找到用户对应的连接发送消息private async Task SubMsg(string channel)
{var sub _redisClient.Subscribe(channel, async (channel, data) {ChannelMsgBody msgBody JsonConvert.DeserializeObjectChannelMsgBody(data.ToString());byte[] sendByte Encoding.UTF8.GetBytes($user {msgBody.FromId} send:{msgBody.Msg});if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State WebSocketState.Open){await targetSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);
}如果给某个用户发送消息则可以使用如下的消息格式{Method:One, MsgBody:{Id:2,Msg:Hello}}Method为One代表着是私聊一对一的情况消息体内Id为要发送给的具体用户标识和消息体。群组处理接下来看群组处理方式这个和之前的逻辑是有出入的首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息之前是一个用户对应多个连接整合了之后集群中每个用户只关联唯一的一个WebSocket连接首先看用户加入群组的逻辑private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{//获取群组信息var currentGroup GroupUser.GetOrAdd(group, new HashSetstring());lock (currentGroup){//把用户标识加入当前组_ currentGroup.Add(user);}//每个组的redis频道在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count 1){//订阅当前组消息await SubGroupMsg(${groupPrefix}{group});}string addMsg $user 【{user}】 add to group 【{group}】;byte[] sendByte Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组则通知其他群成员ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId user, ToId group, Msg addMsg };_redisClient.Publish(${groupPrefix}{group}, JsonConvert.SerializeObject(channelMsgBody));
}用户想要在群组内发消息则必须先加入到一个具体的群组内具体的加入群组的格式如下{Method:UserGroup, Group:g1}Method为UserGroup代表着用户加入群组的业务类型Group代表着你要加入的群组唯一标识。接下来就看下用户发送群组消息的逻辑了private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{//判断群组是否存在var hasValue GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte Encoding.UTF8.GetBytes($group【{groupId}】 not exists);await webSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte Encoding.UTF8.GetBytes($user 【{userId}】 not in 【{groupId}】);await webSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($group 【{groupId}】 user 【{userId}】 send:{msgBody});//发送群组消息ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId userId, ToId groupId, Msg msgBody.ToString() };_redisClient.Publish(${groupPrefix}{groupId}, JsonConvert.SerializeObject(channelMsgBody));
}加入群组之后则可以发送和接收群组内的消息了给群组发送消息的格式如下{Method:Group, Group:g1, MsgBody:Hi All}Method为Group代表着用户加入群组的业务类型Group则代表你要发送到具体的群组的唯一标识MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况也就是处理群组消息的逻辑private async Task SubGroupMsg(string channel)
{var sub _redisClient.Subscribe(channel, async (channel, data) {//接收群组订阅消息ChannelMsgBody msgBody JsonConvert.DeserializeObjectChannelMsgBody(data.ToString());byte[] sendByte Encoding.UTF8.GetBytes($group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg});//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) targetSocket.State WebSocketState.Open){await targetSocket.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);
}全员消息处理全员消息处理相对来说思路比较简单因为当服务启动的时候就会监听redis的全员消息频道这样的话具体的实现也就只包含发送和接收全员消息了首先看一下全员消息发送的逻辑private async Task HandleAll(string id, object msgBody)
{_logger.LogInformation($user {id} send:{msgBody});//直接给redis的全员频道发送消息ChannelMsgBody channelMsgBody new ChannelMsgBody { FromId id, Msg msgBody.ToString() };_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}全员消息的发送数据格式如下所示{Method:All, MsgBody:Hello All}Method为All代表着全员消息类型MsgBody则代表着具体消息。接收消息出里同样很简单订阅redis全员消息频道然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息具体逻辑如下private async Task SubAllMsg(string channel)
{var sub _redisClient.Subscribe(channel, async (channel, data) {ChannelMsgBody msgBody JsonConvert.DeserializeObjectChannelMsgBody(data.ToString());byte[] sendByte Encoding.UTF8.GetBytes($user 【{msgBody.FromId}】 send all:{msgBody.Msg});//获取当前服务器实例内所有用户的连接foreach (var user in UserConnection){//不给自己发送消息因为发送的时候可以通过具体的业务代码处理if (user.Key msgBody.FromId){continue;}//给每个用户发送消息if (user.Value.State WebSocketState.Open){await user.Value.SendAsync(new ArraySegmentbyte(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ UserConnection.TryRemove(user.Key, out _);}}});_disposables.TryAdd(channel, sub);
}示例源码由于篇幅有限没办法设计到全部的相关源码因此在这里贴出来github相关的地址方便大家查看和运行源码。相关的源码我这里实现了两个版本一个是基于asp.net core的版本一个是基于golang的版本。两份源码的实现思路是一致的所以这两份代码可以运行在一套集群示例里配置在一套nginx里并且连接到同一个redis实例里即可• asp.net core源码示例 WebsocketCluster[3]• golang源码示例 websocket-cluster[4]仓库里还涉及到本人闲暇之余开源的其他仓库由于本人能力有限难登大雅之堂就不做广告了有兴趣的同学可以自行浏览一下。总结 本文基于ASP.NET Core框架提供了一个基于WebSocket做集群的示例由于思想是通用的所以基于这个思路楼主也实现了golang版本。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍由于这些只是博主关于构思的实现可能有很多细节尚未体现到还希望大家多多理解。其核心思路总结一下• 首先是利用可以构建WebSocket服务的框架在当前服务实例中保存当前客户端用户和WebSocket的连接关系• 如果消息的目标客户端不在当前服务器可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息由目标服务器获取目标是否属于自己的ws会话• 本文设计的思路使用的是无状态的方式即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储当然也可以利用redis等存储在线用户信息等这个可以参考具体业务自行设计读万卷书,行万里路。在这个时刻都在变化点的环境里唯有不断的进化自己多接触多尝试不用的事物多扩展自己的认知思维方能构建自己的底层逻辑。毕竟越底层越抽象越通用越抽象。面对未知的挑战自身作为自己坚强的后盾可能才会让自己更踏实。引用链接[1] 我的github: https://github.com/softlgl[2] 示例源码: #示例源码[3] WebsocketCluster: https://github.com/softlgl/WebsocketCluster[4] websocket-cluster: https://github.com/softlgl/websocket-cluster