引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。 消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一。
RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 Rabbitmq试用业务范围:场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息场景3:Publish/Subscribe 使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。场景4:Routing (按路线发送接收) 使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。场景5:Topics (按topic发送接收) 使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
1:下载所需的tar.gz包 1)ncurses-5.2.tar(它是一个可以使应用程序直接控制终端屏幕显示的库,在安装rabbitmq-server中会报错缺少这个库) 2)simplejson-3.8.0.tar(它是 Python解析JSON的程序库。) 3)otp_src_R15B01.tar(RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境,otp_src_R15B01是提供该环境的包) 4)rabbitmq-server-3.5.4.tar
#########################搭建Erlang环境############################## 2:登录到指定的一台linux服务器 选择要创建一个文件夹,我选择的目录是(目录名称可随便): /data/rabbitMQ 3:进入到该目录,做以下操作 cd /data/rabbitMQ tar xvzf otp_src_R15B01.tar.gz cd otp_src_R15B01 ./configure
最后一步执行完,可能会报一个错: No curses library functions found(这是缺少ncurses-5.2.tar包) 把ncurses-5.2.tar放到指定路径,我这里放到的是/data/ncurses 下,最好和之前放同一个路径 执行以下操作: tar zxvf ncurses-5.2.tar #解压缩并且释放 文件包 cd ncurses-5.2 #进入解压缩的目录(注意版本) ./configure #按照你的系统环境制作安装配置文件 make #编译源代码并且编译NCURSES库 su root #切换到root用户环境 make install #安装编译好的NCURSES库
等操作完以上命令,在重新执行搭建Erlang环境的 ./configure 命令: 编译后的输出如下图 提示没有wxWidgets和fop,但是问题不大。继续: make sudo make install
如果没报错,说明已经安装完Erlang,开始安装RabbitMQ-Server。
#########################搭建simplejson环境############################## 安装RabbitMQ-Server之前,必须先安装simplejson 主要参考官方文档:http://www.rabbitmq.com/build-server.html 需要安装simplejson。从此处下载最新的版本: http://pypi.python.org/pypi/simplejson#downloads 。我下载的版本是 simplejson-3.8.0.tar.gz 安装这个很简单,执行以下3个步骤即可: 进入到该目录: tar xvzf simplejson-3.8.0.tar.gz cd simplejson-3.8.0 sudo python setup.py install
#########################安装RabbitMQ Server环境##############################
最后安装RabbitMQ Server。从此处下载源代码版本的RabbitMQ: http://www.rabbitmq.com/server.html。 我下载的版本是 rabbitmq-server-3.5.4.tar.gz
进入该目录: 所需要的包 yum install xmlto
tar xvzf rabbitmq-server-3.5.4.tar.gz cd rabbitmq-server-3.5.4 make TARGET_DIR=/usr/local SBIN_DIR=/usr/local/sbin MAN_DIR=/usr/local/man make install
安装成功。
运行 找到sbin/目录,默认目录在:/usr/local/sbin/下 运行程序: rabbitmq-server –detached 停止程序: rabbitmqctl stop
安装管理插件 mkdir /etc/rabbitmq cd /usr/local/sbin/ ./rabbitmq-plugins enable rabbitmq_management
./rabbitmq-server -detached
重新启动RabbitMQ,输入http://server-name:15672 就能够进入到监控页面。默认的用户名和密码是: guest 和 guest。 server-name:你指定的linux服务器ip,我的是10.1.100.67
浏览器输入:http://10.1.100.67:15672/ 则说明环境搭建成功 输入默认用户名和密码:guest ,提示登陆失败 翻看官方的release文档后,得知由于账号guest具有所有的操作权限,并且又是默认账号,出于安全因素的考虑,guest用户只能通过localhost登陆使用,并建议修改guest用户的密码以及新建其他账号管理使用rabbitmq(该功能是在3.3.0版本引入的)。
解决方法: 进入/data/rabbitMQ/rabbitmq-server/rabbitmq-server-3.5.4/ebin目录下rabbit.app中找到:loopback_users里的<<”guest”>>删除。 并重启rabbitmq,则可以用guest账号登陆管理控制台。成功界面如下:
Java 代码与spring集成
1:发送者配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:p="http://www.springframework.org/schema/p" xmlns:c="http://www.springframework.org/schema/c" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"> <!-- #################################################################### --> <!-- # RabbitMQ 消息转换器 --> <!-- #################################################################### --> <bean id="rabbitMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"> <property name="classMapper"> <bean class="org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper" /> </property> </bean> <!-- #################################################################### --> <!-- # RabbitMQ 异常处理程序 --> <!-- #################################################################### --> <bean id="logAndPropagateErrorHandler" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean"> <property name="staticField"> <value>org.springframework.scheduling.support.TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER</value> </property> </bean> <!-- #################################################################### --> <!-- # rabbit 连接工厂 --> <!-- #################################################################### --> <rabbit:connection-factory id="rabbitConnectionFactory" addresses="10.1.100.67" username="guest" password="guest}" /> <!-- #################################################################### --> <!-- # RabbitMQ 管理员 --> <!-- #################################################################### --> <rabbit:admin connection-factory="rabbitConnectionFactory" /> <!-- #################################################################### --> <!-- # RabbitMQ 队列 --> <!-- #################################################################### --> <!-- 入库自动上架消息消息队列 --> <rabbit:queue id="queues.inbound.putway" name="queues.inbound.putway"> <rabbit:queue-arguments> <!-- 开启集群环境镜像队列 --> <entry key="x-ha-policy" value="all" /> </rabbit:queue-arguments> </rabbit:queue> <!-- #################################################################### --> <!-- # RabbitMQ Exchange --> <!-- # PS:RabbitMQ 中所有生产者提交的消息都由 Exchange 来接受,再由 Exchange 按照特定的策略转发到 Queue 进行存储/处理 --> <!-- # 策略规则:pattern 表达式中 # 表示0个或若干个关键字, * 表示一个关键字 --> <!-- # 示例A:queues.# 能匹配 queues.sample 也能匹配 queues.sample.one --> <!-- # 示例B:queues.* 能匹配 queues.sample 不能匹配 queues.sample.one --> <!-- #################################################################### --> <rabbit:topic-exchange name="amq.topic"> <rabbit:bindings> <rabbit:binding queue="queues.inbound.putway" pattern="queues.inbound.#"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- #################################################################### --> <!-- # RabbitMQ 模板(生产者) --> <!-- #################################################################### --> <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" channel-transacted="true" message-converter="rabbitMessageConverter" /> </beans>
首先导入jar:spring-rabbit-1.2.2.RELEASE.jar 引入jar:
1 2 3 4 5 6 7 8 @Autowired private RabbitTemplate rabbitTemplate; 发送者java代码: rabbitTemplate.convertAndSend("queues.inbound.putway",productPutaway); Queues.inbound.putway :xml 里面定义的id productPutaway:要发送的消息,可以为字符串对象,集合,map等形式
2:消费者配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:p="http://www.springframework.org/schema/p" xmlns:c="http://www.springframework.org/schema/c" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"> <!-- #################################################################### --> <!-- # RabbitMQ 消息转换器 --> <!-- #################################################################### --> <bean id="rabbitMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"> <property name="classMapper"> <bean class="org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper" /> </property> </bean> <!-- #################################################################### --> <!-- # RabbitMQ 异常处理程序 --> <!-- #################################################################### --> <bean id="logAndPropagateErrorHandler" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean"> <property name="staticField"> <value>org.springframework.scheduling.support.TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER</value> </property> </bean> <!-- #################################################################### --> <!-- # rabbit 连接工厂 --> <!-- #################################################################### --> <rabbit:connection-factory id="rabbitConnectionFactory" addresses="10.1.100.67" username="guest" password="guest}" /> <!-- #################################################################### --> <!-- # RabbitMQ 管理员 --> <!-- #################################################################### --> <rabbit:admin connection-factory="rabbitConnectionFactory" /> <!-- #################################################################### --> <!-- # RabbitMQ 队列 --> <!-- #################################################################### --> <!-- #################################################################### --> <!-- # RabbitMQ 模板(生产者) --> <!-- #################################################################### --> <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" channel-transacted="true" message-converter="rabbitMessageConverter" /> <!-- #################################################################### --> <!-- # RabbitMQ 监听容器(消费者) --> <!-- #################################################################### --> <rabbit:listener-container connection-factory="rabbitConnectionFactory" message-converter="rabbitMessageConverter" channel-transacted="true" error-handler="logAndPropagateErrorHandler"> <!-- 货品上架单消费者 --> <rabbit:listener queues="queues.inbound.putway" ref="productPutAwayMessageConsumer" method="saveProductPutaway" /> </rabbit:listener-container> </beans>
接收者Java代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 /** * 收货入库单插入上架单消息对象介绍处理类 * @author Min.Wang * */ @Component public class ProductPutAwayMessageConsumer { @Autowired private ProductPutawayService productPutawayService; /** * 保存上架单/给收货完成是调用 * @author Min.Wang */ public void saveProductPutaway(ProductPutaway productPutaway){ //根据收到的消息数据,处理对应的业务逻辑 } } 更多扩展请了解rabbitmq官网:http://www.rabbitmq.com/