A Look at RocketMQ Master-Slave Replication

2023.07.05


RocketMQ master-slave replication is one of RocketMQ's high-availability mechanisms. Data can be replicated from the master node to one or more slave nodes.

In this article, let's talk about RocketMQ's master-slave replication. I hope that after reading it, you can understand the essence of master-slave replication.


1. Synchronous and asynchronous

In RocketMQ's cluster mode, Brokers are divided into Masters and Slaves. One Master can have multiple Slaves, but a Slave can belong to only one Master.

Each Broker establishes long-lived connections with all nodes in the Name Server cluster and periodically registers its Topic information with every Name Server.


The Master node receives write requests from clients and persists messages to disk. The Slave node copies message data from the Master node and stays synchronized with it.

1. Synchronous replication


Each Master is configured with one Slave, and there are multiple Master-Slave pairs. HA uses synchronous double-write: success is returned to the application only after the message has been written successfully on both the Master and the Slave.

The advantages and disadvantages of this mode are as follows:

  • Advantages: Neither data nor service has a single point of failure. When the Master goes down, there is no message delay, and both service availability and data availability are very high;
  • Disadvantages: Performance is slightly lower than in asynchronous replication mode (about 10% lower), and the response time (RT) for sending a single message is slightly higher. In the current version, after the Master goes down, the Slave cannot be promoted to Master automatically.

2. Asynchronous replication


Each Master is configured with one Slave, and there are multiple Master-Slave pairs. HA uses asynchronous replication, so the Slave lags the Master by a short interval (millisecond level). The advantages and disadvantages of this mode are as follows:

  • Advantages: Even if a disk is damaged, very few messages are lost, and message timeliness is not affected. After the Master goes down, consumers can still consume from the Slave; this is transparent to the application and requires no manual intervention. Performance is almost the same as in multi-Master mode;
  • Disadvantages: If the Master goes down and its disk is damaged, a small number of messages will be lost.

The replication process is divided into two parts: metadata replication and message data replication.

  • Metadata replication: the Master and Slave synchronize topics, consumer progress, delayed consumption progress, and consumer configuration data
  • Message data replication: the Master and Slave synchronize message data

2. Metadata replication

A scheduled task on the Slave Broker synchronizes metadata every 10 seconds, including topics, consumption progress, delayed consumption progress, and consumer configuration.


When synchronizing topics, the Slave Broker sends an RPC request to the Master Broker. The returned data is first written to the local cache and then persisted locally.
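The topic synchronization described above can be sketched roughly as follows. This is a minimal illustration, not RocketMQ's actual code: the class `MetadataSyncSketch`, its methods, and the `Supplier` that stands in for the RPC to the Master are all hypothetical names, and the "persisted" copy is just an in-memory map standing in for a file on disk.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a Slave-side metadata sync task; not RocketMQ's real classes.
final class MetadataSyncSketch {
    // Local cache: topic name -> serialized topic configuration.
    private final Map<String, String> topicCache = new ConcurrentHashMap<>();
    // Stand-in for the RPC that asks the Master for its topic configuration.
    private final Supplier<Map<String, String>> fetchFromMaster;
    // Stand-in for the on-disk copy; a real broker would write a file.
    private Map<String, String> persisted = new HashMap<>();

    MetadataSyncSketch(Supplier<Map<String, String>> fetchFromMaster) {
        this.fetchFromMaster = fetchFromMaster;
    }

    // One sync round: fetch from the Master, update the cache, then persist.
    synchronized void syncOnce() {
        Map<String, String> remote = fetchFromMaster.get();
        topicCache.clear();
        topicCache.putAll(remote);
        persisted = new HashMap<>(topicCache);
    }

    // Runs syncOnce every periodSeconds; the caller is expected to shut the scheduler down.
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::syncOnce, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    // Returns a copy of what was "persisted" by the last sync round.
    synchronized Map<String, String> persistedTopics() {
        return new HashMap<>(persisted);
    }
}
```

Calling `start(10)` would match the 10-second interval mentioned in this section; the other metadata types (consumption progress, delayed consumption progress, consumer configuration) could follow the same fetch, cache, persist pattern.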


3. Message data replication

Message data synchronization between the Master and the Slave proceeds in the five steps below.

1. After startup, the Master listens on the specified port;

After the Master starts, it creates the AcceptSocketService, which accepts TCP connections from the client (Slave) to the server (Master).


RocketMQ abstracts each connection as an HAConnection object, and each HAConnection starts two threads, one for a read service and one for a write service:

  • Read service: processes the requests sent by the Slave
  • Write service: transfers data to the Slave


2. After the Slave starts, it tries to connect to the Master and establishes a TCP connection;

HAClient is the core class on the Slave (client) side. It is responsible for establishing the connection and exchanging data with the Master.


After the client starts, it first tries to connect to the Master, then queries the maximum physical offset in its current message store and saves it in the variable currentReportedOffset.

3. The Slave reports its pull offset to the Master;


The progress report is a single Long offset, 8 bytes in total, which keeps the format very compact.


After the data is written to the socket buffer, the last write time, lastWriteTimestamp, is updated.
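As a rough illustration of this 8-byte report, the sketch below encodes an offset into a buffer that is ready to be written to a channel, and decodes it again. The class and method names are hypothetical, and the big-endian byte order is simply `ByteBuffer`'s default rather than something stated in this article.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only; class and method names are hypothetical.
final class OffsetReportSketch {
    static final int REPORT_SIZE = 8; // one Long offset

    // Encode the Slave's maximum physical offset as an 8-byte frame.
    static ByteBuffer encode(long maxPhyOffset) {
        ByteBuffer buf = ByteBuffer.allocate(REPORT_SIZE);
        buf.putLong(maxPhyOffset);
        buf.flip(); // switch from filling the buffer to draining it
        return buf;
    }

    // Decode a report without consuming the buffer (absolute read at its position).
    static long decode(ByteBuffer report) {
        return report.getLong(report.position());
    }
}
```

A real client would write the buffer to its `SocketChannel` in a loop until `hasRemaining()` is false, and only then update the last write timestamp.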

4. The Master parses the requested offset and retrieves all messages after that offset from the message file;

When the Slave reports data to the Master, the SelectionKey.OP_READ event is triggered, and the Master hands the request over to ReadSocketService for processing.


When the Slave Broker sends the maxPhyOffset of its own commitlog, the Master's selector.select(1000) call returns immediately and the processReadEvent method is executed.


The core logic of the processReadEvent method is to record the Slave's current progress offset and then notify the replication thread of that progress.
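The core of that logic might look like the sketch below. It is an assumption-laden illustration, not the real method: the class name, the field names, and the choice to use only the most recent complete 8-byte report in the buffer are all hypothetical, and the step that notifies the replication thread is left out.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the Master-side handling of Slave progress reports.
final class ReadEventSketch {
    private long slaveRequestOffset = -1; // first offset the Slave asked for
    private long slaveAckOffset = -1;     // latest offset the Slave reported as stored
    private int processedPosition = 0;    // bytes of the read buffer already handled

    // 'received' is still in fill mode: position() marks the end of the bytes read so far.
    // Returns true if at least one new complete report was handled.
    boolean processReadEvent(ByteBuffer received) {
        if (received.position() - processedPosition < 8) {
            return false; // no new complete 8-byte report yet
        }
        // Ignore a trailing partial report and use the most recent complete one.
        int completeEnd = received.position() - (received.position() % 8);
        long reported = received.getLong(completeEnd - 8);
        processedPosition = completeEnd;
        slaveAckOffset = reported;
        if (slaveRequestOffset < 0) {
            slaveRequestOffset = reported; // the first report doubles as the pull request
        }
        return true;
    }

    long slaveAckOffset() { return slaveAckOffset; }

    long slaveRequestOffset() { return slaveRequestOffset; }
}
```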

The write service, WriteSocketService, retrieves all messages after the offset from the message file (subject to a limit on the size of each transfer batch) and sends the message data to the Slave.
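The batching idea can be sketched as follows. Everything here is illustrative: the commit log is a plain byte array, the names are hypothetical, and the 12-byte frame header (an 8-byte start offset followed by a 4-byte body length) is an assumption made for the example rather than a format taken from this article.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of how a write service could cut one transfer batch.
final class TransferBatchSketch {
    // Assumed frame header: start offset (long) + body length (int).
    static final int HEADER_SIZE = 8 + 4;

    // Builds one frame starting at nextOffset, with the body capped at maxBatchBytes.
    // When the Slave is already caught up, the frame carries an empty body.
    static ByteBuffer nextFrame(byte[] commitLog, long nextOffset, int maxBatchBytes) {
        if (nextOffset < 0 || nextOffset > commitLog.length) {
            throw new IllegalArgumentException("offset out of range: " + nextOffset);
        }
        int available = (int) (commitLog.length - nextOffset);
        int bodySize = Math.min(available, maxBatchBytes);
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + bodySize);
        frame.putLong(nextOffset);
        frame.putInt(bodySize);
        if (bodySize > 0) {
            frame.put(commitLog, (int) nextOffset, bodySize);
        }
        frame.flip(); // ready to be written to the Slave's channel
        return frame;
    }
}
```

After a frame is fully sent, the sender would advance its next offset by the body size and cut the next batch from there.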


5. The Slave receives the data and appends the message data to its commitlog message file.


First, the dispatchReadRequest method in the HAClient class is called to parse the message data;


Then the message data is appended to the local message store.
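The parse-then-append step could be sketched like this. It is not the real dispatchReadRequest: the class is hypothetical, the local commitlog is an in-memory stream, and the frame layout (8-byte start offset, 4-byte body length, then the body) is an assumption for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch of the Slave parsing received frames and appending them locally.
final class SlaveAppendSketch {
    // Assumed frame header: start offset (long) + body length (int).
    static final int HEADER_SIZE = 8 + 4;
    // In-memory stand-in for the local commitlog file.
    private final ByteArrayOutputStream localLog = new ByteArrayOutputStream();

    long maxPhyOffset() {
        return localLog.size();
    }

    // Consumes every complete frame in 'buf' (in read mode) and leaves a trailing
    // partial frame unread. Returns the number of body bytes appended.
    int dispatch(ByteBuffer buf) {
        int appended = 0;
        while (buf.remaining() >= HEADER_SIZE) {
            buf.mark();
            long masterOffset = buf.getLong();
            int bodySize = buf.getInt();
            if (bodySize < 0) {
                throw new IllegalStateException("negative body size: " + bodySize);
            }
            if (buf.remaining() < bodySize) {
                buf.reset(); // body not fully received yet; wait for more data
                break;
            }
            if (masterOffset != maxPhyOffset()) {
                throw new IllegalStateException("offset mismatch: master=" + masterOffset
                        + " slave=" + maxPhyOffset());
            }
            byte[] body = new byte[bodySize];
            buf.get(body);
            localLog.write(body, 0, bodySize);
            appended += bodySize;
        }
        return appended;
    }
}
```

Checking that the frame's start offset equals the Slave's own maximum offset is one simple way to detect a gap or a duplicate before appending; the Slave's next progress report would then carry the new `maxPhyOffset()`.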


4. Implementing synchronous replication

From the data replication flow, we can see that replication itself is executed asynchronously. So how is synchronous replication achieved?

After the Master Broker receives a request to write a message, it calls CommitLog's asyncPutMessage method to write the message.


After the commitLog executes appendMessage, two tasks remain: flushing to disk and synchronous replication.

These two tasks do not block the calling thread; they are executed asynchronously using CompletableFuture.

When the HAConnection read service receives progress feedback from the Slave and finds that the message data has been replicated successfully, it completes the future.
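A minimal sketch of this wake-up mechanism is shown below. The class, the `Pending` holder, and the method names are hypothetical, and the sketch assumes that writes are registered in increasing offset order; it only illustrates how a `CompletableFuture` lets the write path wait for the Slave's acknowledgement without blocking a thread.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of waiting for Slave acknowledgement with CompletableFuture.
final class SyncReplicationSketch {
    // A write that is waiting until the Slave has stored data up to requiredOffset.
    private static final class Pending {
        final long requiredOffset;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();

        Pending(long requiredOffset) {
            this.requiredOffset = requiredOffset;
        }
    }

    // Assumes writes are registered in increasing offset order.
    private final Queue<Pending> pending = new ConcurrentLinkedQueue<>();

    // Called on the write path after the message has been appended locally.
    CompletableFuture<Boolean> awaitReplication(long requiredOffset) {
        Pending p = new Pending(requiredOffset);
        pending.add(p);
        return p.future;
    }

    // Called by the read service whenever the Slave reports a new progress offset.
    synchronized void onSlaveAck(long slaveAckOffset) {
        Pending head;
        while ((head = pending.peek()) != null && head.requiredOffset <= slaveAckOffset) {
            pending.poll();
            head.future.complete(true); // wakes up whoever is waiting on this write
        }
    }

    // The put succeeds only when both the flush and the replication have succeeded.
    static CompletableFuture<Boolean> putResult(CompletableFuture<Boolean> flush,
                                                CompletableFuture<Boolean> replication) {
        return flush.thenCombine(replication, (flushed, replicated) -> flushed && replicated);
    }
}
```

A production implementation would also need a timeout so that a slow or disconnected Slave cannot leave a future pending forever; that is omitted here.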


Finally, the Broker assembles the response command and returns it to the client.

5. Summary

The idea behind RocketMQ master-slave replication is simple. The Slave starts a thread that continuously pulls Commit Log data from the Master and then builds the Consume Queue data structure asynchronously.

The core points are as follows:

1. Master-slave replication includes two parts: metadata replication and message data replication;

2. Metadata replication

A scheduled task on the Slave Broker sends an RPC request to the Master Broker every 10 seconds, synchronizes the metadata to the cache, and then persists it to disk;

3. Message data replication

  1. The Master starts listening on the specified port
  2. The Slave starts the HAClient service and creates a TCP connection with the Master
  3. The Slave reports its storage progress to the Master
  4. The Master receives the progress, retrieves all messages after that offset from the message file, and transmits them to the Slave
  5. After the Slave receives the data, it appends the message data to its local message store.

4. Implementing synchronous replication

After the commitLog executes appendMessage, two tasks remain: flushing to disk and synchronous replication. Both are handled asynchronously with CompletableFuture.

When the HAConnection read service receives progress feedback from the Slave and finds that the message data has been replicated successfully, it completes the future. Finally, the Broker assembles the response command and returns it to the client.