做个网站需要多久网站设计费用多少,google谷歌搜索引擎入口,wordpress导入网站模板,萧山品牌网站建设【README】
本文介绍 通配符模式#xff0c;及代码示例
【1】intro to rabbitmq通配符模式
0#xff09;通配符模式-交换机类型为 Topic#xff1b; 1#xff09;与路由模式相比#xff0c;相同点是 两者都可以通过 routingkey 把消息转发到不同的队列#xff1b; 不同…【README】
本文介绍 通配符模式及代码示例
【1】intro to rabbitmq通配符模式
0通配符模式-交换机类型为 Topic 1与路由模式相比相同点是 两者都可以通过 routingkey 把消息转发到不同的队列 不同点是通配符模式-topic类型的exchange可以让队列在绑定routing key的时候使用通配符 2通配符模式的routingkey 通常使用多个单词并用点号连接如 item.insert 3通配符规则 # 匹配一个或多个词 * 匹配不多不少一个词 荔枝 item.# 能够匹配 item.insert.abc 或 item.insert ; 可以多层 item.* 能够匹配 item.insert ; 只能一层 refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
4新建队列 5把队列绑定到交换机 6生产者发送消息到队列路由key 分别是 item.insert , item.update, item.delete 如下 【2】代码
生产者
/*** 通配符模式-交换机类型为TOPIC*/
public class WildProducer {/* 交换机名称 */static final String TOPIC_EXCHANGE topic_exchange; /*队列名称1*/ static final String TOPIC_QUEUE_1 topic_queue_1;/*队列名称2*/static final String TOPIC_QUEUE_2 topic_queue_2;public static void main(String[] args) throws Exception {/*获取连接*/Connection conn RBConnectionUtil.getConn();// 创建频道 Channel channel conn.createChannel();/*** 声明交换机* 参数1-交换机名称 * 参数2-交换机类型fanout, topic, direct, headers*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); /*** routingkey-路由键 */String itemInsertRoutingKey item.insert; String itemUpdateRoutingKey item.update;String itemDeleteRoutingKey item.delete;/* 发送消息-insert */ /*** 参数1 交换机名称 如果没有指定则使用默认 default exchange * 参数2 routingkey-路由key 简单模式可以传递队列名称 * 参数3 消息其他属性* 参数4 消息内容 */String insertMsg 我是消息通配符模式routingkey itemInsertRoutingKey MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());System.out.println(已发送消息 insertMsg); String updMsg 我是消息通配符模式routingkey itemUpdateRoutingKey MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());System.out.println(已发送消息 updMsg);String deleteMsg 我是消息通配符模式routingkey itemDeleteRoutingKey MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());System.out.println(已发送消息 deleteMsg);/* 关闭连接和信道 */ channel.close();conn.close(); }
}
消费者1 topic_queue_1
/*** 通配符模式消费者-routingkey */
public class RouteConsumerWild1 {/* 交换机名称 */static final String TOPIC_EXCHANGE topic_exchange; /*队列名称1*/ static final String TOPIC_QUEUE_1 topic_queue_1;public static void main(String[] args) throws Exception {/*创建连接 */Connection conn RBConnectionUtil.getConn();/*创建队列*/Channel channel conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由键 */String itemInsertRoutingKey item.insert; String itemUpdateRoutingKey item.update;String itemDeleteRoutingKey item.delete;/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */
// channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null); // ui界面可以创建队列 /*** 队列绑定交换机* 参数1 队列名称* 参数2 交换机* 参数3 routingkey-路由键 */
// channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, item.#); // ui界面可以把队列绑定到交换机 /* 创建消费者设置消息处理逻辑 */Consumer consumer new DefaultConsumer(channel) {/*** param consumerTag 消费者标签在 channel.basicConsume 可以指定 * param envelope 消息包内容包括消息id消息routingkey交换机消息和重转标记收到消息失败后是否需要重新发送 * param properties 基本属性* param body 消息字节数组 */Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println( 消费者1 start );System.out.println(路由key envelope.getRoutingKey());System.out.println(交换机 envelope.getExchange());System.out.println(消息id envelope.getDeliveryTag()); String message new String(body, UTF-8);System.out.println(String.format(消费者收到的消息【%s】, message)); System.out.println( 消费者1 end \n); } };/*** 监听消息 * 参数1 队列名称 * 参数2 是否自动确认 设置为true表示消息接收到自动向 mq回复ackmq收到ack后会删除消息 设置为false则需要手动发送ack * 参数3 消息接收后的回调 */channel.basicConsume(TOPIC_QUEUE_1, true, consumer); }
}消费者2 topic_queue_2
/*** 通配符模式消费者-routingkey */
public class RouteConsumerWild2 {/* 交换机名称 */static final String TOPIC_EXCHANGE topic_exchange; /*队列名称1*/ static final String TOPIC_QUEUE_2 topic_queue_2;public static void main(String[] args) throws Exception {/*创建连接 */Connection conn RBConnectionUtil.getConn();/*创建队列*/Channel channel conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由键 */String itemInsertRoutingKey item.insert; String itemUpdateRoutingKey item.update;String itemDeleteRoutingKey item.delete;/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */
// channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null); // ui界面可以创建队列 /*** 队列绑定交换机* 参数1 队列名称 * 参数2 交换机* 参数3 routingkey-路由键 */
// channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, *.delete); // ui界面可以把队列绑定到交换机 /* 创建消费者设置消息处理逻辑 */Consumer consumer new DefaultConsumer(channel) {/** * param consumerTag 消费者标签在 channel.basicConsume 可以指定 * param envelope 消息包内容包括消息id消息routingkey交换机消息和重转标记收到消息失败后是否需要重新发送 * param properties 基本属性* param body 消息字节数组 */Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println( 消费者1 start );System.out.println(路由key envelope.getRoutingKey());System.out.println(交换机 envelope.getExchange());System.out.println(消息id envelope.getDeliveryTag()); String message new String(body, UTF-8);System.out.println(String.format(消费者收到的消息【%s】, message)); System.out.println( 消费者1 end \n); } };/*** 监听消息 * 参数1 队列名称 * 参数2 是否自动确认 设置为true表示消息接收到自动向 mq回复ackmq收到ack后会删除消息 设置为false则需要手动发送ack * 参数3 消息接收后的回调 */channel.basicConsume(TOPIC_QUEUE_2, true, consumer); }
}
【3】 rabbitmq 模式总结
8.1模式1 简单模式 helloworld
一个生产者一个消费者不需要设置交换机使用默认交换机
8.2模式2 工作队列模式 work queue
一个生产者多个消费者竞争关系不需要设置交换机使用默认交换机
8.3发布订阅模式 publish/subscribe
需要设置类型为 fanout-广播的交换机并且交换机和队列进行绑定当发送消息到交换机后交换机会将消息发送到绑定的队列
8.4路由模式 routing
需要设置类型为 direct的交换机 交换机和队列进行绑定并且指定routing key当发送消息到交换机后交换机会根据routing key 将消息发送到对应队列
8.5通配符模式 topic
需要设置类型为 topic的交换机 交换机和队列进行绑定 并且指定通配符方式的routing key 当发送消息到交换机后交换机会根据 routing key将消息发送到对应的队列