Let's talk about the design and implementation of Netty client disconnection and reconnection

Netty exposes many extension points tied to the lifecycle of a network connection, which users can implement with their own logic. This article uses one of those extension points to make client connections more reliable. I hope you find it helpful.

The design and implementation of Netty client disconnection and reconnection in detail

The channelInactive method in the Netty lifecycle

Readers of the author's previous articles are probably familiar with the callback method channelInactive. Its Javadoc says that the Channel of the ChannelHandlerContext is now inactive and has reached the end of its lifetime. When that happens, the event travels along the pipeline, and each handler that receives it runs its own channelInactive implementation to finish any remaining work (a handler passes the event on by calling ctx.fireChannelInactive()):

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
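To make the propagation rule concrete, here is a minimal sketch in plain Java. It is a toy model, not Netty's implementation: Handler, Context, and the handler names are hypothetical stand-ins for ChannelInboundHandler and ChannelHandlerContext. It only shows that a handler further down the chain sees the inactive event if the handler before it forwards the event.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of inbound event propagation; these are not Netty's real classes. */
final class InactivePropagationSketch {

    /** Hypothetical stand-in for a handler that reacts to the inactive event. */
    interface Handler {
        void channelInactive(Context ctx);
    }

    /** Hypothetical stand-in for ChannelHandlerContext: it knows its position in the chain. */
    static final class Context {
        private final List<Handler> handlers;
        private final int index;

        Context(List<Handler> handlers, int index) {
            this.handlers = handlers;
            this.index = index;
        }

        /** Passes the event to the next handler, if there is one. */
        void fireChannelInactive() {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).channelInactive(new Context(handlers, next));
            }
        }
    }

    /** Fires the event at the head of the chain and returns the names of the handlers reached. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Handler> handlers = new ArrayList<>();
        // First handler does its work and forwards the event.
        handlers.add(ctx -> { log.add("reconnect"); ctx.fireChannelInactive(); });
        // Second handler does not forward, so the chain stops here.
        handlers.add(ctx -> log.add("cleanup"));
        handlers.add(ctx -> log.add("never-reached"));
        new Context(handlers, -1).fireChannelInactive();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the third handler is never called because the second one does not forward the event. That is why a handler that should not swallow the event calls ctx.fireChannelInactive() after its own work.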

We can also follow the execution of channelInactive through the source code. Take a client connection as an example: once the connection is dropped, the client's selector picks up the corresponding event, the client's channel is deregistered from the selector, and the channelInactive method is called.

In the source code, once NioEventLoop has polled such an event, NioByteUnsafe runs its closeOnRead method. If the channel is still open, the method reaches a branch (the else of a check that the excerpt below elides) that calls close to close the connection.

protected class NioByteUnsafe extends AbstractNioUnsafe {

    private void closeOnRead(ChannelPipeline pipeline) {
        if (isOpen()) {
            if (/* ...... */) {
                //......
            } else {
                // call close to close the connection
                close(voidPromise());
            }
        }
    }
    //......
}
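It may help to see what a dropped connection looks like one level below Netty. The sketch below uses only JDK NIO and blocking channels, with a loopback server on a free port (all names are hypothetical). It shows that after the remote end closes, a read on the client reports end-of-stream. As far as we can tell, this is the kind of read result that leads NioByteUnsafe into closeOnRead, but the sketch itself makes no claim about Netty's internals.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** JDK-only sketch: how a peer close surfaces on the reading side. */
final class PeerCloseSketch {

    /** Returns what read() reports on the client after the server side closes the connection. */
    static int readAfterPeerClose() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the operating system for any free port.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                SocketChannel accepted = server.accept();
                // The remote end hangs up without sending any data.
                accepted.close();
                // A blocking read returns once the close has arrived.
                return client.read(ByteBuffer.allocate(16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterPeerClose());
    }
}
```

The return value of read is the signal: a negative value means the stream has ended, which is different from reading zero bytes.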

The close logic eventually reaches the close method of AbstractChannel for the client's SocketChannel. It submits a task to the eventLoop that runs doDeregister, which cancels the read and write events registered for the client's socket, and then calls fireChannelInactive to trigger the channelInactive callback. This tells the client's handlers that the connection to the remote end is no longer active and has been closed.

The corresponding source code is the close method of AbstractChannel. Its core logic is to call fireChannelInactiveAndDeregister, which removes the read and write events of the client socket and triggers the channelInactive callback:

private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
    //......

    if (closeExecutor != null) {
        //......
    } else {
        //......
        if (/* ...... */) {
            //......
        } else {
            // call fireChannelInactiveAndDeregister to deregister the disconnected client socket
            // and trigger the channelInactive callback
            fireChannelInactiveAndDeregister(wasActive);
        }
    }
}

The core of fireChannelInactiveAndDeregister is the deregister method. It submits an asynchronous task to the eventLoop. The task first calls doDeregister, the method mentioned above that removes the client's read and write events, and then calls fireChannelInactive to notify the handlers on the pipeline that the channel's connection has been dropped:

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    //......
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                // remove the client's read and write events
                doDeregister();
            } catch (Throwable t) {
                logger.warn("Unexpected exception occurred while deregistering a channel.", t);
            } finally {
                // trigger the channelInactive callback of the client channel
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive();
                }
                //......
            }
        }
    });
}
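One detail in the excerpt above is easy to miss: the callback is fired from a finally block, so it runs even when deregistration throws. The following sketch isolates that pattern with a plain single-threaded executor standing in for the event loop. The class name and the event strings are hypothetical labels for illustration, not Netty code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** JDK-only sketch of "deregister on the event loop, then always notify". */
final class DeregisterSketch {

    /** Runs a failing deregistration task and returns the steps that executed, in order. */
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        // Stand-in for the event loop: one thread that runs submitted tasks in order.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.execute(() -> {
            try {
                events.add("doDeregister");
                throw new IllegalStateException("simulated deregistration failure");
            } catch (RuntimeException e) {
                // The failure is logged, not rethrown.
                events.add("warn");
            } finally {
                // The notification still fires after the failure.
                events.add("fireChannelInactive");
            }
        });
        eventLoop.shutdown();
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

For reconnection this matters because the handler that schedules the reconnect relies on channelInactive being delivered even if cleanup of the old channel goes wrong.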

Here is the logic of doDeregister. It obtains the channel's eventLoop and removes the current client socket's read and write events by passing the channel's SelectionKey to cancel:

    @Override
    protected void doDeregister() throws Exception {
        // selectionKey() returns the key for the disconnected client's read and write events;
        // cancel removes those events
        eventLoop().cancel(selectionKey());
    }
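What cancelling a key means can be checked with the JDK alone. The sketch below registers a channel with a Selector, cancels the resulting SelectionKey, and inspects the key before and after the next select call. It uses a Pipe so that no network is needed. It is an illustration of java.nio behavior under that setup, on the assumption that Netty's cancel ultimately relies on the same mechanism.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;

/** JDK-only sketch: the effect of SelectionKey.cancel(). */
final class KeyCancelSketch {

    /** Returns {key valid after cancel, key still in the selector's key set after a select}. */
    static boolean[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                // Cancelling invalidates the key immediately.
                key.cancel();
                boolean validAfterCancel = key.isValid();
                // The next selection operation removes cancelled keys from the key set.
                selector.selectNow();
                boolean stillInKeySet = selector.keys().contains(key);
                return new boolean[] {validAfterCancel, stillInKeySet};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

After the cancel and the following select, the selector no longer reports any events for that channel, which matches the description of removing the socket's read and write events.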

Netty disconnection and reconnection ideas and implementation

So, to reconnect after a disconnect, the client can override the channelInactive method and submit a delayed connection task whenever the connection is dropped. The task re-establishes the connection, which keeps the client's connection reliable.

Finally, here is the ReconnectHandler that implements reconnection. Its logic is simple: after a 5-second delay it submits an asynchronous connection task to the eventLoop, and if the attempt fails it schedules another one, repeating until a connection succeeds. We then add this handler to the client's pipeline:

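import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.ScheduledFuture;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    // Assumption: the Bootstrap that created the client is passed in, and its
    // ChannelInitializer adds a new ReconnectHandler to every channel it creates.
    private final Bootstrap bootstrap;

    public ReconnectHandler(Bootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // submit the delayed reconnect task
        scheduledDoReConnect(ctx.channel().eventLoop());
        // pass the event on to the remaining handlers
        ctx.fireChannelInactive();
    }

    private ScheduledFuture<?> scheduledDoReConnect(EventLoop eventLoop) {
        // use the eventLoop of the old channel to schedule a delayed task that connects to the remote server
        return eventLoop.schedule(() -> {
            // a closed Channel cannot be connected again (calling connect on it fails),
            // so a new connection is opened through the Bootstrap
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(f -> {
                if (!f.isSuccess()) {
                    // on failure, call scheduledDoReConnect again to retry
                    scheduledDoReConnect(eventLoop);
                } else {
                    System.out.println("reconnect success.");
                }
            });
        }, 5, TimeUnit.SECONDS);
    }
}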
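The retry loop in the handler can be tried out without a server. The sketch below reproduces only the scheduling pattern with the JDK's ScheduledExecutorService: run an attempt after a fixed delay and, if it fails, schedule the next one. The names retryUntilSuccess, connectAttempt, and demo are hypothetical, the connection attempt is simulated by a BooleanSupplier, and the delay is shortened to milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** JDK-only sketch of "retry after a fixed delay until the attempt succeeds". */
final class RetrySketch {

    /** Schedules attempts until one succeeds; the future completes with the number of attempts made. */
    static CompletableFuture<Integer> retryUntilSuccess(ScheduledExecutorService loop,
                                                        BooleanSupplier connectAttempt,
                                                        long delayMillis) {
        CompletableFuture<Integer> done = new CompletableFuture<>();
        schedule(loop, connectAttempt, delayMillis, 1, done);
        return done;
    }

    private static void schedule(ScheduledExecutorService loop, BooleanSupplier connectAttempt,
                                 long delayMillis, int attempt, CompletableFuture<Integer> done) {
        loop.schedule(() -> {
            if (connectAttempt.getAsBoolean()) {
                done.complete(attempt);
            } else {
                // The attempt failed: schedule the next one with the same delay.
                schedule(loop, connectAttempt, delayMillis, attempt + 1, done);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Demo: the simulated connection fails twice and succeeds on the third attempt. */
    static int demo() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            return retryUntilSuccess(loop, () -> calls.incrementAndGet() >= 3, 10)
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each retry is a new scheduled task and not a loop with sleep, so the scheduling thread stays free between attempts. The handler gets the same property by scheduling on the eventLoop.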

Brief summary

With that, we have presented the design and implementation of client disconnection and reconnection, based on a source code analysis of Netty's channel lifecycle. I hope it helps.