一個註解實作WebSocket 叢集方案,這樣玩才爽!

2024.04.07


例如服務A有兩個實例A1和A2,前端的WebSocket客戶端C透過網關的負載平衡連到了A1,這個時候當A2觸發訊息發送的邏輯,需要將某個訊息傳送給所有的客戶端時,C就接受不到消息

這時候我們很快就能想到一個最簡單的解決方案,就是把A2的訊息轉發給A1,A1再把訊息傳送給C,這樣C就能收到A2發送的訊息了

圖片圖片

用法

接下來讓我們來看看這個函式庫的用法

首先我們需要在啟動類別上新增一個註解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
public class AServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(AServiceApplication.class, args);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

接著我們在需要發送訊息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨實例發送訊息啦

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

是不是很簡單,有沒有覺得比自己整合單體應用的WebSocket還要簡單!

當你的同事還在頭痛要實現手動轉發時你已經透過一個配置註解實現了功能並開始泡茶喝

你的同事一定對你刮目相看啊(又能開始摸魚了)

不知道大家看了之後是不是對具體實現已經有了一些想法呢

接下來我就來講講這個函式庫的實作流程

抽象思路

其實我之前有專門針對WebSocket實現過類似功能的模組,只是當時的一些場景都是基於專案定死的,所以相對來說實現比較簡單,但是過於定制化不好擴展;

有一天在和我的一個前同事聊天的過程中得知,他們在考慮讓設備和服務直連,並且服務要部署成多實例

設備和服務直連無非就是透過TCP這種長連線來實現,可以使用快取來保存連線和服務位址的映射關係來實現點對點轉送的功能需求

聽到這裡,是不是感覺似曾相識?當時就有一道光穿過我的腦瓜子,真相只有一個!這不就和WebSocket在叢集模式下的問題一樣麼

於是我從原來針對WebSocket的思考,變成了對各種長連結的思考,最後我將這個問題抽象化了:長連結的群聚方案

而不管是WebSocket還是TCP都是長連結的一種具體實現

所以我們可以抽象化一個頂級介面Connection,然後實作WebSocketConnection或是TCPConnection

其實從抽象的角度來說不僅僅是長連接,短連接也在我們的抽象範圍之內,只不過類似HTTP等協議並不存在上述的問題,但是並不妨礙你實現一個HTTPConnection用於轉發訊息,所以大家不要被先入為主的思維束縛住了

轉送思路

之前講到,這個函式庫的主要想法就是將訊息轉發給其他的服務實例來達到一個單播或廣播的效果

所以訊息轉發的設計就非常重要了

首先訊息轉發需要憑藉一些支援資料互動的技術手段

例如HTTP,MQ,TCP,WebSocket

說到這裡。 。 。大家是不是。 。 。你TM原來自己就能搞定啊(掀桌)

長連結不就是用來互動數據的嗎,所以完全可以自給自足啊

於是就有一個精妙的想法在我腦中形成:

如果每個服務實例都把自己當作一個客戶端,連結到其他服務呢?

在 WebSocket的場景下,我們將目前服務實例當作一個WebSocket客戶端去連接其他服務實例的WebSocket服務端

TCP的場景下,我們將目前服務實例作為一個TCP的客戶端去連接其他服務實例的TCP服務端

這樣其他服務實例就可以把訊息發到這些偽裝的客戶端上,當服務實例上偽裝的客戶端接收到訊息之後就可以再轉發給自己管理的真正的客戶端

撒花家人們,自閉(自我閉環)了屬於是

所以我們首先需要先讓服務實例之間互相連結上

連結流程

讓我們來看看互相建立連結是怎麼設計的

圖片圖片

我定義了一個ConnectionSubscriber的接口,大家可以理解為我們的服務實例要去訂閱監聽其他服務發送的訊息

同時提供了預設實現,就是基於自身的協定進行連線和訊息的傳送

當然也能夠靈活的支援其他方式,只要自訂一個ConnectionSubscriber就可以了,如果使用MQ的方式就可以實作一個MQConnectionSubscriber或是使用HTTP就可以實作一個HTTPConnectionSubscriber

只不過使用自身的協議就可以不用依賴其他的庫或是中間件了,當然如果你對訊息的丟失率有比較嚴格的要求也可以使用MQ作為訊息轉發的中介,而以我之前參與過的項目來說,一般普通的WebSocket場景基本上還是能忍受一定的丟失率的

取得服務實例信息

那我們怎麼知道要去連接哪些實例呢

我定義了一個ConnectionServerManager的介面用來管理服務訊息

當然我們完全可以自己實作一個,例如透過設定檔來設定服務實例資訊

不過我們有更方便的方式,那就是依賴Spring Cloud的服務發現元件了,不管是Eureka還是Nacos還是其他的註冊中心相當於都支持了,這就是抽象的魅力啊

我們可以透過DiscoveryClient##getInstances(Registration.getServiceId())來取得所有的實例,排除掉自身就是需要連線的服務實例了

當我們的服務實例連接上其他的服務實例之後,發送一個自身實例資訊的訊息過去,其他的服務實例接收到對應的訊息之後反過來連接我們的服務實例,保證一定的連接及時性,這樣雙方的連結就搭建起來了,可以互相轉發訊息了

同時我還添加了心跳檢測和自動重連,當一段時間沒有收到心跳回復後就會斷開連接,並且每隔一段時間就會重新查詢一遍實例信息,如果發現存在某個服務實例沒有對應的連接,就會重新進行連接,這樣就能在某些偶爾網絡不好的情況下有一定的容錯

到目前為止,我們基本的框架已經建立了,當我們啟動服務之後,服務間就會自動建立連接

連結區分與管理

基於上述的思路,我們肯定需要區分真實的客戶端和用來轉發的客戶端

於是我就把這些連結做了一個分類

類別

說明

客戶

普通的連接

訂戶

服務實例偽裝的連接,用於接受需要轉發的訊息

可觀察的

服務實例偽裝的連接,用於發送需要轉發的訊息

然後對於這些連接進行一個統一的管理

圖片圖片

透過連接工廠ConnectionFactory我們可以將任意的連接適配成Connection對象,並實現各種連接間的消息轉發

每個連線都會配置一個MessageEncoder和MessageDecoder用於訊息的編碼和解碼,而且不同類別的連接對應的編碼器和解碼器肯定是不一樣的,例如轉發的訊息和發給真實客戶端的訊息很大程度上都是有差別的,所以額外定義了一個MessageCodecAdapter用來適應不同類型的編解碼器,也能讓大家在自訂時方便管理

訊息發送

現在當我們發送某個訊息之後,訊息就會被轉發到其他的服務實例,所有的客戶端就都能收到了

不對啊,在有些情況下我們不想讓所有客戶端都收到啊,能不能我們想讓誰收到就讓誰收到啊

真麻煩,來,我把所有的連結都給你,你自己選吧

連接選擇

我們需要在訊息發送時確定發送給哪些連接

圖片圖片

於是我就定義了一個連線選擇器ConnectionSelector

每次要發送訊息的時候,我都會匹配一個連接選擇器,然後透過選擇器來獲得需要發送訊息的連接,而我們可以透過自訂連接選擇器來實現我們訊息的精準發送

這裡其實就是我為什麼會取名WebSocketLoadBalanceConcept的原因,為什麼要叫LoadBalance呢

Ribbon`通过`IRule`来选择一个`Server
  • 1.

我透過ConnectionSelector來選擇一個Connection集合

是不是有異曲同工之妙

繼續來說自訂選擇器

準備工作:

  • 我們的Connection有一個metadata欄位用於存放自訂屬性
  • 我們的Message有一個headers欄位用於存放訊息頭

給指定用戶發送訊息

很多場景下我們需要給指定的用戶發送訊息

首先當客戶端連接上來時,可以透過參數或主動發送一個訊息將userId發給服務端,然後服務端將得到的userId存在Connection的metadata中

接著我們為需要傳送的Message新增一個header,將對應的userId當作訊息頭

這樣我們就可以自訂一個連接選擇器透過判斷Message是否包含userId訊息標頭來作為匹配的條件,當Message的headers中存在userId時,對Connection中的metadata進行userId的匹配來篩選需要發送訊息的連接

由於userId是唯一的,當我們自身服務連上來的客戶端中已經匹配到就不需要再轉發了,如果沒有匹配到就透過其他服務實例的客戶端進行訊息轉發

庫中已經實作了對應的UserSelector和UserMessage,可以使用組態開啟並透過在連線路徑上新增userId參數來標記用戶

當然我們也可以藉用快取來精確的判斷需不需要轉發或是需要轉發給哪幾個服務,把userId和服務的instanceId等一些具有唯一性的資料緩存在Redis中,當給用戶發送訊息時,從Redis中取得使用者對應的服務實例的instanceId或是具有唯一性的數據,如果經過匹配就是當前服務就可以直接下發,如果是其他服務就轉發給那個對應的服務就行了

給指定路徑發送訊息

還有一種場景也比較常見就是類似主題訂閱,如訂閱設備狀態更新的數據,就要給每個對應路徑的連接發送訊息了

我們可以使用不同的路徑來表示不同主題,然後自訂一個連接選擇器來匹配連接的路徑和訊息標頭中指定的路徑

當然在庫中也已經實現了對應的PathSelector和PathMessage,可以透過設定開啟

總結

最後請容許我發表一點對於抽象的拙見

抽像其實就和「道生一,一生二,二生三,三生萬物」 一樣,根據你的頂級介面(也就是核心功能)不斷的向外展開,你的頂級介面就是道(狹義的來講)

以這個函式庫為例,ConnectionLoadBalanceConcept就是這個函式庫的道,他的核心功能就是傳送訊息,至於怎麼發,發給誰,不確定,像是一個混沌的狀態

那麼什麼是一,二,三呢,我們發送訊息需要載體於是就有了Connection和Message,我們需要對Connection進行管理於是就有了ConnectionRepository, 我們需要轉發訊息於是就有了ConnectionSubscriber等等

而萬物就像是具體的實現,是能落實的,基於Spring Cloud服務發現的連接管理器DiscoveryConnectionServerManager,基於路徑的連接選擇器PathSelector,基於Reactive的WebSocket連接ReactiveWebSocketConnection

就像是你創造的世界,不斷的衍生出各種各樣的規則,這些規則相輔相成,讓你的世界平穩的運行