RabbitMQ Communication Models: The Publish-Subscribe Model
Hello everyone, I am Zhibei Jun.
Today I will walk you through the publish-subscribe model, one of RabbitMQ's five main communication models. More RabbitMQ tutorials will follow in this series, so remember to follow along if you find this helpful~
Publish-Subscribe Model
The previous article briefly introduced RabbitMQ's work model. This article covers the publish-subscribe model.
Publish/Subscribe model: simply put, the producer sends each message to an exchange, and the exchange copies it into every queue bound to it. Each consumer reads from its own queue, so all consumers receive the same messages.
The publish-subscribe model is suitable for asynchronous communication between modules.
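To make the fan-out behaviour concrete, here is a minimal in-memory sketch. It does not use the RabbitMQ client at all: `FanoutSketch` and `FanoutExchange` are hypothetical names invented for this illustration, and a real broker does far more (network delivery, acknowledgements, persistence).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory illustration only; this is NOT the RabbitMQ client API.
class FanoutSketch {

    // Hypothetical stand-in for a fanout exchange: it stores bindings, never messages.
    static class FanoutExchange {
        private final List<Queue<String>> boundQueues = new ArrayList<>();

        // Bind a queue so that it receives every message published from now on.
        void bind(Queue<String> queue) {
            boundQueues.add(queue);
        }

        // Copy the message into every bound queue; with no bindings it is simply dropped.
        void publish(String message) {
            for (Queue<String> queue : boundQueues) {
                queue.add(message);
            }
        }
    }

    public static void main(String[] args) {
        FanoutExchange exchange = new FanoutExchange();
        Queue<String> queue1 = new ArrayDeque<>();
        Queue<String> queue2 = new ArrayDeque<>();

        // Published before any queue is bound, so no queue ever sees it.
        exchange.publish("published too early");

        exchange.bind(queue1);
        exchange.bind(queue2);
        for (int i = 0; i < 3; i++) {
            exchange.publish("message " + i);
        }

        System.out.println("queue1 = " + queue1);
        System.out.println("queue2 = " + queue2);
    }
}
```

Both queues end up holding the same three messages, while the message published before any binding existed reaches neither. This mirrors why, in this model, each consumer needs its own queue bound to the exchange.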
Applicable scenarios
- Sending and recording log information
- Notifying services to refresh their configuration automatically in the Spring Cloud Config component
- Cache synchronization
- WeChat subscription accounts
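Demo
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// ConnectionUtils is not shown in this article; it is assumed to be a helper that returns an open Connection.
public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Publish messages to the exchange (the routing key is left empty)
        for (int i = 0; i < 100; i++) {
            String message = "Publish-subscribe model, message number " + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
        }
        // Release resources
        channel.close();
        connection.close();
    }
}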
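Consumer
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Consumer 1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Queue 1 received: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        // Consume with automatic acknowledgement (autoAck = true)
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}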
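import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Consumer 2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Queue 2 received: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        // Consume with automatic acknowledgement (autoAck = true)
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}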
Test
Start the 2 consumers first, then start the producer. The consumers have to run first because they declare and bind the queues; a fanout exchange discards messages that are published while no queue is bound to it.
Consumer 1 and consumer 2 receive exactly the same messages: each consumer gets every message sent by the producer.
The publish-subscribe model introduces a new component, the exchange. The parameters of the related methods are explained below:
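// Declare the exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// The overload of this method with the most parameters is:
Exchange.DeclareOk exchangeDeclare(String exchange,
        BuiltinExchangeType type,
        boolean durable,
        boolean autoDelete,
        boolean internal,
        Map<String, Object> arguments) throws IOException;
/**
 * param1: exchange, the exchange name
 * param2: type, the exchange type; passing the type as a String has the same effect. There are 4 built-in types:
 *         direct (routing mode), fanout (publish-subscribe mode),
 *         topic (topic mode, pattern matching), headers (routes on message header values; rarely used)
 * param3: durable, whether the exchange is persisted and survives a broker restart. false (the default): not persisted
 * param4: autoDelete, whether the exchange is deleted automatically once it is no longer in use, that is, after its last binding is removed. false (the default): not deleted
 * param5: internal, whether the exchange is internal. If true, client programs cannot publish to it directly; messages can only reach it by being routed from another exchange. false (the default): clients may publish to it directly
 * param6: arguments, additional properties of the exchange; the default is null
 */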
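// Bind the queue to the exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 * param1: queue, the destination: the name of the queue that receives the messages
 * param2: exchange, the source: the name of the exchange the messages come from
 * param3: routingKey, the routing key (not used here because a fanout exchange ignores it; pass "")
 */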
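The note on `routingKey` can be illustrated with a small in-memory sketch of the routing decision. It is not RabbitMQ code: `RoutingKeySketch`, `Binding`, and `route` are hypothetical names, and only the fanout and direct types are modelled. It shows that a fanout exchange delivers to every bound queue whatever the key is, whereas a direct exchange delivers only when the message's routing key equals the key used in the binding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory illustration only; this is NOT the RabbitMQ client API.
class RoutingKeySketch {

    // Hypothetical record of one binding: a queue name plus the key it was bound with.
    static class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    // Returns the names of the queues that would receive a message with the given routing key.
    static List<String> route(String exchangeType, List<Binding> bindings, String routingKey) {
        if (!exchangeType.equals("fanout") && !exchangeType.equals("direct")) {
            throw new IllegalArgumentException("Only fanout and direct are modelled: " + exchangeType);
        }
        List<String> targets = new ArrayList<>();
        for (Binding binding : bindings) {
            // fanout: the key is ignored; direct: the key must match exactly.
            boolean matches = exchangeType.equals("fanout") || binding.bindingKey.equals(routingKey);
            if (matches) {
                targets.add(binding.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Same bindings as the demo: both queues bound with an empty key.
        List<Binding> bindings = Arrays.asList(
                new Binding("queue_publish_1", ""),
                new Binding("queue_publish_2", ""));

        System.out.println("fanout, key 'any.key': " + route("fanout", bindings, "any.key"));
        System.out.println("direct, key 'any.key': " + route("direct", bindings, "any.key"));
    }
}
```

With the bindings from the demo, the fanout call returns both queue names and the direct call returns an empty list, which is why the empty string is a safe placeholder in this model only.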
Summary
That concludes this article. It introduced the publish-subscribe model, one of RabbitMQ's communication models, which is suitable for asynchronous communication between modules.