One annotation gives you a WebSocket cluster solution, and it's fun to play with!

2024.04.07

Everyone should be familiar with WebSocket. In a monolithic application it works without any trouble, but once a project adopts a microservice architecture, problems can appear.

For example, suppose service A has two instances, A1 and A2, and the front-end WebSocket client C is connected to A1 through the gateway's load balancing. If A2 triggers logic that sends a message to all clients, C cannot receive it, because C's connection lives on A1.

The simplest solution that comes to mind is to forward the message from A2 to A1 and let A1 send it to C. That way, C receives the message sent by A2.


Usage

Let's start by looking at how to use this library.

First, add the @EnableWebSocketLoadBalanceConcept annotation to the startup class:

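import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
// @EnableWebSocketLoadBalanceConcept comes from the library; its import is not shown in the original.

@EnableWebSocketLoadBalanceConcept // turns on cross-instance WebSocket forwarding
@EnableDiscoveryClient             // registers this instance and lets it discover its peers
@SpringBootApplication
public class AServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(AServiceApplication.class, args);
    }
}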

Then inject WebSocketLoadBalanceConcept wherever you need to send messages, and you can happily send messages across instances:

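import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
// WebSocketLoadBalanceConcept comes from the library; its import is not shown in the original.

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    // e.g. /ws/send?msg=hello sends "hello" to the clients on every instance
    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);
    }
}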

Pretty simple, right? Arguably even simpler than wiring up WebSocket yourself in a single application.

While your colleagues are still struggling to implement manual forwarding, you have already finished the feature with a single annotation and are making tea.

Your colleagues will surely be impressed (and you can go back to slacking off).

After reading this, you may already have some ideas about how it is implemented.

Next, I'll walk through how this library was implemented.

Abstraction approach

In fact, I had previously built a module with similar functionality specifically for WebSocket. However, it was designed around the scenarios of one particular project. The implementation was fairly simple, but it was too customized and hard to extend.

One day, while chatting with a former colleague, I learned that his team was considering connecting devices directly to services and deploying those services as multiple instances.

A direct connection between devices and services is simply a long-lived connection such as TCP. A cache can store the mapping between each connection and the address of the service instance holding it, which enables point-to-point forwarding.

Sound familiar? At that moment a ray of light flashed through my mind, and there was only one truth: isn't this exactly the same problem as WebSocket in cluster mode?

So I stopped thinking about WebSocket specifically and started thinking about long connections in general. In the end, I abstracted the problem as a cluster solution for long connections.

Both WebSocket and TCP are specific implementations of long connections.

So we can abstract a top-level interface, Connection, and then implement it as WebSocketConnection or TCPConnection.
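To make the abstraction concrete, here is a minimal Java sketch. It assumes a Connection interface with only getId() and send(String). These method names and the Broadcaster helper are hypothetical, not the library's actual API. A real connection would wrap a WebSocket session or a socket; these two only record what they were asked to send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical top-level abstraction: every connection, whatever its protocol,
// is used through the same small interface.
interface Connection {
    String getId();

    void send(String message);
}

// Sketch of a WebSocket-backed connection. A real one would wrap a WebSocket
// session; this one just records what it was asked to send.
class WebSocketConnection implements Connection {
    private final String id;
    final List<String> sent = new ArrayList<>();

    WebSocketConnection(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void send(String message) {
        sent.add("ws:" + message); // stand-in for writing a WebSocket frame
    }
}

// Sketch of a TCP-backed connection; a real one would write to a socket.
class TCPConnection implements Connection {
    private final String id;
    final List<String> sent = new ArrayList<>();

    TCPConnection(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void send(String message) {
        sent.add("tcp:" + message); // stand-in for writing to a socket stream
    }
}

// Code written against Connection works for every protocol.
class Broadcaster {
    static void broadcast(List<? extends Connection> connections, String message) {
        for (Connection connection : connections) {
            connection.send(message);
        }
    }
}
```

Because Broadcaster only sees Connection, supporting a new protocol means adding one class rather than touching the sending logic.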

In fact, from an abstract point of view, short connections fall within the same scope as long connections. Protocols such as HTTP do not have the problem described above, but nothing prevents you from implementing an HTTPConnection for forwarding messages. So don't be bound by preconceived ideas.

Forwarding design

As mentioned before, the main idea of this library is to forward messages to other service instances to achieve unicast or broadcast.

Therefore, the design of message forwarding is very important.

First of all, message forwarding needs some transport to carry data between instances, such as HTTP, MQ, TCP, or WebSocket.

Speaking of which... wait a minute... it turns out the connections can do this job themselves! (flips the table)

Long connections exist precisely to exchange data, so why not make the system completely self-sufficient?

Then a wonderful idea formed in my mind:

What if each service instance acted as a client itself and connected to the other service instances?

In the WebSocket scenario, we use the current service instance as a WebSocket client to connect to the WebSocket servers of other service instances.

In the TCP scenario, we use the current service instance as a TCP client to connect to the TCP servers of other service instances.

This way, other service instances can send messages to these disguised clients. When a disguised client on a service instance receives a message, the instance forwards it to the real clients it manages.
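The self-closed loop can be modeled in memory as follows. This is a hypothetical sketch. AppInstance, connectTo, broadcast, and onForwarded are illustrative names, and a direct method call stands in for the network link to a peer's disguised client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one service instance (A1, A2, ...).
class AppInstance {
    final String name;
    // Messages delivered to the real clients connected to this instance.
    final List<String> deliveredToLocalClients = new ArrayList<>();
    // Links to the disguised clients that peer instances keep connected to us.
    private final List<AppInstance> peers = new ArrayList<>();

    AppInstance(String name) {
        this.name = name;
    }

    void connectTo(AppInstance peer) {
        peers.add(peer);
    }

    // Entry point used by business code on this instance (e.g. A2 sending to everyone).
    void broadcast(String message) {
        deliverLocally(message);
        for (AppInstance peer : peers) {
            peer.onForwarded(message); // in reality: sent over the peer link
        }
    }

    // Called when this instance's disguised client receives a forwarded message.
    // It is only delivered locally and never forwarded again, so it cannot loop.
    void onForwarded(String message) {
        deliverLocally(message);
    }

    private void deliverLocally(String message) {
        deliveredToLocalClients.add(message);
    }
}
```

The key design choice is that a forwarded message is only delivered locally and is never forwarded again. This keeps a broadcast from bouncing between instances forever.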

Confetti, everyone! The system is now "self-contained" (a self-closed loop).

So first, we need to connect the service instances to each other.

Connection process

Let's look at how the connections are designed.


I defined a ConnectionSubscriber interface. You can think of it this way: each service instance subscribes to, and listens for, the messages sent by other service instances.

A default implementation is also provided. It connects and sends messages over the connection's own protocol (for WebSocket, over WebSocket itself).

Other approaches can also be supported flexibly: you only need to customize a ConnectionSubscriber. To use MQ, implement an MQConnectionSubscriber. To use HTTP, implement an HTTPConnectionSubscriber.
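Here is one way the pluggable subscriber could look. It assumes a single subscribe method, which is my assumption rather than the library's confirmed signature. InMemoryBroker is a hypothetical stand-in for a real MQ broker so that the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical shape of the subscriber abstraction: listen for messages that other
// instances forward to `instanceId` and hand each one to a callback.
interface ConnectionSubscriber {
    void subscribe(String instanceId, Consumer<String> onForwarded);
}

// A tiny in-memory topic registry standing in for an MQ broker in this sketch.
class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> topics = new HashMap<>();

    void listen(String topic, Consumer<String> listener) {
        topics.computeIfAbsent(topic, k -> new ArrayList<>()).add(listener);
    }

    void publish(String topic, String message) {
        topics.getOrDefault(topic, List.of()).forEach(listener -> listener.accept(message));
    }
}

// MQ-style subscriber: each instance listens on a topic named after itself,
// so peers forward to an instance by publishing to "forward.<instanceId>".
class MQConnectionSubscriber implements ConnectionSubscriber {
    private final InMemoryBroker broker;

    MQConnectionSubscriber(InMemoryBroker broker) {
        this.broker = broker;
    }

    @Override
    public void subscribe(String instanceId, Consumer<String> onForwarded) {
        broker.listen("forward." + instanceId, onForwarded);
    }
}
```

Swapping the transport only changes which ConnectionSubscriber is wired in. The code that reacts to forwarded messages, the callback, stays the same.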

Using the connection's own protocol means you don't need to depend on any other library or middleware. Of course, if you have strict requirements on message loss, you can use MQ as the forwarding intermediary instead. In the projects I have worked on, ordinary WebSocket scenarios could generally tolerate some message loss.

Get service instance information

So how do we know which instances to connect to?

I defined a ConnectionServerManager interface to manage service instance information.

Of course, we can implement one ourselves, for example by listing service instance information in a configuration file.
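A configuration-driven manager might look like the sketch below. ConnectionServer, ConfigConnectionServerManager, getConnectionServers, and the `id@host:port` property format are all hypothetical. They were chosen only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: what we need to know to reach another instance.
record ConnectionServer(String instanceId, String host, int port) {}

// Hypothetical shape of the manager: "which instances exist right now?"
interface ConnectionServerManager {
    List<ConnectionServer> getConnectionServers();
}

// Manager fed by a static property such as
// "A1@10.0.0.1:8080,A2@10.0.0.2:8080" (format invented for this sketch).
class ConfigConnectionServerManager implements ConnectionServerManager {
    private final List<ConnectionServer> servers = new ArrayList<>();

    ConfigConnectionServerManager(String config) {
        for (String entry : config.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int at = trimmed.indexOf('@');
            int colon = trimmed.lastIndexOf(':');
            servers.add(new ConnectionServer(
                    trimmed.substring(0, at),
                    trimmed.substring(at + 1, colon),
                    Integer.parseInt(trimmed.substring(colon + 1))));
        }
    }

    @Override
    public List<ConnectionServer> getConnectionServers() {
        return List.copyOf(servers); // immutable snapshot for callers
    }
}
```

A static list is easy to reason about, but it must be edited whenever instances are added or removed. This is what makes service discovery attractive.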

But there is a more convenient way: relying on Spring Cloud's service discovery component. Eureka, Nacos, and other registries are all supported. This is the charm of abstraction.

We can obtain all instances through DiscoveryClient#getInstances(Registration#getServiceId()) and then exclude the current instance itself. The remaining instances are the ones we need to connect to.
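The "everyone except me" step can be sketched without Spring as follows. Instance is a minimal stand-in for Spring Cloud's ServiceInstance, and PeerResolver is a hypothetical helper. In a real application, the list would come from the discovery call described above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal stand-in for Spring Cloud's ServiceInstance (only the fields used here).
record Instance(String instanceId, String host, int port) {}

class PeerResolver {
    // In a Spring Cloud application, `all` would come from
    // discoveryClient.getInstances(registration.getServiceId()).
    static List<Instance> peersOf(Instance self, List<Instance> all) {
        return all.stream()
                // ServiceInstance#getInstanceId() may return null for some registries,
                // so this sketch identifies "self" by host and port instead.
                .filter(i -> !(i.host().equals(self.host()) && i.port() == self.port()))
                .collect(Collectors.toList());
    }
}
```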

When our service instance connects to another instance, it sends a message carrying its own instance information. On receiving that message, the other instance immediately connects back to ours. This way, the connection is established promptly in both directions, and the two instances can forward messages to each other.
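The connect-back handshake can be sketched in memory like this. Node, connect, and onHello are hypothetical names, and a shared map stands in for the network. The point is that a node receiving a hello connects back immediately, and the already-connected check stops the exchange from recursing forever.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// In-memory sketch of "connect, announce yourself, get connected back".
class Node {
    final String id;
    // Ids of the instances this node already holds a forwarding link to.
    final Set<String> connectedTo = new LinkedHashSet<>();
    private final Map<String, Node> network; // stand-in for the real network

    Node(String id, Map<String, Node> network) {
        this.id = id;
        this.network = network;
        network.put(id, this);
    }

    // Open a link to targetId and announce our own identity over it.
    void connect(String targetId) {
        if (targetId.equals(id) || !connectedTo.add(targetId)) {
            return; // never link to ourselves, and skip peers we already link to
        }
        network.get(targetId).onHello(id);
    }

    // The peer learns who connected and links back right away,
    // instead of waiting for its own next discovery refresh.
    void onHello(String fromId) {
        connect(fromId);
    }
}
```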

I also added heartbeat detection and automatic reconnection. If no heartbeat reply arrives within a certain period, the connection is closed. In addition, the instance information is re-queried periodically, and any instance without a corresponding connection is reconnected. This provides some fault tolerance against occasional network problems.
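Heartbeat timeouts and the periodic reconnect check come down to some bookkeeping. In this hypothetical sketch, PeerHealth and its methods are illustrative names, and the timeout is whatever value you choose. The current time is passed in explicitly so that the logic is deterministic. Scheduling and the actual ping/pong frames are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of heartbeat bookkeeping plus the periodic "who is missing?" check.
class PeerHealth {
    private final long heartbeatTimeoutMillis;
    // peer id -> time (ms) the last heartbeat reply was received
    private final Map<String, Long> lastPong = new HashMap<>();

    PeerHealth(long heartbeatTimeoutMillis) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    void onConnected(String peerId, long now) {
        lastPong.put(peerId, now);
    }

    // Only refresh peers we still consider connected.
    void onPong(String peerId, long now) {
        lastPong.computeIfPresent(peerId, (k, v) -> now);
    }

    // Drop peers whose last reply is older than the timeout; returns the dropped ids
    // so the caller can close those connections.
    Set<String> evictStale(long now) {
        Set<String> stale = new HashSet<>();
        lastPong.forEach((peer, t) -> {
            if (now - t > heartbeatTimeoutMillis) {
                stale.add(peer);
            }
        });
        stale.forEach(lastPong::remove);
        return stale;
    }

    // Run periodically: peers known to discovery but without a live link need reconnecting.
    Set<String> peersToReconnect(Set<String> discoveredPeers) {
        Set<String> missing = new HashSet<>(discoveredPeers);
        missing.removeAll(lastPong.keySet());
        return missing;
    }
}
```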

At this point the basic framework is in place: when the services start, connections between them are established automatically.

Connection differentiation and management

Based on the above ideas, we clearly need to distinguish the real clients from the clients used for forwarding.

So I classified these connections into three categories:

  • Client: an ordinary connection from a real client.
  • Subscriber: a connection disguised by a service instance, used to receive messages that need to be forwarded.
  • Observable: a connection disguised by a service instance, used to send messages that need to be forwarded.

All of these connections are then managed in one place.


Through the ConnectionFactory, any connection can be adapted into a Connection object, which enables message forwarding between different kinds of connections.
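Here is a minimal sketch of the factory and of unified management for the three categories described earlier (Client, Subscriber, Observable). It assumes a ConnectionType enum and a Consumer<String> as the protocol-specific send function. ConnectionFactory and ConnectionRepository are names that appear in this article, but every method and signature below is hypothetical.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// The three connection categories.
enum ConnectionType { CLIENT, SUBSCRIBER, OBSERVABLE }

// Unified view of a connection, whatever its underlying protocol.
interface Connection {
    ConnectionType getType();

    void send(String message);
}

// Hypothetical factory: adapts a protocol-specific send function into a Connection.
class ConnectionFactory {
    Connection create(ConnectionType type, Consumer<String> rawSender) {
        return new Connection() {
            @Override
            public ConnectionType getType() {
                return type;
            }

            @Override
            public void send(String message) {
                rawSender.accept(message);
            }
        };
    }
}

// Hypothetical repository keeping connections grouped by category.
class ConnectionRepository {
    private final Map<ConnectionType, List<Connection>> byType = new EnumMap<>(ConnectionType.class);

    void add(Connection connection) {
        byType.computeIfAbsent(connection.getType(), t -> new ArrayList<>()).add(connection);
    }

    List<Connection> select(ConnectionType type) {
        return byType.getOrDefault(type, List.of());
    }
}
```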

Each connection is configured with a MessageEncoder and a MessageDecoder for encoding and decoding messages, and different types of connections need different encoders and decoders. For example, forwarded messages and messages sent to real clients differ considerably. So I defined an additional MessageCodecAdapter that adapts the codecs to each connection type, which also keeps things easy to manage when you customize them.
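To illustrate why per-type codecs are needed, here is a hypothetical MessageCodecAdapter that picks an encoder by connection category. The Message record, the encoderFor method, and the `key=value;|payload` wire format are assumptions made for this sketch. The format does no escaping, so it is not production-ready. A decoder would be chosen the same way.

```java
import java.util.Map;
import java.util.TreeMap;

enum ConnectionType { CLIENT, SUBSCRIBER, OBSERVABLE }

record Message(String payload, Map<String, String> headers) {}

interface MessageEncoder {
    String encode(Message message);
}

// Hypothetical adapter choosing an encoder by connection category.
class MessageCodecAdapter {
    // Real clients only need the payload.
    private final MessageEncoder clientEncoder = Message::payload;

    // Forwarded messages must keep their headers (e.g. userId) so the receiving
    // instance can route them. Headers are sorted for a stable output.
    private final MessageEncoder forwardEncoder = message -> {
        StringBuilder sb = new StringBuilder();
        new TreeMap<>(message.headers())
                .forEach((k, v) -> sb.append(k).append('=').append(v).append(';'));
        return sb.append('|').append(message.payload()).toString();
    };

    MessageEncoder encoderFor(ConnectionType type) {
        return type == ConnectionType.CLIENT ? clientEncoder : forwardEncoder;
    }
}
```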

Message sending

Now, when we send a message, it is forwarded to the other service instances, and every client receives it.

Hold on: in some cases we don't want every client to receive it. Can we choose exactly who receives it?

What a hassle. Fine, here are all the connections; choose for yourself.

Connection selection

When a message is sent, we need to determine which connections it should go to.


So I defined a connection selector, ConnectionSelector.

Each time a message is sent, a matching connection selector is chosen, and that selector returns the connections the message should go to. By customizing connection selectors, we can deliver messages precisely.
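The selection step could be sketched as a chain of selectors. The sketch assumes each selector answers two questions: does it support this message, and which connections does it select? The support/select method names, SelectorChain, AllSelector, and MetadataSelector are hypothetical, not the library's API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

record Connection(String id, Map<String, String> metadata) {}

record Message(String payload, Map<String, String> headers) {}

// Hypothetical selector contract.
interface ConnectionSelector {
    boolean support(Message message);

    List<Connection> select(Message message, List<Connection> all);
}

// Fallback: every connection (plain broadcast).
class AllSelector implements ConnectionSelector {
    public boolean support(Message message) {
        return true;
    }

    public List<Connection> select(Message message, List<Connection> all) {
        return all;
    }
}

// Applies when the message carries header `key`; picks the connections whose
// metadata holds the same value for that key.
class MetadataSelector implements ConnectionSelector {
    private final String key;

    MetadataSelector(String key) {
        this.key = key;
    }

    public boolean support(Message message) {
        return message.headers().containsKey(key);
    }

    public List<Connection> select(Message message, List<Connection> all) {
        String wanted = message.headers().get(key);
        return all.stream()
                .filter(c -> wanted.equals(c.metadata().get(key)))
                .collect(Collectors.toList());
    }
}

// The first selector that supports the message decides the targets.
class SelectorChain {
    private final List<ConnectionSelector> selectors;

    SelectorChain(List<ConnectionSelector> selectors) {
        this.selectors = selectors;
    }

    List<Connection> route(Message message, List<Connection> all) {
        for (ConnectionSelector selector : selectors) {
            if (selector.support(message)) {
                return selector.select(message, all);
            }
        }
        return List.of();
    }
}
```

Putting the broadcast selector last means that more specific rules take precedence.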

This is also why I named it WebSocketLoadBalanceConcept, with "LoadBalance" in the name.

Ribbon selects a Server through IRule.

I select a collection of Connections through ConnectionSelector.

Different targets, same idea, right?

Now let's move on to custom selectors.

Preparation:

  • Our Connection has a metadata field for storing custom attributes
  • Our Message has a headers field for storing message headers

Send a message to a specified user

In many scenarios, we need to send messages to specific users.

First, when a client connects, it can pass its userId to the server, either as a parameter or in a message it sends. The server then stores that userId in the metadata of the Connection.

Then, on the Message to be sent, we add a header containing the target userId.

We can then write a custom connection selector whose matching condition is whether the Message carries a userId header. When it does, the selector compares that userId against each Connection's metadata to filter out the connections that should receive the message.

Since a userId is unique, if it matches a client connected to the current service, there is no need to forward the message. If there is no match, the message is forwarded through the connections to the other service instances.
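The user-targeted logic, including the decision whether to forward, might look like the sketch below. The userId key matches the text, but UserRouting, Route, and the method names are hypothetical. The library's actual UserSelector may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

record Connection(String id, Map<String, String> metadata) {}

record Message(String payload, Map<String, String> headers) {}

// Result of routing: connections to deliver to here, and whether to forward to peers.
record Route(List<Connection> local, boolean forward) {}

// Hypothetical user-targeted routing. Because a userId identifies one user,
// a local match means the message does not need to go to other instances.
class UserRouting {
    static final String USER_ID = "userId"; // header and metadata key

    static Route route(Message message, List<Connection> localClients) {
        String userId = message.headers().get(USER_ID);
        if (userId == null) {
            return new Route(List.of(), false); // not a user-targeted message
        }
        List<Connection> matched = new ArrayList<>();
        for (Connection c : localClients) {
            if (userId.equals(c.metadata().get(USER_ID))) {
                matched.add(c);
            }
        }
        // Forward only when no local client belongs to this user.
        return new Route(matched, matched.isEmpty());
    }
}
```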

The library already implements the corresponding UserSelector and UserMessage. They can be enabled through configuration, and users are identified by adding the userId parameter to the connection path.

Of course, we can also use a cache to determine exactly whether a message needs to be forwarded, and to which services. To do this, store unique data such as the userId, together with the instanceId of the service, in Redis. When sending a message to a user, look up in Redis the instanceId (or other unique data) of the service instance that the user is connected to. If it is the current service, send the message directly. If it is another service, forward the message to that service.
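The cache-assisted variant can be sketched with a shared Map standing in for Redis, mapping each userId to an instanceId. UserLocationRouter, Kind, and Target are hypothetical names, and no Redis client API is used here.

```java
import java.util.Map;

// Where a user's message should go, from the point of view of one instance.
enum Kind { LOCAL, FORWARD, OFFLINE }

record Target(Kind kind, String instanceId) {}

// Hypothetical cache-assisted router; the shared Map plays the role of Redis.
class UserLocationRouter {
    private final String selfInstanceId;
    private final Map<String, String> userToInstance; // userId -> instanceId

    UserLocationRouter(String selfInstanceId, Map<String, String> sharedCache) {
        this.selfInstanceId = selfInstanceId;
        this.userToInstance = sharedCache;
    }

    // Record that this user's client is now connected to this instance.
    void onClientConnected(String userId) {
        userToInstance.put(userId, selfInstanceId);
    }

    // Clear the entry only if it still points at us: the user may already
    // have reconnected to another instance.
    void onClientDisconnected(String userId) {
        userToInstance.remove(userId, selfInstanceId);
    }

    Target target(String userId) {
        String instanceId = userToInstance.get(userId);
        if (instanceId == null) {
            return new Target(Kind.OFFLINE, null);
        }
        if (instanceId.equals(selfInstanceId)) {
            return new Target(Kind.LOCAL, instanceId); // send directly
        }
        return new Target(Kind.FORWARD, instanceId); // forward to that instance only
    }
}
```

With a real Redis, the "delete only if it still points at me" step should be atomic, for example via a Lua script. Otherwise, a concurrent reconnect to another instance could be lost.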

Send a message to a specified path

Another common scenario resembles topic subscription. For example, if clients subscribe to device status updates, a message must be sent to every connection on the corresponding path.

We can use different paths to represent different topics. We then write a custom connection selector that matches each connection's path against the path specified in the message header.
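Path-based selection is a small filter. In this hypothetical sketch, the header name `path`, PathSelection, and its methods are assumptions, not the API of the library's PathSelector.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

record Connection(String id, String path) {}

record Message(String payload, Map<String, String> headers) {}

// Hypothetical path-based selection: the message header names a topic path,
// and every connection opened on that path receives the message.
class PathSelection {
    static final String PATH_HEADER = "path"; // header name assumed for this sketch

    static boolean supports(Message message) {
        return message.headers().containsKey(PATH_HEADER);
    }

    static List<Connection> select(Message message, List<Connection> all) {
        String path = message.headers().get(PATH_HEADER);
        return all.stream()
                .filter(c -> c.path().equals(path))
                .collect(Collectors.toList());
    }
}
```

Unlike a userId, a path is usually shared by connections on many instances. Such a message presumably still has to be forwarded to the other instances, each of which applies the same filter to its own clients.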

Of course, the library also implements the corresponding PathSelector and PathMessage, which can be enabled through configuration.

Summary

Finally, allow me to share a few humble opinions on abstraction.

Abstraction is much like the saying "The Tao gives birth to one, one gives birth to two, two gives birth to three, and three gives birth to all things." You keep expanding outward from your top-level interface (that is, your core function). Your top-level interface is the Tao (in a narrow sense).

Take this library as an example. ConnectionLoadBalanceConcept is the Tao of this library. Its core function is sending messages. How to send them and to whom is left undetermined, like a state of primordial chaos.

So what are one, two, and three? We need carriers to send messages, so we have Connection and Message. We need to manage Connections, so we have ConnectionRepository. We need to forward messages, so we have ConnectionSubscriber, and so on.

"All things" are the concrete implementations. Examples include the connection server manager DiscoveryConnectionServerManager, based on Spring Cloud service discovery; the path-based connection selector PathSelector; and the reactive WebSocket connection ReactiveWebSocketConnection.

It is like a world you create: rules keep being derived, they complement each other, and together they keep your world running smoothly.