RabbitMQ之通信模型之發布訂閱模型

2023.01.12

RabbitMQ之通信模型之發布訂閱模型

本文到這裡就結束了,介紹了RabbitMQ通信模型中的發布訂閱,適合於做模塊之間的異步通信。

大家好,我是指北君。

今天指北君帶領大家接著學習RabbitMQ,了解RabbitMQ的五大通信模型之一的發布訂閱模型;接下來還會有關於RabbitMQ的系列教程,對你有幫助的話記得關注哦~

發布訂閱模型

上一篇文章中,簡單的介紹了一下RabbitMQ的work模型。這篇文章來學習一下RabbitMQ中的發布訂閱模型。

發布訂閱模型(Publish/Subscribe):簡單的說就是隊列裡面的消息會被多個消費者同時接受到,消費者接收到的信息一致。

發布訂閱模型適合於做模塊之間的異步通信。

圖片

適用場景

  1. 發送並記錄日誌信息
  2. springcloud的config組件裡面通知配置自動更新
  3. 緩存同步
  4. 微信訂閱號

演示

生產者

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 发送消息到交换机
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, ("发布订阅模型的第 " + i + " 条消息").getBytes());
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

消費者

// 消费者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
// 消费者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.

測試

先啟動2個消費者,再啟動生產者

圖片

圖片

​可以看出來消費者1和消費者2接收到的消息是一模一樣的,每個消費者都收到了生產者發送的消息;

發布訂閱模型,用到了一個新的東西-交換機,這裡也解釋一下相關方法的參數:

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 该方法的最多参数的重载方法是:
Exchange.DeclareOk exchangeDeclare(String exchange,
                                    BuiltinExchangeType type,
                                    boolean durable,
                                    boolean autoDelete,
                                    boolean internal,
                                    Map<String, Object> arguments) throws IOException;

/**
 *  param1:exchange,交换机名称
 *  param2:type,交换机类型;直接写 string效果一致;内置了4种交换机类型:
 *   direct(路由模式)、fanout(发布订阅模式)、
 *   topic(topic模式-模糊匹配)、headers(标头交换,由Headers的参数分配,不常用)
 *  param3:durable,是否持久化交换机   false:默认值,不持久化
 *  param4:autoDelete,没有消费者使用时,是否自动删除交换机   false:默认值,不删除
 *  param5:internal,是否内置,如果设置 为true,则表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器的方式  false:默认值,允许外部直接访问
 *  param6:arguments,交换机的一些其他属性,默认值为 null
 */
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
// 将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 *  param1:destination,目的地,队列的名字
 *  param2:source,资源,交换机的名字
 *  param3:routingKey,路由键(目前没有用到routingKey,填 "" 即可)
 */
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

小結

本文到這裡就結束了,介紹了RabbitMQ通信模型中的發布訂閱模型,適合於做模塊之間的異步通信。