Work model of communication model of RabbitMQ

This article ends here. It mainly introduces the work model in the RabbitMQ communication model, which is suitable for application scenarios such as current limiting and peak shaving.

In the previous article, I briefly introduced RabbitMQ, as well as installation and hello world.

Some friends left a message saying that they could not understand the method parameters. Here are a few basic method parameters explained first.

// 声明队列方法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 * param1:queue 队列的名字
 * param2:durable 是否持久化;比如现在发送到队列里面的消息,如果没有持久化,重启这个队列后数 据会丢失(false) true:重启之后数据依然在
 * param3:exclusive 是否排外(是否是当前连接的专属队列),排外的意思是:
 *            1:连接关闭之后 这个队列是否自动删除(false:不自动删除)
 *            2:是否允许其他通道来进行访问这个数据(false:不允许) 
 * param4:autoDelete 是否自动删除
 *            就是当最后一个连接断开的时候,是否自动删除这个队列(false:不删除)
 * param5:arguments(map) 声明队列的时候,附带的一些参数
Work model






public class Producer {
    private static final String QUEUE_NAME = "queue_work_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes());

// 消费者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_work_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(System.currentTimeMillis() + "消费者1接收到信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

public class Consumer2 {
    private static final String QUEUE_NAME = "queue_work_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(System.currentTimeMillis() + "消费者2接收到信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
                // 这里加了个延迟,表示处理业务时间
                try {
                } catch (InterruptedException e) {
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
可以看出来:100条消息,消费者之间是平分的,消费者1 几乎是瞬间完成,消费者2 则是慢慢吞吞的运行完毕,消费者1大量时间处于空闲状态,消费者2则一直忙碌。这显然是不适用于实际开发中。

我们需要遵从一个原则,就是 能者多劳 ,消费越快的人,消费的越多;

现在我们把消费者1和2的代码中 // channel.basicQos(0, 1, false); 这行代码取消注释,再次运行;





// 设置限流机制
channel.basicQos(0, 1, false);
 *  param1: prefetchSize,消息本身的大小 如果设置为0  那么表示对消息本身的大小不限制
 *  param2: prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
 *  param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
