wordpress 全站密码,手机交互设计网站,WordPress无法写博客头像,网站开发脚本解析器amqp rabbitmqSpring Integration消息通道默认情况下将消息存储在内存中。 这是因为内存速度快#xff0c;易于实现#xff0c;并且不会增加网络成本。 但是#xff0c;在某些情况下#xff0c;这可能会引起问题#xff0c;因为如果应用程序崩溃或服务器意外关闭#xff… amqp rabbitmq Spring Integration消息通道默认情况下将消息存储在内存中。 这是因为内存速度快易于实现并且不会增加网络成本。 但是在某些情况下这可能会引起问题因为如果应用程序崩溃或服务器意外关闭所有消息都将丢失。 对于这种情况Spring Integration引入了JMSAMQP支持的消息通道因此消息存储在JMSAMQP代理中而不是存储在内存中。 高级消息队列协议AMQP是消息协议的开放标准。 它允许应用程序异步可靠和安全地进行通信。 RabbitMQ是支持AMQP标准的开源消息代理。 RabbitMQ最重要的功能之一就是高可用性队列。 在本文中通过创建两个消息传递节点和一个覆盖两个RabbitMQ服务器的RabbitMQ集群来说明Spring Integration的AMQP支持的点对点消息通道方法。 两个消息传递节点开始使用RabbitMQ集群处理Order消息。 如果意外关闭了第一消息节点和第一RabbitMQ服务器第二消息节点和第二RabbitMQ服务器将继续处理Order消息因此可以通过使用高可用性AMQP支持的通道来防止潜在的消息丢失和服务中断问题。 还建议使用Spring Integration进行消息处理文章介绍一下Spring Integration的主要组件。 订单消息系统的Spring集成流程如下 订单列表通过Order Gateway发送到Order Splitter的输入通道。 订单拆分器将订单列表拆分为订单消息并将其发送到“订单流程服务激活器”。 processChannel是点对点AMQP支持的消息通道。 它创建了RabbitMQ集群管理的ha.rabbit.channel队列并将订单消息发送到ha.rabbit.channel Rabbit队列以实现高可用性。 让我们看看示例订单消息传递实现。 二手技术 JDK 1.8.0_25 Spring4.1.4 Spring Integration 4.1.2 RabbitMQ服务器3.4.2 Maven 3.2.2 Ubuntu 14.04 项目层次结构如下 步骤1依存关系 Spring和Spring Integration Framework的依赖关系如下 propertiesspring.version4.1.4.RELEASE/spring.versionspring.integration.version4.1.2.RELEASE/spring.integration.version/propertiesdependencies!-- Spring 4 dependencies --dependencygroupIdorg.springframework/groupIdartifactIdspring-context/artifactIdversion${spring.version}/version/dependency!-- Spring Integration dependencies --dependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-core/artifactIdversion${spring.integration.version}/version/dependencydependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-amqp/artifactIdversion${spring.integration.version}/versionscopecompile/scope/dependencydependencies第2步rabbitmq.config 第一RabbitMQ Server的配置文件rabbitmq.config如下。 它应该放在../rabbitmq_server-version/etc/rabbitmq/下 [{rabbit, [ {tcp_listeners, [5672]},{collect_statistics_interval, 10000},{heartbeat,30},{cluster_partition_handling, pause_minority},{cluster_nodes, {[ rabbitmaster,rabbit2master],disc}} ] },{rabbitmq_management, [ {http_log_dir,/tmp/rabbit-mgmt},{listener, [{port, 15672}]} ] },{rabbitmq_management_agent, [ {force_fine_statistics, true} ] }
]. 第二个RabbitMQ服务器的rabbitmq.config文件 [{rabbit, [ {tcp_listeners, [5673]},{collect_statistics_interval, 10000},{heartbeat,30},{cluster_partition_handling, pause_minority},{cluster_nodes, {[ rabbitmaster,rabbit2master],disc}} ] },{rabbitmq_management, [ {http_log_dir,/tmp/rabbit-mgmt},{listener, [{port, 15673}]} ] },{rabbitmq_management_agent, [ {force_fine_statistics, true} ] }
].步骤3集成上下文 Spring Integration Context的创建如下。 订单列表通过Order Gateway发送到Order Splitter的输入通道。 订单拆分器将订单列表拆分为订单消息并将其发送到“订单流程服务激活器”。 processChannel是点对点AMQP支持的消息通道。 它创建了RabbitMQ集群管理的ha.rabbit.channel队列并将订单消息发送到ha.rabbit.channel RabbitMQ队列以实现高可用性。 ?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:inthttp://www.springframework.org/schema/integrationxmlns:int-amqphttp://www.springframework.org/schema/integration/amqpxmlns:rabbithttp://www.springframework.org/schema/rabbitxmlns:contexthttp://www.springframework.org/schema/contextxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/integrationhttp://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd!-- Configuration for Component Scan --context:component-scan base-packagecom.onlinetechvision /context:property-placeholder locationclasspath*:rabbitmq.properties/int:channel idinputChannel/int:gateway idorderGateway service-interfacecom.onlinetechvision.integration.OrderGateway default-request-channelinputChannel /int-amqp:channel idprocessChannelconnection-factoryconnectionFactory message-driventruequeue-nameha.rabbit.channel /!-- RabbitMQ Connection Factory --rabbit:connection-factory idconnectionFactoryaddresses${rabbitmq.addresses} username${rabbitmq.username}password${rabbitmq.password} /int:splitter idorderSplitter input-channelinputChannel output-channelprocessChannel /int:service-activator input-channelprocessChannel reforderProcessService methodprocess //beans第4步rabbitmq.properties rabbitmq.properties的创建如下。 如果第一个RabbitMQ服务器意外关闭第二个RabbitMQ将继续侦听Order消息。 rabbitmq.addresseslocalhost:5672,localhost:5673
rabbitmq.usernameguest
rabbitmq.passwordguest步骤5订单模型 订购Bean模型订购消息。 import java.io.Serializable;public class Order implements Serializable {private static final long serialVersionUID -2138235868650860555L;private int id;private String name;public Order(int id, String name) {this.id id;this.name name;}//Getter and Setter Methods...Overridepublic String toString() {return Order [id id , name name ];}}步骤6OrderGateway OrderGateway接口提供应用程序对Order消息系统的访问。 它的默认请求通道是inputChannel。 import java.util.List;import org.springframework.messaging.Message;import com.onlinetechvision.model.Order;public interface OrderGateway {/*** Processes Order Request** param message SI Message covering Order payload.*/void processOrderRequest(MessageListOrder message);
}步骤7OrderSplitter OrderSplitter监听inputChannel并将传入的Order List分解为Order消息。 订单消息将发送到AMQP支持的processChannel。 import java.util.List;import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import com.onlinetechvision.model.Order;Component(orderSplitter)
public class OrderSplitter {/*** Splits Order List to Order message(s)** param message SI Message covering Order List payload.* return order list*/public ListOrder splitOrderList(MessageListOrder message) {return message.getPayload();}
}步骤8ProcessService 通用流程服务接口向消息系统公开流程服务功能。 import org.springframework.messaging.Message;public interface ProcessServiceT {/*** Processes incoming message(s)** param message SI Message.*/void process(MessageT message);}步骤9OrderProcessService 订单流程服务激活器侦听AMQP支持的processChannel并记录传入的订单消息。 添加睡眠以填充ha.rabbit.channel RabbitMQ队列。 import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import com.onlinetechvision.model.Order;Component(orderProcessService)
public class OrderProcessService implements ProcessServiceOrder {private final Logger logger LoggerFactory.getLogger(OrderProcessService.class);private final static long SLEEP_DURATION 1_000;Overridepublic void process(MessageOrder message) {try {Thread.sleep(SLEEP_DURATION);} catch (InterruptedException e) {Thread.currentThread().interrupt();}logger.debug(Node 1 - Received Message : message.getPayload());}}步骤10申请 应用程序类通过初始化应用程序上下文来运行应用程序并将订单消息发送到消息传递系统。 仅第一个消息传递节点创建Order消息而两个消息传递节点处理它们。 请找到第一和第二个消息传递节点的应用程序类如下所示 第一消息节点的应用程序类 import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import com.onlinetechvision.integration.OrderGateway;
import com.onlinetechvision.model.Order;public class Application {private final static int MESSAGE_LIMIT 1_000;private final static int ORDER_LIST_SIZE 10;private final static long SLEEP_DURATION 50;private static OrderGateway orderGateway;/*** Starts the application** param String[] args**/public static void main(String[] args) {ApplicationContext context new ClassPathXmlApplicationContext(applicationContext.xml);orderGateway context.getBean(OrderGateway.class);Executors.newSingleThreadExecutor().execute(new Runnable() {Overridepublic void run() {try {int firstIndex 0, lastIndex ORDER_LIST_SIZE;while(lastIndex MESSAGE_LIMIT) {MessageListOrder message MessageBuilder.withPayload(getOrderList(firstIndex, lastIndex)).build();orderGateway.processOrderRequest(message);firstIndex ORDER_LIST_SIZE;lastIndex ORDER_LIST_SIZE;Thread.sleep(SLEEP_DURATION);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}});}/*** Creates a sample order list and returns.** return order list*/private static ListOrder getOrderList(final int firstIndex, final int lastIndex) {ListOrder orderList new ArrayList(lastIndex);for(int i firstIndex; i lastIndex; i) {orderList.add(new Order(i, Sample_Order_ i));}return orderList;}} 第二个消息节点的应用程序类 import org.springframework.context.support.ClassPathXmlApplicationContext;public class Application {/*** Starts the application** param String[] args**/public static void main(String[] args) {new ClassPathXmlApplicationContext(applicationContext.xml);}}步骤11RabbitMQ群集Bash脚本 First RabbitMQ Server的示例bash脚本如下。 另外请查看RabbitMQ Cluster文档以了解其他配置步骤。 #!/bin/bash
echo *** First RabbitMQ Server is setting up ***export RABBITMQ_HOSTNAMErabbitmaster
export RABBITMQ_NODE_PORT5672
export RABBITMQ_NODENAMErabbitmaster
export RABBITMQ_SERVER_START_ARGS-rabbitmq_management listener [{port,15672}]/DEV_TOOLS/rabbitmq_server-3.4.2/sbin/rabbitmq-server echo *** Second RabbitMQ Server is set up succesfully. ***sleep 5echo *** First RabbitMQ Server s status : ***/DEV_TOOLS/rabbitmq_server-3.4.2/sbin/rabbitmqctl status 第二个RabbitMQ Server的示例bash脚本如下 #!/bin/bash
echo *** Second RabbitMQ Server is setting up ***export RABBITMQ_HOSTNAMErabbit2master
export RABBITMQ_NODE_PORT5673
export RABBITMQ_NODENAMErabbit2master
export RABBITMQ_SERVER_START_ARGS-rabbitmq_management listener [{port,15673}]/DEV_TOOLS/rabbitmq_server-3.4.2_2/sbin/rabbitmq-server echo *** Second RabbitMQ Server is set up succesfully. ***sleep 5echo *** Second RabbitMQ Server s status : ***/DEV_TOOLS/rabbitmq_server-3.4.2_2/sbin/rabbitmqctl statussleep 5echo *** Second RabbitMQ Server is being added to cluster... ***/DEV_TOOLS/rabbitmq_server-3.4.2_2/sbin/rabbitmqctl -n rabbit2master stop_app
/DEV_TOOLS/rabbitmq_server-3.4.2_2/sbin/rabbitmqctl -n rabbit2master join_cluster rabbitmaster
/DEV_TOOLS/rabbitmq_server-3.4.2_2/sbin/rabbitmqctl -n rabbit2master start_app
/DEV_TOOLS/rabbitmq_server-3.4.2/sbin/rabbitmqctl -n rabbitmaster set_policy ha-all ^ha\. {ha-mode:all}echo *** Second RabbitMQ Server is added to cluster successfully... ***sleep 5echo *** Second RabbitMQ Server s cluster status : ***/DEV_TOOLS/rabbitmq_server-3.4.2_2/sbin/rabbitmqctl cluster_status步骤12建立并执行专案 订单消息的操作结果如下 第一个RabbitMQ服务器启动。 第二台RabbitMQ服务器已启动并添加到集群中。 RabbitMQ群集概述如下 设置第一台RabbitMQ Server的高可用性HA策略。 第一个消息传递节点已启动。 它创建订单消息和流程。 启动第一个消息传递节点后Spring Integration上下文会自动创建一个ha.rabbit.channel RabbitMQ队列如下所示 第二个消息传递节点已启动。 它不会创建Order消息而只是处理。 订单列表开始处理。 在第一个和第二个消息节点连接到RabbitMQ集群之后ha.rabbit.channel队列详细信息如下 第一台RabbitMQ服务器上的ha.rabbit.channel队列 第二台RabbitMQ服务器上的ha.rabbit.channel队列 第一个消息节点关闭。 首先RabbitMQ Server关闭并离开集群。 第二消息节点和第二RabbitMQ服务器处理传入的Order消息以实现高可用性因此不会中断服务。 第二个RabbitMQ节点的屏幕快照如下 也将看到以下控制台输出日志 第一个消息节点控制台 ...22:32:51.838 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 1 - Received Message : Order [id260, nameSample_Order_260]
22:32:52.842 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 1 - Received Message : Order [id261, nameSample_Order_261]
22:32:53.847 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 1 - Received Message : Order [id263, nameSample_Order_263]
22:32:54.852 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 1 - Received Message : Order [id264, nameSample_Order_264] 在消息ID264被传递到第一个消息节点之后它和第一个RabbitMQ节点被关闭第二个消息节点和第二个RabbitMQ节点按以下方式处理剩余的订单消息 第二个消息节点控制台 ...22:32:54.211 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id262, nameSample_Order_262]
22:32:56.214 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id265, nameSample_Order_265]
22:32:58.219 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id266, nameSample_Order_266]
22:33:00.223 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id267, nameSample_Order_267]
22:33:02.229 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id268, nameSample_Order_268]
22:33:04.234 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id269, nameSample_Order_269]
22:33:06.239 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id270, nameSample_Order_270]
22:33:08.241 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id271, nameSample_Order_271]
22:33:10.247 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id272, nameSample_Order_272]
22:33:12.252 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id273, nameSample_Order_273]
22:33:14.255 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id274, nameSample_Order_274]
22:33:16.258 [SimpleAsyncTaskExecutor-1] DEBUG c.o.p.s.OrderProcessService - Node 2 - Received Message : Order [id275, nameSample_Order_275]... 源代码 源代码在Github上可用 参考资料 企业整合模式 Spring集成参考手册 Spring Integration 4.1.2.RELEASE API Pro Spring整合 RabbitMQ服务器文档 翻译自: https://www.javacodegeeks.com/2015/01/high-available-amqp-backed-message-channels-via-spring-integration-and-rabbitmq.htmlamqp rabbitmq