RabbitMQ通信模型之路由模型

2023.03.01

RabbitMQ通信模型之路由模型

本文介紹了RabbitMQ 通信模型中的路由模型的使用,通過交換機和路由鍵實現點對點通信,適合於需要點對點通信的場景。

大家好,我是了不起。

今天了不起帶領大家接著學習RabbitMQ,了解RabbitMQ的五大通信模型之一的路由模型;接下來還會有關於RabbitMQ的系列教程,對你有幫助的話記得關注哦~

往期傳送門

RabbitMQ (一)你好世界

​RabbitMQ(二)通信模型之work模型​

​RabbitMQ(三)通信模型之發布訂閱模型​

路由模型

RabbitMQ 提供了五種不同的通信模型,上一篇文章中,簡單的介紹了一下RabbitMQ的發布訂閱模型模型。這篇文章來學習一下RabbitMQ中的路由模型(direct)。

路由模型(direct):路由模式相當於是分佈訂閱模式的升級版,多了一個路由key來約束隊列與交換機的綁定。

在路由模型中,生產者將消息發送到交換機,交換機根據消息的路由鍵將消息轉發到對應的隊列中。每個隊列可以綁定多個路由鍵,每個路由鍵可以綁定到多個隊列中。消費者從隊列中接收消息並處理。當一個路由鍵被多個隊列綁定時,交換機會將消息發送到所有綁定的隊列中。當一個隊列綁定多個路由鍵時,該隊列將能夠接收到所有路由鍵對應的消息。

適用場景

路由模型適用於需要點對點通信的場景,例如:

  1. 系統監控告警通知;
  2. 任務分發;
  3. 用戶私信系統;
  4. 訂單確認通知等。

演示

  1. 生產者
// 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "exchange_direct_1";
    // 定义路由的key,key值是可以随意定义的
    private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
    private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, null, ("路由模型发送的第 " + i + " 条信息").getBytes());
            } else {
                channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, null, ("路由模型发送的第 " + i + " 条信息").getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  1. 消費者
// 消费者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_direct_1";
    private static final String EXCHANGE_NAME = "exchange_direct_1";
    private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
// 消费者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_direct_2";
    private static final String EXCHANGE_NAME = "exchange_direct_1";
    private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  1. 測試
    先啟動2個消費者,再啟動生產者
    可以得到結果是消費者1得到了序號是偶數的消息
    消費者2得到了序號是奇數的消息

小結

本文介紹了RabbitMQ 通信模型中的路由模型的使用,通過交換機和路由鍵實現點對點通信,適合於需要點對點通信的場景。在實際使用過程中,需要注意以下幾點:

  1. 路由鍵必須要與消費者綁定隊列時的路由鍵相同,否則無法接收到消息;
  2. 可以通過多個交換機和路由鍵來實現更靈活的消息路由。

後續了不起還會繼續更新RabbitMQ的系列文章,感興趣的小伙伴持續關注哦~~~