Publish-subscribe model of RabbitMQ communication model

2023.01.12

Publish-subscribe model of RabbitMQ communication model


This article ends here. It introduces the publish and subscribe in the RabbitMQ communication model, which is suitable for asynchronous communication between modules.

Hello everyone, I mean Beijun.

Today, Zhibei Jun will lead you to learn RabbitMQ and understand the publish-subscribe model, one of the five major communication models of RabbitMQ; there will be a series of tutorials about RabbitMQ in the future. Remember to pay attention if it is helpful to you~

Publish Subscribe Model

In the previous article, I briefly introduced the work model of RabbitMQ. This article is to learn about the publish-subscribe model in RabbitMQ.

Publish/Subscribe model (Publish/Subscribe): Simply put, the messages in the queue will be received by multiple consumers at the same time, and the information received by consumers is consistent.

The publish-subscribe model is suitable for asynchronous communication between modules.

picture

Applicable scene

  1. Send and record log information
  2. In the config component of springcloud, the notification configuration is automatically updated
  3. cache synchronization
  4. WeChat subscription number

demo

producer

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 发送消息到交换机
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, ("发布订阅模型的第 " + i + " 条消息").getBytes());
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

consumer

// 消费者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
// 消费者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.

test

Start 2 consumers first, then start the producer

picture

picture

​It can be seen that the messages received by consumer 1 and consumer 2 are exactly the same, and each consumer has received the message sent by the producer;

The publish-subscribe model uses a new thing - the switch. Here is also an explanation of the parameters of the related methods:

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 该方法的最多参数的重载方法是:
Exchange.DeclareOk exchangeDeclare(String exchange,
                                    BuiltinExchangeType type,
                                    boolean durable,
                                    boolean autoDelete,
                                    boolean internal,
                                    Map<String, Object> arguments) throws IOException;

/**
 *  param1:exchange,交换机名称
 *  param2:type,交换机类型;直接写 string效果一致;内置了4种交换机类型:
 *   direct(路由模式)、fanout(发布订阅模式)、
 *   topic(topic模式-模糊匹配)、headers(标头交换,由Headers的参数分配,不常用)
 *  param3:durable,是否持久化交换机   false:默认值,不持久化
 *  param4:autoDelete,没有消费者使用时,是否自动删除交换机   false:默认值,不删除
 *  param5:internal,是否内置,如果设置 为true,则表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器的方式  false:默认值,允许外部直接访问
 *  param6:arguments,交换机的一些其他属性,默认值为 null
 */
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
// 将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 *  param1:destination,目的地,队列的名字
 *  param2:source,资源,交换机的名字
 *  param3:routingKey,路由键(目前没有用到routingKey,填 "" 即可)
 */
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

summary

This article ends here. It introduces the publish-subscribe model in the RabbitMQ communication model, which is suitable for asynchronous communication between modules.