聊聊 Netty 用戶端斷線重連的設計與實現

其實Netty基於網路連接聲明週期暴露了很多提供使用者自實現的API,而本文將基於其中的一個拓展點實現連接可靠性,希望對你有説明。

詳解Netty用戶端斷線重連的設計和實現

Netty生命週期中的channelInactive方法

讀過筆者往期文章的讀者大體是都知道channelInactive這個回調方法,我們從其註釋即可知曉:註冊的ChannelHandlerContext 的 Channel現在已經是不活躍即已經不可用的連接,就會調用pipeline上所有的處理器執行其內部實現的channelInactive處理剩餘業務:

 /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

實際上channelInactive的執行我們也可以通過源碼的方式讓讀者了解,我們以用戶端連接為例,一旦用戶端斷開連接,用戶端的selector就會輪循到連接關閉事件,便會將對應用戶端的channel取消並調用channelInactive方法:

從源碼角度來NioEventLoop輪詢到關閉事件後會直接執行該事件closeOnRead方法,其內部判斷連接非open狀態則會直接調用close進行連接關閉操作:

protected class NioByteUnsafe extends AbstractNioUnsafe {

        private void closeOnRead(ChannelPipeline pipeline) {
            if (isOpen()) {
                //......
                } else {
                 //调用close执行关闭连接
                    close(voidPromise());
                }
            }
        }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

close邏輯內部最終會定位到用戶端的socketchannel執行到AbstractChannel的close方法,其內部會向eventLoop註冊一個doDeregister的事件,該事件會將用戶端socket註冊的讀寫事件取消,完成後就會調用fireChannelInactive走到channelInactive回調, 通知當前用戶端netty這個socket的遠端連接不再活躍,已經斷開了:

對此我們給出上圖所示的源碼片段,改代碼位於AbstractChannel的close方法,其內部核心邏輯就是調用fireChannelInactiveAndDeregister移除用戶端socket的讀寫事件並觸發channelInactive的回調通知:

private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
           //......

          
            if (closeExecutor != null) {
                 //......
            } else {
                 //......
                } else {
                //调用fireChannelInactiveAndDeregister移除断开连接的客户端socket并触发channelInactive回调
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.

fireChannelInactiveAndDeregister內部核心邏輯就是deregister方法,可以看到該方法核心邏輯就是提交給eventLoop一個異步任務,也就是我們上圖所說的移除用戶端讀寫事件的方法,方法名是doDeregister,完成該方法調用後就會調用fireChannelInactive方法, 告知服務端這個用戶端channel連接已斷開:

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
           //......
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                    //移除客户端读写事件
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                    //触发客户端channel的channelInactive回调
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                      //......
                    }
                }
            });
        }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

對此我們給出doDeregister的邏輯,可以看到其內部拿到eventLoop事件輪詢器,通過調用cancel移除當前用戶端socket讀寫事件:

   @Override
    protected void doDeregister() throws Exception {
    //通过selectionKey获取断开连接的客户端读写事件的key,通过cancel移除这些事件
        eventLoop().cancel(selectionKey());
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

Netty斷線重連思路與實現

由此我們知曉要想實現斷線重連,用戶端可以通過重寫channelInactive方法,確保在感知到連接斷開時再次提交一個連接的延遲事件,知道斷線的連接再次恢復,由此保證用戶端連接可靠性:

最終我們給出斷線重連的ReconnectHandler,其內部邏輯很簡單,延遲5秒後向eventLoop提交一個斷線重連的異步連接任務直到成功,完成後我們將這個處理器添加到用戶端的pipeline即可:

public class ReconnectHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

  //提交断线重连的延迟任务
        scheduledDoReConnect(ctx);
        
        ctx.fireChannelInactive();
    }


    private ScheduledFuture<?> scheduledDoReConnect(ChannelHandlerContext ctx) {
        //拿到当前channel的eventLoop提交一个连接远程服务端的延迟任务
        ScheduledFuture<?> scheduledFuture = ctx.channel().eventLoop().schedule(() -> {
            ChannelFuture channelFuture = ctx.channel().connect(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(f -> {
                if (!f.isSuccess()) {
                    //如果失败则递归调用scheduledDoReConnect再次尝试
                    scheduledDoReConnect(ctx);
                } else {
                    System.out.println("reconnect success.");
                }
            });

        }, 5, TimeUnit.SECONDS);


        return scheduledFuture;

    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.

小結

自此我們基於Netty生命週期的源碼剖析給出客戶端斷線重連的設計和落地思路,希望對你有説明。