RabbitMQ之通信模型之Work模型
2023.01.09
RabbitMQ之通信模型之Work模型
本文到這裡就結束了,主要介紹了RabbitMQ通信模型中的work模型,適用於限流、削峰等應用場景。
大家好,我是指北君。
今天指北君帶領大家接著學習RabbitMQ,了解RabbitMQ的五大通信模型之一的Work模型;接下來還會有關於RabbitMQ的系列教程,對你有幫助的話記得關注哦~
回顧
上一篇文章中,簡單的介紹了一下RabbitMQ,以及安裝和hello world。
有的小伙伴留言說看不懂其中的方法參數,這裡先解釋一下幾個基本的方法參數。
// 声明队列方法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* param1:queue 队列的名字
* param2:durable 是否持久化;比如现在发送到队列里面的消息,如果没有持久化,重启这个队列后数 据会丢失(false) true:重启之后数据依然在
* param3:exclusive 是否排外(是否是当前连接的专属队列),排外的意思是:
* 1:连接关闭之后 这个队列是否自动删除(false:不自动删除)
* 2:是否允许其他通道来进行访问这个数据(false:不允许)
* param4:autoDelete 是否自动删除
* 就是当最后一个连接断开的时候,是否自动删除这个队列(false:不删除)
* param5:arguments(map) 声明队列的时候,附带的一些参数
*/
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
// 发送数据到队列
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一个队列消息...".getBytes());
/**
* param1:exchange 交换机 没有就设置为 "" 值就可以了
* param2:routingKey 路由的key 现在没有设置key,直接使用队列的名字
* param3:BasicProperties 发送数据到队列的时候,是否要带一些参数。
* MessageProperties.PERSISTENT_TEXT_PLAIN表示没有带任何参数
* param4:body 向队列中发送的消息数据
*/
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
Work模型
work模型稱為工作隊列或者競爭消費者模式,多個消費者消費的數據之和才是原來隊列中的所有數據,適用於流量的削峰。
演示
寫個簡單的測試:
生產者
public class Producer {
private static final String QUEUE_NAME = "queue_work_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes());
}
channel.close();
connection.close();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
消費者
// 消费者1
public class Consumer {
private static final String QUEUE_NAME = "queue_work_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// channel.basicQos(0, 1, false);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(System.currentTimeMillis() + "消费者1接收到信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
// 消费者2
public class Consumer2 {
private static final String QUEUE_NAME = "queue_work_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// channel.basicQos(0, 1, false);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(System.currentTimeMillis() + "消费者2接收到信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
// 这里加了个延迟,表示处理业务时间
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
結果
可以看出來:100條消息,消費者之間是平分的,消費者1 幾乎是瞬間完成,消費者2 則是慢慢吞吞的運行完畢,消費者1大量時間處於空閒狀態,消費者2則一直忙碌。這顯然是不適用於實際開發中。
我們需要遵從一個原則,就是 能者多勞 ,消費越快的人,消費的越多;
現在我們把消費者1和2的代碼中 // channel.basicQos(0, 1, false); 這行代碼取消註釋,再次運行;
現在的結果就比較符合能者多勞,雖然你幹的多,但是工資是一樣的呀~
work模型的一個主要的方法是basicQos();這裡也解釋一下其參數:
// 设置限流机制
channel.basicQos(0, 1, false);
/**
* param1: prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
* param2: prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
* param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
*/
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
小結
本文到這裡就結束了,主要介紹了RabbitMQ通信模型中的work模型,適用於限流、削峰等應用場景。