中国工程建设信息网站,工商企业信息公示系统,网站开发过程前端后端,国家中小企业公共服务平台发布者确认#xff08;Publisher Confirms#xff09;
发布者确认是一个 RabbitMQ 扩展#xff0c;用于实现可靠的发布。当在通道上启用发布者确认时#xff0c;客户端发布的消息将由代理异步确认#xff0c;这意味着它们已在服务器端得到处理。
0、引言 先决条件 本教程…发布者确认Publisher Confirms
发布者确认是一个 RabbitMQ 扩展用于实现可靠的发布。当在通道上启用发布者确认时客户端发布的消息将由代理异步确认这意味着它们已在服务器端得到处理。
0、引言 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口5672上运行。如果您使用了不同的主机、端口或凭证则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在本教程中我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略并解释它们的利弊。 原文链接https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html 1、在通道上启用发布者确认
发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展所以默认情况下它们是不启用的。使用 ConfirmSelect 方法可以在通道层级启用发布者确认
var channel connection.CreateModel();
channel.ConfirmSelect();您必须在期望启用发布者确认的每个通道上调用该方法。确认只需要启用一次而不是对每条发布的消息都启用。
策略 #1单独发布消息
让我们从实现带确认的发布的最简单途径开始吧那就是发布一条消息并同步等待它确认
while (ThereAreMessagesToPublish())
{byte[] body ...;IBasicProperties properties ...;channel.BasicPublish(exchange, queue, properties, body);// uses a 5 second timeoutchannel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}在前面的示例中我们像往常一样发布消息并使用 Channel#WaitForConfirmsOrDie(TimeSpan) 方法等待它确认。该方法在消息确认后立即返回。如果在超时时间内消息未得到确认或者如果消息已 nack(Negative-Acknowledgement) 了意味着代理由于某些原因无法处理它方法会抛出一个异常。异常的处理通常包括记录一个错误消息日志 并/或 重新尝试发送消息。
不同的客户端库有不同的方式去同步处理发布者确认所以确保仔细阅读您正在使用的客户端的文档。
这个技术非常简单但也有一个巨大的缺点它会显著降低发布速度因为某条消息的确认会堵塞后续消息的发布。这种方法提供的吞吐量不会超过每秒几百条已发布的消息。不过这对于某些应用程序来说已经足够好了。 发布者确认是异步的吗 在开头我们提到代理是异步确认已发布的消息的但在第一个例子中代码是同步等待直至消息确认的。客户端实际上异步接收确认并相应地解除对 WaitForConfirmsOrDie 的调用阻塞。可以将 WaitForConfirmsOrDie 看作是一个同步 helper它依赖于底层的异步通知。 策略 #2批量发布消息
为了改进上面的例子我们可以发布一批消息并等待这一批消息全部得到确认。如下是一个使用 100 一批次的示例
var batchSize 100;
var outstandingMessageCount 0;
while (ThereAreMessagesToPublish())
{byte[] body ...;IBasicProperties properties ...;channel.BasicPublish(exchange, queue, properties, body);outstandingMessageCount;if (outstandingMessageCount batchSize){channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));outstandingMessageCount 0;}
}
if (outstandingMessageCount 0)
{channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}等待一批消息的确认比等待单个消息的确认大大提高了吞吐量在远程 RabbitMQ 节点上最多可提高 20-30 倍。一个缺点是如果出现故障我们不知道究竟是哪里出了问题因此我们可能不得不在内存中保存整个批处理以记录一些有意义的内容或重新发布消息。这个解决方案仍然是同步的因此它阻止消息的发布。
策略 #3异步处理发布者确认
代理异步确认已发布的消息只需要在客户端上注册一个回调就可以收到这些确认的通知
var channel connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks (sender, ea)
{// code when message is confirmed
};
channel.BasicNacks (sender, ea)
{//code when message is nack-ed
};这儿有两个回调一个用于已确认的消息一个用于已 nack 的消息可以认为是代理丢失的消息。两个回调都有相应的 EventArgs 参数ea包含 delivery tag 标识已确认或已 nack 消息的序列号。我们将很快看到如何将其与发布的消息关联起来。 multiple 这是一个布尔值。如果为 false则仅有一条消息确认/nack-ed如果为 true所有序列号 小于等于该序列号的消息都确认/nack-ed。 在发布前可以通过 Channel#NextPublishSeqNo 获得消息的序列号
var sequenceNumber channel.NextPublishSeqNo;
channel.BasicPublish(exchange, queue, properties, body);将消息与序列号关联起来的一种简单方法是使用字典。让我们假设我们想要发送字符串因为它们很容易转换为用于发布的字节数组。下面是一个代码示例它使用字典将发布序列号与字符串消息体关联起来
var outstandingConfirms new ConcurrentDictionaryulong, string();
// ... code for confirm callbacks will come later
var body ...;
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));发布代码现在使用字典跟踪出站消息。我们需要在确认到达时清理字典并在消息已 nack 时做一些类似于记录警告的事情
var outstandingConfirms new ConcurrentDictionaryulong, string();void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{if (multiple){var confirmed outstandingConfirms.Where(k k.Key sequenceNumber);foreach (var entry in confirmed){outstandingConfirms.TryRemove(entry.Key, out _);}}else{outstandingConfirms.TryRemove(sequenceNumber, out _);}
}channel.BasicAcks (sender, ea) CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks (sender, ea)
{outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);Console.WriteLine($Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple});CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};// ... publishing code先前的示例中包含一个在确认到达时清理字典的回调。注意这个回调处理单次和多次确认。这个回调会在确认到达Channel#BasicAcks时被使用。用于已 nack 消息的回调将检索消息体并发出警告。然后它重用之前的回调来清除字典中未完成的确认无论消息是已确认还是已 nack都必须删除字典中对应的条目。 如何跟踪未完成的确认 我们的示例使用一个 ConcurrentDictionary 跟踪未完成的确认。由于几种原因这个数据结构十分方便。它允许我们能够轻易地将序列号与消息关联起来无论消息数据是什么并允许我们能够通过一个给出的序列 id 轻易地清理条目以处理多次确认/nack。最后它支持并发访问因为确认回调是在客户端库拥有的线程中被调用的该线程应该与发布线程保持不同。 除了使用复杂的字典实现外还有其他方法可以跟踪未完成的确认比如使用简单的并发哈希表和变量来跟踪发布序列的下界但它们通常更复杂不属于“教程”的范畴。 总而言之异步处理发布者确认通常需要以下步骤
提供一个方法去关联发布序列号和消息。在通道上注册确认侦听器以便在发布者 acks/nacks 到达时得到通知并执行适当的操作例如记录或者重新发布已 nack 的消息。在此步骤中序列号到消息的关联机制也可能需要进行一些清理。在发布消息之前跟踪发布序列号。 重新发布已 nack 的消息 在相应的回调中重新发布已 nack 的消息可能很诱人但应该避免这样因为确认回调是在通道不应该执行操作的I/O 线程中分配的。更好的方案是在内存队列中对消息进行排队该队列由发布线程轮询。像 ConcurrentQueue 这样的类可以很好地在确认回调和发布线程之间传递消息。 总结
在某些应用程序中确保已发布的消息到达代理是必要的。发布者确认Publisher Confirms是 RabbitMQ 的一个特性可以帮忙满足这个需求。发布者确认本质上是异步的但也可以同步处理它们。没有说只有一个绝对的方法来实现发布者确认这通常取决于应用程序和整个系统中的约束。典型的技术有
单独发布消息同步等待确认简单但吞吐量非常有限。批量发布消息同步等待批处理的确认简单、合理的吞吐量但在一些东西出现问题时很难推断。异步处理最佳性能和资源使用错误情况下的良好控制但还需要参与正确实现的过程无法直接用现成的。
2、将所有的东西放到一起
PublisherConfirms.cs 类包含了我们所介绍的技术的代码。我们可以编译它按原样执行它并看看每项技术的表现如何
dotnet run输出会看起来像下面这样
Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms运行效果
如果客户端和服务器位于同一台机器上那么您的计算机的输出应当与之类似。不出所料单独发布消息表现十分糟糕但出乎意料的是与批量发布相比异步处理的表现有些令人失望。
发布者确认十分依赖于网络所以我们最好不要在远端节点上尝试而在生产中客户端和服务器通常不在同一台机器上却又是更现实的情况。PublisherConfirms.cs 可以很容易地更改为使用非本地节点
private static IConnection CreateConnection()
{var factory new ConnectionFactory { HostName remote-host, UserName remote-host, Password remote-password };return factory.CreateConnection();
}重新编译类再次执行并等待结果
Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms我们看到单独发布现在的表现仍然非常糟糕。但是有了客户端和服务器之间的网络批量发布和异步处理现在表现得差不多同时异步处理在发布者确认方面还有一点小优势。
请记住批量发布很容易实现但是在 negative publisher acknowledgement 的情况下不容易知道哪些消息不能发送到代理。异步处理发布者确认需要更多的参与实现但提供了更好的粒度和对发布消息已 nack 时执行的操作的更好控制。
5、生产[非]适用性免责声明
请记住本教程和其他教程都是教程。他们一次展示一个新概念可能会有意地过度简化一些东西而忽略其他东西。例如为了简洁起见连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。
在发布您的应用之前请先查看其他文档。我们特别推荐以下指南发布者确认和消费者确认生产清单和监控。