Analysis of Netty's internal network implementation principle

Analysis of Netty's internal network implementation principle

We have finished introducing the core principles of the Netty network module. Fei Ge has been "advocating" the benefits of internal strength. As long as you have a solid internal strength, things that seem to be irrelevant in various languages ​​can actually be figured out at the bottom level. I myself have never developed a server program in Java, let alone Netty.

Netty is a network programming toolkit that is widely used in the Java ecosystem. It was born in 2004 and is still a complete mess. More than 30,000 projects are using it on github alone. So if you want to better master network programming, I think Netty can't be avoided. So today we will analyze the working principle of Netty's internal network module.

Friendly reminder, this article has nearly 20,000 to 30,000 words of code, which is relatively long. If time is short, you can skip the middle part. The first section and the final sixth section are suggested must-reads. Of course, dragging it directly to the end of the collection, likes, and forwards is also ok, haha!

In addition, I applied for sponsorship for everyone today. At the end of the article, I applied for 5 new books of Glacier "In-depth Understanding of High Concurrency Programming", which were given to everyone by lottery.

1. Netty usage

Let's first find an example of Netty, and this article as a whole is narrated around this example. We download the source code of Netty and find the echo demo in the examples. At the same time, in order to prevent the code update from affecting the description of this article, we switch to the 4.1 branch.

# git checkout https://github.com/netty/netty.git
# git checkout -b 4.1
# cd example/src/main/java/io/netty/example/echo
  • 1.
  • 2.
  • 3.

In this demo's EchoServer, the classic usage of using Netty to write Server is shown. (Brother Fei will do his best to simplify the original code without affecting the expression of the core logic in the article. For example, the try in the following code was lost by me)

public final class EchoServer {
 public static void main(String[] args) throws Exception {
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 if (sslCtx != null) {
                     p.addLast(sslCtx.newHandler(ch.alloc()));
                 }
                 p.addLast(serverHandler);
             }
         });

        // Start the server.
        ChannelFuture f = b.bind(PORT).sync();
        ......
 }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.

If you are a Java novice, or just like Fei Ge who did not use Netty to write services, I believe the above code is basically incomprehensible. The fundamental reason is that compared with C/C++, Java has a higher degree of encapsulation. The NIO encapsulation of the network in the JVM of the Java language itself has shielded many underlying concepts, and Netty has another layer of encapsulation, so some terms and concepts commonly used by Java developers are very different from other languages.

For example, Channel, NioEventLoopGroup, etc. in the above code are not seen in other languages. But you don't have to be afraid, because each of these concepts is just a different outfit for the underlying concepts such as sockets and processes. Let's take a closer look at each of these concepts.

1.1 NioEventLoopGroup

If you have no experience with Netty, you can simply understand NioEventLoopGroup as a thread pool. Each NioEventLoopGroup contains one or more NioEventLoops.

picture

Among them, NioEventLoop is a centralized encapsulation of concepts such as thread and epoll.

First, EventLoop itself is a thread. We can see why we say this by looking at the inheritance relationship of NioEventLoop. NioEventLoop inherits from SingleThreadEventLoop, which in turn inherits from SingleThreadEventExecutor. SingleThreadEventExecutor implements the thread native abstraction in Netty.

public abstract class SingleThreadEventExecutor extends ... {
 private volatile Thread thread;
 private final Queue<Runnable> taskQueue;
}
  • 1.
  • 2.
  • 3.
  • 4.

In the SingleThreadEventExecutor not only encapsulates the thread object Thread, but also configures a task queue taskQueue for other threads to place pending tasks to it.

1.2 selector

In addition, NioEventLoopEventLoop encapsulates epoll (under Linux operating system) in the name of selector.

picture

Inside the NioEventLoop object, there will be a selector member defined. This is actually where the packaged epoll comes from. Let's look at the specific packaging process. and selectedKeys, which is a list of pending events found on the selector.

public final class NioEventLoop extends SingleThreadEventLoop{
 // selector
 private Selector selector;
    private Selector unwrappedSelector;

    // selector 上发现的各种待处理事件
    private SelectedSelectionKeySet selectedKeys;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

When NioEventLoopGroup is constructed, it will call SelectorProvider#provider to generate the provider, and by default it will call sun.nio.ch.DefaultSelectorProvider.create to create it.

//file:java/nio/channels/spi/SelectorProvider.java
public abstract class SelectorProvider {

    public static SelectorProvider provider() {
     // 1. java.nio.channels.spi.SelectorProvider 属性指定实现类
        // 2. SPI 指定实现类
        ......

        // 3. 默认实现,Windows 和 Linux 下不同
        provider = sun.nio.ch.DefaultSelectorProvider.create();
        return provider;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.

Under Linux, the provider created by default uses epoll.

//file:sun/nio/ch/DefaultSelectorProvider.java
public class DefaultSelectorProvider {
 public static SelectorProvider create() {
        String osname = AccessController
            .doPrivileged(new GetPropertyAction("os.name"));
        if (osname.equals("Linux"))
            return createProvider("sun.nio.ch.EPollSelectorProvider");
    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

1.3 Channel

Channel is a concept in JavaNIO. Everyone understands it as a socket and the encapsulation of a series of operation methods on the socket.

picture

Java encapsulates the connect, bind, read, write and other methods in Channel as member methods.

public interface Channel extends ... {
 Channel read();
 Channel flush();
 ......

 interface Unsafe {
  void bind(SocketAddress localAddress, ...);
  void connect(SocketAddress remoteAddress, ...);
  void write(Object msg, ...);
  ......
 }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

In addition, in Java, it is customary to call the listen socket the parent channel, and the new connection created after the client handshake request arrives is called the child channel, which is easy to distinguish.

1.4 Pipelines

Inside each Channel object, in addition to encapsulating the socket, there is also a special data structure DefaultChannelPipeline pipeline. In this pipeline are handlers registered at various times.

The read and write operations on the Channel will go to this DefaultChannelPipeline. When register, active, read, readComplete and other operations are completed on the channel, the corresponding methods in the pipeline will be triggered.

picture

This ChannelPipeline is actually a doubly linked list and various operation methods on the linked list.

public interface ChannelPipeline {
 ChannelPipeline addFirst(String name, ChannelHandler handler);
 ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
 ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
 ChannelPipeline addLast(String name, ChannelHandler handler);

 ChannelPipeline fireChannelRead(Object msg);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

1.5 Interpretation of EchoServer

Now that we have a preliminary understanding of Java and Netty, let's take a look at the EchoServer source code mentioned at the beginning.

public final class EchoServer {
 public static void main(String[] args) throws Exception {
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 if (sslCtx != null) {
                     p.addLast(sslCtx.newHandler(ch.alloc()));
                 }
                 p.addLast(serverHandler);
             }
         });

        // Start the server.
        ChannelFuture f = b.bind(PORT).sync();
        ......
 }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.

At the beginning of the code, the line bossGroup = new NioEventLoopGroup(1) creates a thread pool with only one thread. workerGroup = new NioEventLoopGroup creates another worker thread pool. There is no specified number. Netty will flexibly decide according to the number of CPU cores of the current machine.

ServerBootstrap This is a scaffolding class to make it easier for us to write server programs.

The line b.group(bossGroup, workerGroup) is to pass in two thread pools. The first one acts as a boss and only processes accept to receive new client connection requests. The second parameter is the worker thread pool to handle request reception, processing and result sending on the connection.

Let's note that childHandler is passed a ChannelInitializer, which is a method that will be called back when a new client connection arrives. Inside this method, we add a handler serverHandler to the pipeline of this new chaneel, so that the handler can be executed for request processing when data is received.

The above methods are all definitions. In the b.bind method, the service is actually started, the parent channel (listen socket) is created, and the boss thread is created. When a new connection arrives, the boss thread creates a child channel, adds a processor to its pipeline, and starts a worker thread for processing.

2. Netty bootstrap parameter construction

In a nutshell, bootstrap.group() .channel() .childHandler() .childOption() are the various parameters for building Netty Server.

2.1 group settings

ServerBootstrap and its parent class AbstractBootstrap define two EventLoopGroup group members respectively. The parent class AbstractBootstrap's group is used to handle accpet events, and the childGroup under ServerBootstrap is used to handle all other read and write events.

The group() method is to set the EventLoopGroup parameter to its own members. If only one thread pool is passed in when calling group(), all events under this service will be processed by this thread pool in the future. For details, see Feige's simplified source code.

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
 //用来处理非 accept 以外的线程池
 private volatile EventLoopGroup childGroup;

 public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }
}

public abstract class AbstractBootstrap ... {
 //用来处理 accept 的线程
 volatile EventLoopGroup group;

 public B group(EventLoopGroup group) {
        this.group = group;
        ......
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.

2.2 channel settings

Let's look at the ServerBootstrap#channel method, which is used to define a factory method, which will be called when a channel needs to be created in the future.

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

 public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

Looking back at the demo at the beginning of this article, .channel(NioServerSocketChannel.class) refers to the type of NioServerSocketChannel created when a channel needs to be created in the future.

2.3 option settings

Look at the option method again, just set it to the options member

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    public <T> B option(ChannelOption<T> option, T value) {
        ObjectUtil.checkNotNull(option, "option");
        synchronized (options) {
            if (value == null) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        return self();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.

2.4 handler method

In this demo, two handlers are set, one is handler and the other is childHandler. They are all set to their own members and they are done, look at the source code.

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends ...... {

    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

3. Netty bootstrap startup service

The bind method under ServerBootstrap is a very important method in the service startup process. Create the parent channel (listen socket), create the boss thread, bind the Acceptor processor to the boss thread, and call the system call bind to bind and monitor all done here.

Let's take a direct look at the entry source code related to bind.

//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap ... {
 ......
}

//file:io/netty/bootstrap/AbstractBootstrap.java
public abstract class AbstractBootstrap ... {

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(...);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {

     //创建父 channel、初始化并且注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        ......

        //如果 Register 已经完成,则直接 doBind0
        if (regFuture.isDone()) {
         ChannelPromise promise = channel.newPromise();
         doBind0(regFuture, channel, localAddress, promise);
         return promise;
        //否则就注册一个 listener(回调),等 register 完成的时候调用 
        } else {
         final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
             regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                 promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            return promise;
        }
        
    }

    //创建 channel,对其初始化,并且 register(会创建 parent 线程)
    final ChannelFuture initAndRegister() {
     //3.1 创建父 channel(listen socket)
        channel = channelFactory.newChannel();

        //3.2 对父 channel(listen socket)进行初始化
        init(channel);

        //3.3 注册并启动 boss 线程
        ChannelFuture regFuture = config().group().register(channel);
        ......
    }

    //3.4 真正的bind
    private static void doBind0(...) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                ......
            }
        });
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.

During this process, the following important things were done:

  • Create parent channel (listen socket)
  • Initialize the parent channel (listen socket)
  • register the parent channel (listen socket) to the main group and start the main process
  • real bind

Let's take a look separately.

3.1 Create parent channel (listen socket)

Create a channel (socket) in the initAndRegister() method, which calls channelFactory.newChannel().

public abstract class AbstractBootstrap
    //创建 channel,对其初始化,并且 register(会创建 parent 线程)
    final ChannelFuture initAndRegister() {
     //3.1 创建 listen socket
        channel = channelFactory.newChannel();
        ......
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

Recall the channel method in Section 2.2, which returns a ReflectiveChannelFactory. That's right, the newChannel here is to call this factory method to create a NioServerSocketChannel object.

3.2 Initialize the parent channel (listen socket)

After initAndRegister creates the channel, you need to call init to initialize it.

public abstract class AbstractBootstrap
    final ChannelFuture initAndRegister() {
     //3.1 创建父 channel(listen socket)
        //3.2 对父 channel(listen socket)进行初始化
        init(channel);
        ......
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

The channel is initialized in init(), one is to assign values ​​to options and attrs, and the other is to build the pipeline of the parent channel.

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

 void init(Channel channel) {
  //设置 option 和 attr
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());

        //设置 pipeline
        ChannelPipeline p = channel.pipeline();
        p.addLast(new ChannelInitializer<Channel>() {
           ......
        });
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.

Set channel options in setChannelOptions. Recall that we can pass in SO_BACKLOG when using ServerBootstrap, which is one of the options. Here it will actually be set to the channel (socket).

ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 100)
  • 1.
  • 2.

In init, a little trickier to understand is p.addLast(new ChannelInitializer...). This piece of code just adds a handler to the parent channel. Its real implementation has to wait until after register, we will see it later.

3.3 register parent channel

After the parent channel is created and initialized, it needs to be registered on the boss thread before it can be used.

public abstract class AbstractBootstrap
    final ChannelFuture initAndRegister() {
     //3.1 创建父 channel(listen socket)
        //3.2 对父 channel(listen socket)进行初始化
        //3.3 注册并启动 boss 线程
        ChannelFuture regFuture = config().group().register(channel);
        ......
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

Among them, config().group() will eventually call AbstractBootstrap#group, and what we get in this method is the bossGroup we passed in.

public abstract class AbstractBootstrap
    volatile EventLoopGroup group;
    public final EventLoopGroup group() {
        return group;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

Where bossGroup is an instance of NioEventLoopGroup, so the code will enter the NioEventLoopGroup#register method.

public class NioEventLoopGroup extends MultithreadEventLoopGroup {}

public abstract class MultithreadEventLoopGroup extends ... {

 @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

Contains one or more EventLoops in a NioEventLoopGroup. The next method above is to select one of them, and then register the channel on it.

For this article, we are using NioEventLoopGroup, which naturally contains NioEventLoop, and we continue to find its register method.

public final class NioEventLoop extends SingleThreadEventLoop
 //在 eventloop 里注册一个 channle(socket)
 public void register(final SelectableChannel ch, ...) {
  ......
        register0(ch, interestOps, task);
 }

 //最终调用 channel 的 register
 private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
        ch.register(unwrappedSelector, interestOps, task);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

It can be seen that the register of NioEventLoop is finally called to the register of the channel. In our article, the channel we created is NioServerSocketChannel, and we will follow this clue to check.

//file:src/main/java/io/netty/channel/AbstractChannel.java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ......

        //关联自己到 eventLoop
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
         try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            }
         ......
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.

In the register in the parent class AbstractChannel of the channel, first associate itself with the incoming eventLoop. Then call inEventLoop to determine whether the currently running thread of the thread is the support thread of the EventExecutor, and if so, return register0 directly.

Generally speaking, the main thread is running when the service is started. At this time it is likely that the boss thread has not started. So if it is found that it is not currently the boss thread, call eventLoop.execute to start the boss thread.

The parent class of NioEventLoop is SingleThreadEventExecutor, find the execute method.

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    public void execute(Runnable task) {
        execute0(task);
    }

    private void execute0(@Schedule Runnable task) {
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.

Let's first look at addTask(task), which adds the task to the task queue. Wait for the thread to get up before running it.

public abstract class SingleThreadEventExecutor extends ... {

 private final Queue<Runnable> taskQueue;
 protected void addTask(Runnable task) {
        (task);
    }
 final boolean offerTask(Runnable task) {
        return taskQueue.offer(task);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

inEventLoop() is to judge whether the current thread is a thread bound by itself, and it is still running in the main thread at this time, so if inEventLoop is false, it will enter startThread and start creating a thread for EventLoop.

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private void startThread() {
        doStartThread();
        ......
    }

    private void doStartThread() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                SingleThreadEventExecutor.this.run();
                ......
            }
        }
    }  
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

Call the Java thread management tool Executor in doStartThread to start the boss thread.

3.4 boss thread start

When the thread gets up, it enters its own thread loop, traverses its own task queue, and then starts to process its own tasks.

public final class NioEventLoop extends SingleThreadEventLoop {

    protected void run() {
        for (;;) {
            if (!hasTasks()) {
                strategy = select(curDeadlineNanos);
            }

            //如果有任务的话就开始处理
            runAllTasks(0);

            //任务处理完毕就调用 epoll_wait 等待事件发生
            processSelectedKeys();
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

Earlier we saw in Section 3.3 that eventLoop.execute adds a Runnable task to the task queue. When the EventLoop thread starts, it traverses its own task queue and starts processing. At this time, it will enter the AbstractChannel#register0 method to start running.

//file:src/main/java/io/netty/channel/AbstractChannel.java
public abstract class AbstractChannel extends ... {

    public final void register(...) {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
        ......
    }
    private void register0(ChannelPromise promise) {
        doRegister();
        ......
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

The function doRegister is under the AbstractNioChannel class.

//file:io/netty/channel/nio/AbstractNioChannel.java
public abstract class AbstractNioChannel extends AbstractChannel {

    private final SelectableChannel ch;

    protected SelectableChannel javaChannel() {
        return ch;
    }
    public NioEventLoop eventLoop() {
        return (NioEventLoop) super.eventLoop();
    }
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

The most critical sentence above is selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);. This sentence is equivalent to calling epoll_ctl in the C language to add the listen socket to the epoll object.

Among them, javaChannel obtains the parent channel, which is equivalent to listen socket. unwrappedSelector Gets the selector, which is equivalent to the epoll object. register is equivalent to performing an add operation using epoll_ctl.

When the channel is registered, the ChannelInitializer callback registered in the previous init will be executed. Look back at its callback definition.

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

 void init(Channel channel) 
  ......

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
             ......
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.

In ChannelInitializer#initChannel, another task is added to the pipeline of the boss thread. The task is to have it register a ServerBootstrapAcceptor handler on its own pipeline. When a new connection arrives in the future, the ServerBootstrapAcceptor will be executed.

3.5 Real bind

Look at the doBind0 method and call channel.bind to complete the binding.

private static void doBind0(...) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                ......
            }
        });
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

Fourth, the new connection arrives

Let's go back to the main loop of the boss thread.

public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        for (;;) {
            strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

            //任务队列都处理完就开始 select
            if (!hasTasks()) {
                strategy = select(curDeadlineNanos);
            }
            //处理各种事件
            if (strategy > 0) {
                processSelectedKeys();
            }
        }  
    }
    
    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // Timeout will only be 0 if deadline is within 5 microsecs
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.

If the tasks in the thread task queue are processed cleanly, the boss thread will call select to discover various events on its selector. Equivalent to epoll_wait in C language.

When an event is found, such as OP_WRITE, OP_ACCEPT, OP_READ, etc., it will enter the corresponding processing

public final class NioEventLoop extends SingleThreadEventLoop {
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     ......
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
         ch.unsafe().forceFlush();
     }
     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         unsafe.read();
     }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

For Unsafe.read() on the server side, the io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() method will be executed, and it will call the JDK underlying ServerSocketChannel.accept() to receive the client's connection and encapsulate it as Netty's NioSocketChannel, and then propagate the ChannelRead event through the Pipeline, so that the ServerBootstrapAcceptor can handle the new client connection in the ChannelRead callback.

We look directly at ServerBootstrapAcceptor#ChannelRead.

//file:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
 
 ......

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
         // 获取child channel
            final Channel child = (Channel) msg;

            // 设置 childHandler 到 child channel
            child.pipeline().addLast(childHandler);

            // 设置 childOptions、 childAttrs
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            // 将 child channel 注册到 childGroup
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • twenty one.
  • twenty two.
  • twenty three.
  • twenty four.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.

In channelRead, the newly created child channel is obtained first, and childHandler is added to its pipeline. Looking back at Section 1.5, childHandler is our custom.

Then call childGroup.register(child) to register the child channel with the workerGroup. This register process is the same as that in Section 3.3 and Section 3.5. The difference is that the parent channel is registered to the bossGroup, and the child channel is registered to the workerGroup.

After the register is completed, the sub-channel is hung on one of the threads of the workerGroup. If the corresponding thread is not created, it will be created and entered into its own thread loop.

When the child channel is registered, ChannelInitializer#initChannel in childHandler will be executed

public final class EchoServer {
 public static void main(String[] args) throws Exception {
  ...
        ServerBootstrap b = new ServerBootstrap();
        b.childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 if (sslCtx != null) {
                     p.addLast(sslCtx.newHandler(ch.alloc()));
                 }
                 p.addLast(serverHandler);
             }
        });
        ......
 }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

In the initChannel, the processing class serverHandler of the child channel is added. The definition of this processing class in the Netty demo is very simple, just print it out.

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(......) {
        ctx.write(msg);
    }
    ......
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

5. User request arrives

When the worker thread is up, it will enter the thread loop (the run function of the boss thread and the worker thread is one). In the loop, it will traverse its own task queue. If there is no task to process, select it to observe whether there is an event on the channel that it is responsible for.

public final class NioEventLoop extends SingleThreadEventLoop {

    protected void run() {
        for (;;) {
            if (!hasTasks()) {
                strategy = select(curDeadlineNanos);
            }

            //如果有任务的话就开始处理
            runAllTasks(0);

            //任务处理完毕就调用 epoll_wait 等待事件发生
            processSelectedKeys();
        }
    }
    private int select(long deadlineNanos) throws IOException {
        selector.selectNow();
        ......    
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

The worker thread will call select to find readable and writable events on all child channels it manages. After a readable event is found, processSelectedKeys will be called, and finally the pipeline will be triggered to make the EchoServerHandler method start executing.

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(......) {
        ctx.write(msg);
    }
    ......
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

6. Summary

In fact, Netty is more flexible about network encapsulation. It supports both single-threaded Reactor, multi-threaded Reactor, and master-slave multi-threaded Reactor. The corresponding usage of the three models is as follows:

public static void main(String[] args) throws Exception {
 //单线程 Reactor
 EventLoopGroup eventGroup = new NioEventLoopGroup(1);
 ServerBootstrap serverBootstrap = new ServerBootstrap(); 
 serverBootstrap.group(eventGroup);    
 ......

 //多线程 Reactor
 EventLoopGroup eventGroup = new NioEventLoopGroup();
 ServerBootstrap serverBootstrap = new ServerBootstrap(); 
 serverBootstrap.group(eventGroup);    
 ......

 //主从多线程 Reactor
 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 ServerBootstrap serverBootstrap = new ServerBootstrap(); 
 serverBootstrap.group(bossGroup, workerGroup);    
 ......
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

In order to express more comprehensively, Feige chose the most classic master-slave multi-threaded Reactor mode in this article. The content described in this paper can be represented by the following figure.

picture

The boss thread in Netty is responsible for monitoring and processing events on the parent channel (listen socket). When a new connection arrives, a worker thread is selected to call the child channel (connection socket) to the worker thread for processing.

The Worker thread is waiting for the monitoring and processing of events on all sub-channels (connected sockets) it manages. When an event is found, the handler set by the user is called back for processing. In this example, the user handler is EchoServerHandler#channelRead.

So far, we have introduced the core principle of the Netty network module. Fei Ge has been "advocating" the benefits of internal strength. As long as you have a solid internal strength, things that seem to be irrelevant in various languages ​​can actually be figured out at the bottom level. I myself have never developed a server program in Java, let alone Netty. But when you have a deep understanding of epoll, you can easily understand it when you look at Netty again, and you can quickly understand its core. This is the benefit of exercising internal strength!