I have seven schemes for implementing web real-time message push

I have seven schemes for implementing web real-time message push

Why is the MQTT protocol so favored in the Internet of Things (IOT)? Instead of other protocols, such as the HTTP protocol we are more familiar with?

I have a friend~

I made a small broken station, and now I want to implement a function of pushing web messages within the station. Yes, it is the little red dot in the picture below, a very commonly used function.

picture

However, he hasn't figured out how to do it yet. Here I help him sort out several solutions and simply implement them.

picture

What is push message (push)

There are many push scenarios. For example, when someone follows my official account, I will receive a push message to attract me to click to open the app.

Message push (push) usually refers to the active push of messages to the user's current web page or mobile device APP through a certain tool by the operation staff of the website.

Message push is generally divided into web-side message push​ and mobile-side message push.

picture

The above one belongs to the mobile-side message push, and the web-side message push is common, such as in-site messages, the number of unread emails, the number of monitoring alarms, etc., and it is also widely used.

picture

Before the specific implementation, let's analyze the previous requirements. In fact, the function is very simple. As long as an event is triggered (actively sharing resources or actively pushing messages in the background), the notification red dot on the web page will be +1 in real time. .

Usually, there are several message push tables on the server side to record different types of messages pushed by users triggering different events. The front end actively queries (pulls) or passively receives (pushes) the number of all unread messages of users.

picture

Message push is nothing more than two forms of push (push​) and pull (pull), let’s take a look at them one by one.

short polling

Polling (polling​) should be the easiest way to implement a message push scheme. Here we temporarily divide polling into short polling​ and long polling.

Short polling is easy to understand. At a specified time interval, the browser sends an HTTP request to the server, the server returns unread message data to the client in real time, and the browser renders and displays it.

A simple JS timer can do it, request the unread message count interface every second, and display the returned data.

setInterval(() => {
  // 方法请求
  messageCount().then((res) => {
      if (res.code === 200) {
          this.messageCount = res.data
      }
  })
}, 1000);
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

The effect is still good. Although the implementation of short polling is simple, the shortcomings are also obvious. Since the push data does not change frequently, no matter whether there is a new message in the backend at this time, the client will make a request, which will inevitably cause a lot to the server. A lot of stress, wasting bandwidth and server resources.

picture

long polling

Long polling is an improved version of the above short polling, which can reduce the waste of server resources as much as possible while ensuring the relative real-time performance of messages. Long polling is widely used in middleware, such as Nacos​ and apollo​configuration center, message queue kafka​, RocketMQ all use long polling.

Is the interaction model of the Nacos configuration center push or pull? ​In this article, I introduced the implementation principle of Nacos long polling in detail, and interested friends can take a look.

This time, I used the apollo​configuration center to implement long polling, and applied a class DeferredResult​, which is an asynchronous request mechanism provided by Spring encapsulation after servelet 3.0. The direct meaning is to delay the result.

picture

DeferredResult​ can allow container threads to quickly release occupied resources without blocking the request thread, so as to accept more requests to improve the throughput of the system, and then start asynchronous worker threads to process the real business logic, and call DeferredResult.setResult(200) when the processing is complete. Submit the response result.

Below we use long polling to implement message push.

Because an ID may be monitored by multiple long polling requests, I use the Multimap structure provided by the guava package to store long polling, and a key can correspond to multiple values. Once the key changes are monitored, all corresponding long polls will respond. The front end gets the status code of the non-request timeout, knows the data change, actively queries the interface for the number of unread messages, and updates the page data.

@Controller
@RequestMapping("/polling")
public class PollingController {

    // 存放监听某个Id的长轮询集合
    // 线程同步结构
    public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());

    /**
     * 公众号:程序员小富
     * 设置监听
     */
    @GetMapping(path = "watch/{id}")
    @ResponseBody
    public DeferredResult<String> watch(@PathVariable String id) {
        // 延迟对象设置超时时间
        DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
        // 异步请求完成时移除 key,防止内存溢出
        deferredResult.onCompletion(() -> {
            watchRequests.remove(id, deferredResult);
        });
        // 注册长轮询请求
        watchRequests.put(id, deferredResult);
        return deferredResult;
    }

    /**
     * 公众号:程序员小富
     * 变更数据
     */
    @GetMapping(path = "publish/{id}")
    @ResponseBody
    public String publish(@PathVariable String id) {
        // 数据变更 取出监听ID的所有长轮询请求,并一一响应处理
        if (watchRequests.containsKey(id)) {
            Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
            for (DeferredResult<String> deferredResult : deferredResults) {
                deferredResult.setResult("我更新了" + new Date());
            }
        }
        return "success";
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.

When the request exceeds the set timeout time, an AsyncRequestTimeoutException will be thrown. Here, you can directly use @ControllerAdvice to capture the global capture and return it uniformly. After the front end obtains the agreed status code, the long polling request is initiated again, and so on.

@ControllerAdvice
public class AsyncRequestTimeoutHandler {

    @ResponseStatus(HttpStatus.NOT_MODIFIED)
    @ResponseBody
    @ExceptionHandler(AsyncRequestTimeoutException.class)
    public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
        System.out.println("异步请求超时");
        return "304";
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

Let's test it. First, the page initiates a long polling request /polling/watch/10086​to monitor the message change, the request is suspended, the data is not changed until the timeout, and the long polling request is initiated again; then manually change the data /polling /publish/10086, the long polling is responded, and the front-end processing business logic is completed and the request is initiated again, and so on.

picture

Compared with short polling, long polling has improved performance a lot, but it still generates more requests, which is a little imperfect.

iframe stream

The iframe flow is to insert a hidden <iframe>​tag in the page, by requesting the API interface of the number of messages in src​, thereby creating a long connection between the server and the client, and the server continues to transmit data to the iframe.

"

The transmitted data is usually HTML​ or embedded javascript to achieve the effect of updating the page in real time.

picture

This method is simple to implement, the front end only needs an <iframe> tag to get it done

<iframe src="/iframe/message" style="display:none"></iframe>
  • 1.

The server directly assembles the html and js script data and writes it to the response

@Controller
@RequestMapping("/iframe")
public class IframeController {
    @GetMapping(path = "message")
    public void message(HttpServletResponse response) throws IOException, InterruptedException {
        while (true) {
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().print(" <script type=\"text/javascript\">\n" +
                    "parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" +
                    "parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" +
                    "</script>");
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

But I personally don't recommend it, because it will show on the browser that the request has not been loaded, and the icon will keep spinning, which is an obsessive-compulsive disorder killer.

picture

SSE (my way)

Many people may not know that the server pushes messages to the client. In fact, in addition to the familiar mechanism of WebSocket​, there is also a server-sent events (Server-sent events​), referred to as SSE.

SSE​ is based on the HTTP protocol. We know that the HTTP protocol in the general sense cannot allow the server to actively push messages to the client, but SSE is an exception, which changes a way of thinking.

picture

SSE opens a one-way channel between the server and the client. The server responds with a one-time data packet instead of a text/event-stream type of data stream information, which is streamed from the server to the server when there is data change. client.

The overall implementation idea is a bit similar to online video playback. The video stream will be continuously pushed to the browser. You can also understand that the client completes a download that takes a long time (the network is not smooth).

picture

SSE​ is similar to WebSocket in that both can establish communication between the server and the browser, enabling the server to push messages to the client, but there are still some differences:

  • SSE is based on the HTTP protocol, and they do not require a special protocol or server implementation to work; WebSocket requires a separate server to handle the protocol.
  • SSE one-way communication, only one-way communication from the server to the client; webSocket full-duplex communication, that is, both parties of the communication can send and receive information at the same time.
  • SSE is simple to implement and has low development cost, without the need to introduce other components; WebSocket data transmission requires secondary analysis, and the development threshold is higher.
  • SSE supports disconnection and reconnection by default; WebSocket needs to be implemented by itself.
  • SSE can only transmit text messages, and binary data needs to be encoded and transmitted; WebSocket supports the transmission of binary data by default.

How to choose between SSE and WebSocket?

Technology is not good or bad, only which is more suitable

SSE seems to have been unknown to everyone, in part because of the advent of WebSockets, which provide a richer protocol to perform two-way, full-duplex communication. For gaming, instant messaging, and scenarios that require near real-time updates in both directions, having a two-way channel is more attractive.

However, in some cases, it is not necessary to send data from the client. And you just need some updates for server operations. For example: in-site messages, number of unread messages, status updates, stock quotes, monitoring quantity and other scenarios, SEE​is more advantageous in terms of ease of implementation and cost. Additionally, SSE has several features that WebSockets lack by design, such as: automatic reconnection, event IDs, and the ability to send arbitrary events.

The front end only needs to make an HTTP request, bring a unique ID, open the event stream, and listen to the events pushed by the server.

<script>
    let source = null;
    let userId = 7777
    if (window.EventSource) {
        // 建立连接
        source = new EventSource('http://localhost:7777/sse/sub/'+userId);
        setMessageInnerHTML("连接用户=" + userId);
        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);
        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });
    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }
</script>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.

The implementation of the server is simpler, create a SseEmitter​object and put it into sseEmitterMap for management

private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

/**
 * 创建连接
 *
 * @date: 2022/7/12 14:51
 * @auther: 公众号:程序员小富
 */
public static SseEmitter connect(String userId) {
    try {
        // 设置超时时间,0表示不过期。默认30秒
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        count.getAndIncrement();
        return sseEmitter;
    } catch (Exception e) {
        log.info("创建新的sse连接异常,当前用户:{}", userId);
    }
    return null;
}

/**
 * 给指定用户发送消息
 *
 * @date: 2022/7/12 14:51
 * @auther: 公众号:程序员小富
 */
public static void sendMessage(String userId, String message) {

    if (sseEmitterMap.containsKey(userId)) {
        try {
            sseEmitterMap.get(userId).send(message);
        } catch (IOException e) {
            log.error("用户[{}]推送异常:{}", userId, e.getMessage());
            removeUser(userId);
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.

We simulate the server to push the message, and see that the client receives the message, which is consistent with our expected effect.

picture

Note: SSE does not support Internet Explorer, and does a good job of compatibility with other major browsers.

picture

MQTT

What is the MQTT protocol?

MQTT (Message Queue Telemetry Transport): a lightweight communication protocol based on the publish/subscribe (publish​/subscribe​) model, obtain messages by subscribing to the corresponding topic, is the Internet of Things (Internet of Thing) a standard transport protocol.

This protocol separates the publisher of the message (publisher​) from the subscriber (subscriber), so it can provide reliable message services for remotely connected devices in an unreliable network environment, and the usage is somewhat similar to traditional MQ.

picture

The TCP​ protocol is located in the transport layer, the MQTT​ protocol is located in the application layer, and the MQTT​ protocol is built on the TCP/IP​ protocol, which means that as long as the TCP/IP​ protocol stack is supported, the MQTT protocol can be used.

Why use the MQTT protocol?

Why is the MQTT protocol so favored in the Internet of Things (IOT)? Instead of other protocols, such as the HTTP protocol we are more familiar with?

  • First of all, the HTTP​ protocol is a synchronous protocol, and the client needs to wait for the server's response after requesting. In the Internet of Things (IOT) environment, devices are very affected by the environment, such as low bandwidth, high network latency, unstable network communication, etc. Obviously, asynchronous message protocols are more suitable for IOT applications.
  • HTTP is one-way, and a client must initiate a connection to get a message, whereas in Internet of Things (IOT) applications, devices or sensors are often clients, which means they cannot passively receive commands from the network.
  • Usually a command or message needs to be sent to all devices on the network. HTTP to achieve such a function is not only difficult, but also extremely expensive.

The specific MQTT protocol introduction and practice, I will not repeat them here, you can refer to my two previous articles, which are also very detailed.

Introduction to the MQTT protocol

I didn't expect springboot + rabbitmq to do smart home, it would be so simple

MQTT implements message push

Unread messages (small red dots), the practice of real-time message push between front-end and RabbitMQ, thief is simple~

Websocket

Websocket should be a method that everyone is familiar with to implement message push. We also compared it with websocket when we talked about SSE above.

WebSocket is a protocol for full-duplex communication over a TCP connection, establishing a communication channel between a client and a server. The browser and the server only need one handshake, and a persistent connection can be created directly between the two, and two-way data transmission can be performed.

picture

Pictures from the Internet

Springboot integrates websocket and first introduces websocket-related toolkits, which requires additional development costs compared to SSE.

<!-- 引入websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

The server uses the @ServerEndpoint​ annotation to mark the current class as a websocket server, and the client can connect to the WebSocket server through ws://localhost:7777/webSocket/10086.

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    // 用来存在线连接数
    private static final Map<String, Session> sessionPool = new HashMap<String, Session>();
    /**
     * 公众号:程序员小富
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            this.session = session;
            webSockets.add(this);
            sessionPool.put(userId, session);
            log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }
    /**
     * 公众号:程序员小富
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("websocket消息: 收到客户端消息:" + message);
    }
    /**
     * 公众号:程序员小富
     * 此为单点消息
     */
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("websocket消: 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.

The front-end initializes to open the WebSocket connection, monitors the connection status, receives server data or sends data to the server.

<script>
    var ws = new WebSocket('ws://localhost:7777/webSocket/10086');
    // 获取连接状态
    console.log('ws连接状态:' + ws.readyState);
    //监听是否连接成功
    ws.onopen = function () {
        console.log('ws连接状态:' + ws.readyState);
        //连接成功则发送一个数据
        ws.send('test1');
    }
    // 接听服务器发回的信息并处理展示
    ws.onmessage = function (data) {
        console.log('接收到来自服务器的消息:');
        console.log(data);
        //完成通信后关闭WebSocket连接
        ws.close();
    }
    // 监听连接关闭事件
    ws.onclose = function () {
        // 监听整个过程中websocket的状态
        console.log('ws连接状态:' + ws.readyState);
    }
    // 监听并处理error事件
    ws.onerror = function (error) {
        console.log(error);
    }
    function sendMessage() {
        var content = $("#message").val();
        $.ajax({
            url: '/socket/publish?userId=10086&message=' + content,
            type: 'GET',
            data: { "id": "7777", "content": content },
            success: function (data) {
                console.log(data)
            }
        })
    }
</script>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.

The page is initialized to establish a websocket connection, and then two-way communication can be performed, and the effect is not bad.

picture

picture

custom push

Above, we gave me the principles and code implementations of 6 solutions, but in the actual business development process, we cannot blindly use them directly. We should choose the appropriate solution based on the characteristics of our own system business and actual scenarios.

The most direct way to push is to use the third push platform. After all, the demand that money can solve is not a problem. It can be used directly without complicated development, operation and maintenance. It saves time, effort and worry. Like goEasy and Jiguang push, it is very good. third-party service provider.

Generally, large companies have self-developed message push platforms. For example, the web site message we implemented this time is just a contact point on the platform. SMS, email, WeChat public account, and small programs can be accessed through any channel that can reach users. come in.

picture

The picture comes from the Internet

The inside of the message push system is quite complex, such as the maintenance and review of message content, delineating the push crowd, reaching filtering and intercepting (the frequency of push rules, time period, quantity, black and white lists, keywords, etc.), and modules with a lot of push failure compensation. , There are also many scenarios that technically involve large amounts of data and high concurrency. So the way we implement it today is just a small game in front of this huge system.

Github address

I have implemented all the cases mentioned in the article one by one, and put them on Github. If you find it useful, just Star it!

Portal: https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-realtime-data