期货融网站建设,亚马逊网站建设历程,wordpress后台教程网,wordpress表格线1、work消息模型
工作队列或者竞争消费者模式
在第一篇教程中#xff0c;我们编写了一个程序#xff0c;从一个命名队列中发送并接受消息。在这里#xff0c;我们将创建一个工作队列#xff0c;在多个工作者之间分配耗时任务。 工作队列#xff0c;又称任务队列。主要思…1、work消息模型
工作队列或者竞争消费者模式
在第一篇教程中我们编写了一个程序从一个命名队列中发送并接受消息。在这里我们将创建一个工作队列在多个工作者之间分配耗时任务。 工作队列又称任务队列。主要思想就是避免执行资源密集型任务时必须等待它执行完成。相反我们稍后完成任务我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时任务将在他们之间共享但是一个消息只能被一个消费者获取。 这个概念在Web应用程序中特别有用因为在短的HTTP请求窗口中无法处理复杂的任务。 接下来我们来模拟这个流程
o P生产者任务的发布者 o C1消费者领取任务并且完成任务假设完成速度较快 o C2消费者2领取任务并完成任务假设完成速度慢
面试题避免消息堆积
1采用workqueue多个消费者监听同一队列。 2接收到消息以后而是通过线程池异步消费。
1.1、生产者
生产者与案例1中的几乎一样
public class Send {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i 0; i 50; i) {// 消息内容String message task .. i;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );}// 关闭通道和连接channel.close();connection.close();}
}不过这里我们是循环发送50条消息。
1.2、消费者1
// 消费者1
public class Recv {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道final Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);try {// 模拟完成任务的耗时1000msThread.sleep(1000);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}1.3、消费者2
//消费者2
public class Recv2 {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道final Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);try {// 模拟完成任务的耗时200msThread.sleep(200);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}与消费者1基本类似就是没有设置消费耗时时间。 这里是模拟有些消费者快有些比较慢。 接下来两个消费者一同启动然后发送50条消息
可以发现两个消费者各自消费了25条消息而且各不相同这就实现了任务的分发。
1.4、能者多劳
• 刚才的实现有问题吗 o 消费者1比消费者2的效率要低一次任务的耗时较长 o 然而两人最终消费的消息数量是一样的 o 消费者2大量时间处于空闲状态消费者1一直忙碌 • 现在的状态属于是把任务平均分配正确的做法应该是消费越快的人消费的越多。 • 怎么实现呢 o 我们可以使用basicQos方法和prefetchCount 1设置。 o 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 o 或者换句话说不要向工作人员发送新消息直到它处理并确认了前一个消息。 o 相反它会将其分派给不是仍然忙碌的下一个工作人员。
再次测试 