
RabbitMQ 学习初入门

RabbitMQ是一个消息队列,我们可以使用RabbitMQ 做消息队列,消息通知的业务功能,而且根据网上的不可靠消息得出,RabbitMQ 的性能水平甚至比 activeMQ 还要好,所以也是我选择认真去学习RabbitMQ的原因,当然我也有做个关于 activeMQ 的一些简单的 Demo 有机会的话可以分享出来~

Rabbit 使用 Erlang 来编写的AMQP 服务器。 什么是 AMQP 本人已经百度给大家了:

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

哎哟,先从按照说起吧~ 我使用macbook 来进行学习和开发的 顺便提一下 本人是位果粉,别喷~ 使用 MAC OS 安非常简单,还我是建议使用 MAC OS 的同学使用 brew 进行开发,没有别的 就是因为方便。

使用这个命令之前 如果没有安装过 Homebrew 就安装一下吧 有机会我也分享一下安装方式 顺便介绍一下这个让你懒癌晚期的Homebrew。


 brew install rabbitmq

然后会告诉你 无法获得 /usr/local/sbin 的写入权限。 因为是英文的错误提示,本人会看不会写 所以直接写中文给各位看了~ 还是那句 别喷~ 谢谢

 mkdir /usr/local/sbin       创建目录
 chmod 777 /usr/local/sbin   开权限,全开别烦
 brew link rabbitmq          link 一下


启动 rabbitMQ

MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server 

              RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
              Starting broker... completed with 10 plugins.

目前为止已经安装完 rabbitMQ 并且启动起来 了 简单到飞

开始废话 说说概念,别方 我也是抄书的~

消费者和生产者,由于书本的篇幅太长,就抄了!简单明了告诉你们,消息的产生者就是生产者,获得消息的称为消息消费者。我QQ M了你一下 我就是消息的生产者了,你在QQ看到我的消息了,你就是消息的消费者了,这才是正常写书方式嘛··· 举个开发的业务背景的例子,我支付了一张订单,这个业务产生了几个次要的业务,例如积分业务,我购买了订单,订单核心功能处理订单的业务,然后生产了一个消息并通过rabbitMQ 发送了一个消息,内容为产生了一个订单,然后订阅了消息的消息消费者就会消费消息,然后根据消息处理自己的业务,该干嘛干嘛去~ 积分业务就去加你的积分等等。当然这个只是举例,别吐槽 可以在台服务器或者一个业务方法上实现。 上面的举例对于一些分布式的微服务上是非常常见的方式了~


信道:在连接到rabbitMQ 之后,你将可以创建并获得一个AMQP信道,信道是创建在“真实的”TCP连接内的虚拟链接。所有的AMQP命令都是通过 AMQP 信道发出去的,每条信道会被指派一个唯一的ID。所有发布消息还是订阅都是通过AMQP 信道去处理的。这个概念非常像端口号【但是不是端口啊 ,别说我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的开发者已经比较了解~ 其实使用AMQP信道的原因非常简单,建立TCP连接的性能代价是非常高的,当你在不同的线程当中去创建TCP连接的方式去和RabbitMQ进行通信的代价是十分高的,但是在多个不同的线程当中通过channel 我们就可以通过同一个TCP连接进行通信了。好像写得好乱不知道他们理不理解我写什么,唉 算了 本人的水平有限,况且大多数情况下都是我自己看而已,放弃~ 抽象图如下 丑,不过先将就~


交换器 与 路由键: 这个好难解释啊~ 直接说工作流程吧~ 哎哟其实应该把 队列写到这里来,算了吧 不改了。 先记住,交换器当中里面有N个队列,而路由键 是让交换器知道消息应该放到哪个队列当中去。别废话举例: 产生消息 要告诉 RabbitMQ 这个消息是放到那个交换器上,【注意:交换器是由开发者去创建的你可以搞N个交换器出来,没有管你】
this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));
首先 hello-exchange 是交换器的名字, hola 是路由键 这个让hello-exchange 去判断应该分发到那个队列当中,SerializationUtils.serialize(msg) 将一个消息序列化。 channel 就不用说啦上面说了,是一个AMQP信道。


1、消费者订阅消息,先创建一个交换器,如果交换器已经存在即返回一个交换器。当然你可以passive 为 true 如果 passive 为 true , 交换器已经存在返回一个已经存在的交换器,否则报错。我暂时不知道用在哪里,所以 先记住一般情况下 创建一个交换器,如果存在就返回一个已经存在的交换器,如果不存在则创建并返回。

JAVA 代码如下【直接给源码的方法定义,是不是很贴心】:


     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange  【交换器名称】
     * @param type the exchange type 【交换器类型 一会说 别着急】
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交换器,这个后面说,还是简单说一下,纠结帝~ 就是交换器中的消息会持久化,就是会存在硬盘当中,宕机或重启服务时会自动恢复当时会牺牲性能,除非你的SSD硬盘好快,当我没有说过~】
     * @param autoDelete true if the server should delete the exchange when it is no longer in use  【当这个交换器 已经没有人使用的时候 会自动删除,longer 长时期 当时我不知道长时期是什么时候,所以我会记住是 没人用就自己删掉 自动消失】
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client. 【不知道什么鬼意思,如果是内部 不会直接发送到客户端,我一般写false ,真心不知道他什么意思,后面学下去应该明白】
     * @param arguments other properties (construction arguments) for the exchange 【其他参数 暂时没有用到】
     * @return a declaration-confirm method to indicate the exchange was successfully declared  
     * @throws java.io.IOException if an error is encountered 【会抛出IO异常】
Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map<String, Object> arguments) throws IOException;


JAVA 代码如下【直接给源码的方法定义】:

     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue  【队列名称】 
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)  【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)  【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection  就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】 
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~
     * @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】
     * @return a declaration-confirm method to indicate the queue was successfully declared  
     * @throws java.io.IOException if an error is encountered
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;/**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
绑定队列和交换器 并指定路由键
     * Bind a queue to an exchange.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue  【队列名称】
     * @param exchange the name of the exchange 【交换器名称】
     * @param routingKey the routine key to use for the binding 【路由键】
     * @param arguments other properties (binding parameters) 【其他参数 还是那句没有玩过】
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
    Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;


JAVA 代码:

     * Start a non-nolocal, non-exclusive consumer, with
     * a server-generated consumerTag.
     * @param queue the name of the queue 【所订阅消费的队列】
     * @param autoAck true if the server should consider messages 【是否为自动确定消息,好像TCP的ack syn 啊,可怕的7层模型!!!一般写true就可以】
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param callback an interface to the consumer object  【回调callback 这个马上上代码看看】
     * @return the consumerTag generated by the server
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;


 * Interface for application callback objects to receive notifications and messages from
 * a queue by subscription.
 * Most implementations will subclass {@link DefaultConsumer}.
 * <p/>
 * The methods of this interface are invoked in a dispatch
 * thread which is separate from the {@link Connection}'s thread. This
 * allows {@link Consumer}s to call {@link Channel} or {@link
 * Connection} methods without causing a deadlock.
 * <p/>
 * The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more
 * dispatch threads. {@link Consumer}s should avoid executing long-running code
 * because this will delay dispatch of messages to other {@link Consumer}s on the same
 * {@link Channel}.
 * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
 * @see Channel#basicCancel
public interface Consumer {
     * Called when the consumer is registered by a call to any of the
     * {@link Channel#basicConsume} methods.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
    void handleConsumeOk(String consumerTag);

     * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
    void handleCancelOk(String consumerTag);

     * Called when the consumer is cancelled for reasons <i>other than</i> by a call to
     * {@link Channel#basicCancel}. For example, the queue has been deleted.
     * See {@link #handleCancelOk} for notification of consumer
     * cancellation due to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @throws IOException
    void handleCancel(String consumerTag) throws IOException;

     * Called when either the channel or the underlying connection has been shut down.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

     * Called when a <code><b>basic.recover-ok</b></code> is received
     * in reply to a <code><b>basic.recover</b></code>. All messages
     * received before this is invoked that haven't been <i>ack</i>'ed will be
     * re-delivered. All messages received afterwards won't be.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
    void handleRecoverOk(String consumerTag);

     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope  宝宝累了 看重点,这个方法就是消息到达的时候回调的方法其他你们喜欢研究 可以认真看看英文备注。
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;


4、上面所说的都是消息消费者的工作,这个step开始说消息生产者要做的。开启一个交换器,当然这个交换器应该是存在不用创建的因为 上面所说消费者已经创建过了,但是其实谁去先创建都是可以的,消费者也可以创建,生产者也可以,这个倒是没有什么关系,主要是看你的业务需求【这句话甩锅用】。由于已经贴出过创建交换器的代码所以就不贴了,看上面。




     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to  【发送到那个交换器上】
     * @param routingKey the routing key  【路由键 决定消息去哪个队列里面混】
     * @param props other properties for the message - routing headers etc  【其他消息参数, 哎~ 这个我玩过,一会DEMO时间,表演一下】
     * @param body the message body 【消息体,一般把对象序列化一下发过去】
     * @throws java.io.IOException if an error is encountered
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

然后就没有然后了 上面 第三步的 订阅方法中的回调 就会有这个消息了。 好又到说概念的时候了。 困的同学先休息。


【1】 消息MESSAGE_A 到达 queue_A 队列
【2】 RabbitMQ 把 MESSAGE_A 发送到 APPLICATION A(消费者)
【3】 APPLICATION A 确定收到了消息 发个ack 高兴一下
【4】 RabbitMQ 把 消息从 queue_A 队列删除。

上面其实也有说过,autoack参数 有印象吧~ 所以autoack 设置成true 就没错了。这里就可以开展其他话题了,如果多个订阅者怎么办呢,是不是多个订阅者都会收到消息呢,答案是否定的。RabbitMQ的队列会对订阅者做一个循环,例如目前有两个订阅者订阅了同一个队列,serverA 和 serverB 他会循环去发送消息,队列有 ID 1-10 的消息,ID 1消息会由 serverA 处理 ID 2 消息会由 serverB 处理 ID 3 消息会由 serverA 去处理 如此类推。
如果serverA 在发送ACK应答给 RabbitMQ之前 断开连接【就是服务挂了,宕机了】RabbitMQ 会认为这个消息没有处理,即没有消费到这个消息,会把这个消息发送到 serverB 去处理。


如果 一个channel 订阅了一个队列就不能订阅别的队列了,也就是说一个channel只能订阅一个队列。所以你需要取消原来的订阅,并将信道设置为“传输”模式。后面有时间写后面的文章可能会说到。


direct :这个简单,只要路由键一模一样就投递到相应的队列当中。

fanout :这个也简单,消息会投递到所有绑定到这个交换器的队列当中。

topic :这个就复杂一点了,我很客观的简单就简单 复杂就复杂 从不忽悠。但是这个也好常用不能不学,好累。在路由键当中可以使用通配符。怎么说呢,好纠结啊 不知道怎么解释啊 怎么办 在线等。


//绑定队列 路由键是 tony.teamA 有路由键为tony.teamA的消息就投递到这里

//绑定队列 路由键是 yan.teamA 有路由键为tony.teamB的消息就投递到这里

//绑定队列 路由键是 *.teamA  有路由键为.teamA结尾的消息就投递到这里

//绑定队列 路由键是 chao.teamB 有路由键为chao.teamB的消息就投递到这里

//绑定队列 路由键是 *.teamB 有路由键为.teamB结尾的消息就投递到这里

//绑定队列 路由键是 # 所有在这个交换器的消息都投递到这里

// queueA[路由键:tony.teamA]、queueC[路由键:*.teamA]、queueF[路由键:#]会收到消息

// queueD[路由键:chao.teamB]、queueE[路由键:*.teamB]、queueF[路由键:#]会收到消息

然后基本的rabbitMQ 的东西就搞好了。不过这只是开始~

简单说一下虚拟主机和隔离吧,简单来说想MySQL 你可以创建很多个库,里面有很多个表。然后呢 rabbitMQ可以创建很多个虚拟主机,虚拟主机里面有很多个交换器,交换器里面有很多个队列,解释得完美。默认会提供一个 默认虚拟主机 vhost : “/”。后面找时间再说这个 vhost 吧。



package com.maxfunner;

import com.maxfunner.mq.EndPoint;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 * Producer
public class Producer {

    private Connection connection;
    private Channel channel;
    private Map<Long, String> messageMap = new HashMap<Long, String>();
    private int maxID = 0;

    private static final String EXCHANGE_NAME = "MY_EXCHANGE";

    public void createConnectionAndChannel() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");  //服务器地址
        factory.setUsername("guest");  //默认用户名
        factory.setPassword("guest"); //默认密码
        factory.setPort(5672); //默认端口,对就是这么屌全部默认的

        this.connection = factory.newConnection();  //创建链接

        this.channel = this.connection.createChannel();


    public void initChannelAndCreateExchange() throws IOException {

        this.channel.confirmSelect();   //启用消息确认已经投递成功的回调

         * 创建了一个交换器,类型为 direct 非持久化 自动删除  没有额外参数
        this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

        this.channel.addConfirmListener(new ConfirmListener() {

             * 成功的时候回调【这个是当消息到达交换器的时候回调】
             * @param deliveryTag   每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
             * @param multiple
             * @throws IOException
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 发送成功");

                //最后一个消息都搞掂之后 关闭所有东西
                if (deliveryTag >= maxID) {


             * 失败的时候回调
             * @param deliveryTag   每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
             * @param multiple
             * @throws IOException
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 发送失败");
                messageMap.remove(message); //发送失败就不重发了,发脾气

                //最后一个消息都搞掂之后 关闭所有东西
                if (deliveryTag >= maxID) {



    public void sendMessage(String message) throws IOException {

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")  //指定是一个文本

        // 发送一个消息 到 EXCHANGE_NAME 的交换器中  路由键为 KEY_A  发送 message 之前序列化一下 具体用什么包上面import自己看
        this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message));


    public void closeAnything() throws IOException {
        this.channel.close();   //跪安吧 小channel
        this.connection.close(); //你也滚吧 connection

    public static void main(String[] args) throws IOException {

        Producer producer = new Producer();

        List<String> messageList = new ArrayList<String>();

        producer.maxID = messageList.size();    //记录最后一个ID 当最后一个消息发送成功后关闭连接

        //注意:因为channel产生的ID 是从1开始的
        for (int i = 1; i <= messageList.size(); i++) {

            producer.messageMap.put(new Long(i), messageList.get(i - 1));    //这里看懂了吗?没看懂也没有办法了,这里我真不知道怎么解释
            producer.sendMessage(messageList.get(i - 1));




package com.maxfunner;

import com.maxfunner.mq.QueueConsumer;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

 * Consumer
public class Consumer {

    private Connection connection;
    private Channel channel;
    private Map<Integer,String> messageMap = new HashMap<Integer, String>();

    private static final String EXCHANGE_NAME = "MY_EXCHANGE";

     * 对,你猜得一点都没有错,我是复制的
     * @throws IOException
    public void createConnectionAndChannel() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");  //服务器地址
        factory.setUsername("guest");  //默认用户名
        factory.setPassword("guest"); //默认密码
        factory.setPort(5672); //默认端口,对就是这么屌全部默认的

        this.connection = factory.newConnection();  //创建链接

        this.channel = this.connection.createChannel();


    public void createAndBindQueue() throws IOException {

         * 创建了一个交换器,类型为 direct 非持久化 自动删除  没有额外参数
        this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也创建一下交换器,反正已经创建也没有关系

         * 创建了一个队列, 名称为 QUEUE_A  非持久化 非独有的 自动删除的 没有额外删除的



    public static void main(String args[]) throws IOException {

        final Consumer consumer = new Consumer();




        new Thread(new Runnable() {
            public void run() {

                try {

                     * 订阅消息,订阅队列QUEUE_A  获得消息后自动确认
                    consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() {

                        public void handleConsumeOk(String consumerTag) {


                        public void handleCancelOk(String consumerTag) {


                        public void handleCancel(String consumerTag) throws IOException {


                        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {


                        public void handleRecoverOk(String consumerTag) {


                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                            System.out.println("message : " + SerializationUtils.deserialize(body));

                } catch (IOException e) {




message : message_A ! 发送成功
message : message_B ! 发送成功
message : message_C ! 发送成功
message : message_D ! 发送成功
message : message_E ! 发送成功
message : message_F ! 发送成功


message : message_A
message : message_B
message : message_C
message : message_D
message : message_E
message : message_F

项目我是用maven创建的贴一下maven 的pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">










消息“黑洞”,如果在没有绑定队列和交换器之前,所有发出的消息都无法匹配到相应的队列当中,那些消息将永远不会被消费。而且confirm的callback也是会返回成功的即使消息进入了消息“黑洞”。所以在发送消息之前 必须确定队列已经绑定,确保消息能分配到相应的队列当中。 测试很简单,上面的DEMO 先运行 Producer 显示所有消息发送成功,然后再运行 Consumer 发现没有消息可以接收。 再尝试先运行Consumer 再运行 Producer 就发现一切都正常了,这也是为什么我把autoDelete设置为true的原因,有了autoDelete当队列没有人用的时候就会自动删除。所以每次运行都可以测试出问题。要保证消息能够到达指定的队列最好也在Producer中建立队列 而且进行相关的绑定 然后再发送消息 修改一下 Producer 的其中一方法即可。 不写了 累了~

public void initChannelAndCreateExchange() throws IOException {

        this.channel.confirmSelect();   //启用消息确认已经投递成功的回调

         * 创建了一个交换器,类型为 direct 非持久化 自动删除  没有额外参数
        this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

        this.channel.addConfirmListener(new ConfirmListener() {

             * 成功的时候回调【这个是当消息到达交换器的时候回调】
             * @param deliveryTag   每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
             * @param multiple
             * @throws IOException
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 发送成功");

                //最后一个消息都搞掂之后 关闭所有东西
                if (deliveryTag >= maxID) {


             * 失败的时候回调
             * @param deliveryTag   每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。
             * @param multiple
             * @throws IOException
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 发送失败");
                messageMap.remove(message); //发送失败就不重发了,发脾气

                //最后一个消息都搞掂之后 关闭所有东西
                if (deliveryTag >= maxID) {


         * 创建了一个队列, 名称为 QUEUE_A  非持久化 非独有的 自动删除的 没有额外删除的



