聊聊RocketMQ 主從復制

聊聊RocketMQ 主從復制


Master 啟動後創建AcceptSocketService 服務, 用來創建客戶端到服務端的TCP 鏈接。

RocketMQ 主從復制是RocketMQ 高可用機制之一,數據可以從主節點複製到一個或多個從節點。

這篇文章,我們聊聊RocketMQ 的主從復制,希望大家讀完之後,能夠理解主從復制的精髓。

圖片圖片

一、同步與異步

在RocketMQ 的集群模式中,Broker 分為Master 與Slave,一個Master 可以對應多個Slave,但是一個Slave 只能對應一個Master。

每個Broker 與Name Server 集群中的所有節點建立長連接,定時註冊Topic 信息到所有Name Server。

圖片圖片

Master 節點負責接收客戶端的寫入請求,並將消息持久化到磁盤上。而Slave 節點則負責從Master 節點複製消息數據,並保持與Master 節點的同步。

1、同步複製

圖片圖片

每個Master 配置一個Slave ,有多對Master-Slave ,HA 採用同步雙寫方式,即只有主備都寫成功,才向應用返回成功。

這種模式的優缺點如下:

  • 優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
  • 缺點:性能比異步複製模式略低(大約低10%左右),發送單個消息的RT 會略高,且目前版本在主節點宕機後,備機不能自動切換為主機。

2、異步複製

圖片圖片

每個Master 配置一個Slave ,有多對Master-Slave ,HA 採用異步複製方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:

  • 優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機後,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master 模式幾乎一樣;
  • 缺點:Master 宕機,磁盤損壞情況下會丟失少量消息。

複製流程分為兩個部分:元數據複製和消息數據複製。

  • 主從服務器同步主題,消費者進度,延遲消費進度,消費者配置數據
  • 主從服務器同步消息數據

二、元數據複製

Slave Broker 定時任務每隔10 秒會同步元數據,包括主題,消費進度,延遲消費進度,消費者配置。

圖片圖片

同步主題時, Slave Broker 向Master Broker 發送RPC 請求,返回數據後,首先加入本地緩存裡,然後持久化到本地。

圖片圖片

三、消息數據複製

下圖是Master 和Slave 消息數據同步的流程圖。

圖片圖片

1、Master 啟動後監聽指定端口;

Master 啟動後創建AcceptSocketService 服務 , 用來創建客戶端到服務端的TCP 鏈接。

圖片圖片

RocketMQ 抽象了鏈接對象HAConnection , HAConnection 會啟動兩個線程,分別用於讀服務和寫服務:

  • 讀服務:處理Slave 發送的請求
  • 寫服務:用於向Slave 傳輸數據

圖片圖片

2、Slave 啟動後,嘗試連接Master ,建立TCP 連接;

HAClient 是客戶端Slave 的核心類,負責和Master 創建連接和數據交互。

圖片圖片

客戶端在啟動後,首先嘗試連接Master , 查詢當前消息存儲中最大的物理偏移量,並存儲在變量currentReportedOffset 裡。

3、Slave 向Master 匯報拉取消息偏移量;

圖片圖片

上報進度的數據格式是一個Long 類型的Offset , 8個字節, 非常簡潔。

圖片圖片

發送到Socket 緩衝區後, 修改最後一次的寫時間lastWriteTimestamp 。

4、Master 解析請求偏移量,從消息文件中檢索該偏移量後的所有消息;

當Slave 上報數據到Master 時,觸發SelectionKey.OP_READ 事件,Master 將請求交由ReadSocketService 服務處理:

圖片圖片

當Slave Broker 傳遞了自身commitlog 的maxPhyOffset 時,Master 會馬上中斷 selector.select(1000) ,執行 processReadEvent 方法。

圖片圖片

processReadEvent 方法的核心邏輯是設置Slave 的當前進度offset ,然後通知複製線程當前的複制進度。

寫服務WriteSocketService 從消息文件中檢索該偏移量後的所有消息(傳輸批次數據大小限制),並將消息數據發送給Slave。

圖片圖片

5、Slave 接收到數據,將消息數據append 到消息文件commitlog 裡。

圖片圖片

首先HAClient 類中調用dispatchReadRequest 方法, 解析出消息數據;

圖片圖片

然後將消息數據append 到本地的消息存儲。

圖片圖片

四、 同步的實現

從數據複製流程圖,我們發覺數據複製本身就是一個異步執行的,但是同步是如何實現的呢?

Master Broker 接收到寫入消息的請求後,調用Commitlog 的aysncPutMessage 方法寫入消息。

圖片圖片

這段代碼中,當commitLog 執行完appendMessage 後, 需要執行刷盤任務和同步複製兩個任務。

但這兩個任務並不是同步執行,而是異步的方式,使用了CompletableFuture 這個異步神器。

當HAConnection 讀服務接收到Slave 的進度反饋,發現消息數據複製成功,則喚醒future 。

圖片圖片

最後Broker 組裝響應命令,並將響應命令返回給客戶端。

五、總結

RocketMQ 主從復制的實現思路非常簡潔,Slave 啟動一個線程,不斷從Master 拉取Commit Log 中的數據,然後在異步build 出Consume Queue 數據結構。

核心要點如下:

1、主從復制包含元數據複製和消息數據複製兩個部分;

2、元數據複製

Slave Broker 定時任務每隔10 秒向Master Broker 發送RPC 請求,將元數據同步到緩存後,然後持久化到磁盤裡;

3、消息數據複製

  1. Master 啟動監聽指定端口
  2. Slave 啟動HaClient 服務,和Master 創建TCP 鏈接
  3. Slave 向Master 上報存儲進度
  4. Master 接收進度,消息文件中檢索該偏移量後的所有消息,並傳輸給Slave
  5. Slave 接收到數據後,將消息數據append 到本地的消息存儲。

4、同步的實現

當commitLog 執行完appendMessage 後, 需要執行刷盤任務和同步複製兩個任務,這裡用到了CompletableFuture 這個異步神器。 

當HAConnection 讀服務接收到Slave 的進度反饋,發現消息數據複製成功,則喚醒future 。最後Broker 組裝響應命令,並將響應命令返回給客戶端。