Netty核心原理

Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 的官方文档地址如下:https://netty.io/

Netty及其核心组件

Netty的设计是基于Reactor模式的,它利用NIO(非阻塞I/O)技术来处理大量并发连接。Netty支持多种传输层协议,包括TCP、UDP、文件传输等,并提供了对HTTP、WebSocket、Mqtt等协议的支持。

Netty架构设计:

  • Channel:代表一个Socket连接,是Netty中最重要的概念之一。
  • EventLoop:负责处理特定Channel上的I/O事件。
  • ChannelHandlerContext:为每个Channel提供上下文信息,用于访问Channel和触发事件。
  • ChannelPipeline:一系列ChannelHandler的容器,用于处理入站和出站事件。
  • ChannelHandler:用于处理Channel上发生的事件,分为InboundHandler和OutboundHandler。
  • ByteBuf:Netty提供的缓冲区,用于替代标准Java的ByteBuffer,更高效地处理二进制数据。

Netty 建立在 Java NIO 的基础上,利用了 NIO 提供的非阻塞 I/O 功能,使用了 NIO 的 Channel 和 Buffer 概念,并在此之上做了进一步的封装和优化。

环境搭建:

xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.112.Final</version>
</dependency>

Netty 4.1 是 Netty 4 系列的一个重要版本,也是目前广泛使用的版本之一。后续将以版本为主。

EventLoopGroup

EventLoopGroup是一个包含多个EventLoop实例的容器,用于管理这些EventLoop。它可以根据需要创建一定数量的EventLoop实例,并将它们分发到不同的线程中。

  • Netty提供了几种不同类型的EventLoopGroup,例如NioEventLoopGroup用于NIO操作,EpollEventLoopGroup用于Linux下的epoll操作。

  • NioEventLoopGroup 相当于一组线程,每个线程都包含了 Selector 用于监控和处理 I/O 事件。

创建EventLoop组:

java
EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // 一个线程用于接受连接
EventLoopGroup workerGroup = new NioEventLoopGroup();  // 多个线程用于处理I/O事件

当创建一个EventLoopGroup时,Netty会根据配置自动创建一定数量的EventLoop实例。这些实例通常是通过一个线程池来管理的,每个EventLoop绑定到一个独立的线程。

Bootstrap

在 Netty 中,BootstrapServerBootstrap 是两个重要的类,它们分别用于客户端和服务端的初始化设置。

ServerBootstrap 是 Netty 中用于==启动服务器的核心类之一==。它提供了一种简单的方法来配置和启动 Netty 服务器。ServerBootstrap 的设计目标是简化服务器的启动过程。ServerBootstrap 的工作流程如下:

  1. 创建 EventLoopGroup: 创建 bossGroupworkerGroupbossGroup 用于处理连接请求,workerGroup 用于处理 I/O 事件。

  2. 配置 ServerBootstrap: 设置 EventLoopGroupChannel 类型、初始化器等。

  3. 绑定端口:调用 bind 方法绑定服务器到指定的端口,并等待直到完成。

  4. 处理连接请求:当客户端尝试连接时,bossGroupEventLoop 会处理连接请求,并为每个新连接创建一个新的 Channel

  5. 初始化 Channel:通过 childHandler 初始化每个新连接的 ChannelChannelPipeline

  6. 处理 I/O 事件workerGroupEventLoop 会处理每个 Channel 上的 I/O 事件。

  7. 关闭服务器:调用 close 方法关闭服务器,并通过 shutdownGracefully 方法关闭 EventLoopGroup

通过使用 ServerBootstrap,你可以轻松地设置服务器的各种属性,并且通过配置 ChannelInitializer 来初始化每个新连接的 Channel

下面是一个简单的服务端和客户端示例,展示了如何使用 BootstrapServerBootstrap

java
public class MyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());

            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
java
public class MyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());

            ChannelFuture f = b.connect("localhost", 8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

Channel

通道(Channel)代表了一个网络连接的抽象,是所有 I/O 操作的基础。Netty 提供了多种 Channel 类型,每种类型都针对不同的传输协议进行了优化。以下是常见的 Channel 类型:

  1. NioServerSocketChannel:使用 NIO 的 ServerSocketChannel。用于创建服务器端 Channel,用于接受客户端连接。

  2. NioSocketChannel:使用 NIO 的 SocketChannel。用于客户端发起连接的 Channel,用于与服务器通信。

  3. OioServerSocketChannelOioSocketChannel:基于阻塞 I/O 的 Channel 类型,适用于低并发场景。使用标准 Java 的 ServerSocketSocket

  4. EpollServerSocketChannelEpollSocketChannel:针对 Linux 平台的 Channel 类型,使用 epoll 代替 select/poll。提供了更高的性能。

  5. KQueueServerSocketChannelKQueueSocketChannel:针对 BSD 和 macOS 平台的 Channel 类型,使用 kqueue 机制。

  6. DatagramChannel:使用 NIO 的 DatagramChannel。用于 UDP 协议的 Channel 类型。

在Netty中,Channel的生命周期是由Netty框架自动管理的,通常不需要直接编写代码来控制这个生命周期。Netty通过事件通知和回调机制让你能够监听和响应Channel的状态变化。

Netty 中的 Channel 与 Java NIO 中的 Channel 之间存在一定的联系,但也有很多重要的区别。

  • Java NIO 提供了基本的 I/O 操作,但需要程序员自己管理线程、事件循环和错误处理。
  • Netty 建立在 Java NIO 的基础上,提供了一套更高级的 API,自动处理了许多底层细节,使得编写高性能的网络应用变得更加简单和高效。

尽管 Channel 的生命周期是由Netty框架自动管理的,但可以通过向 ChannelPipeline 添加处理器来监听这些事件。

::: details 监听 Channel 的生命周期事件

java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelStateEvent;

public class LifecycleHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel registered.");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel active.");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel inactive.");
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel unregistered.");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel closed.");
        super.channelClosed(ctx);
    }

    @Override
    public void channelDestroyed(ChannelHandlerContext ctx) {
        System.out.println("Channel destroyed.");
        super.channelDestroyed(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) evt;
            switch (event.getState()) {
                case OPEN:
                    System.out.println("Channel open state changed to: " + event.getValue());
                    break;
                case BOUND:
                    System.out.println("Channel bound state changed to: " + event.getValue());
                    break;
                case CONNECTED:
                    System.out.println("Channel connected state changed to: " + event.getValue());
                    break;
                case INTEREST_OPS:
                    System.out.println("Channel interest ops state changed to: " + event.getValue());
                    break;
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}

上例中创建了一个 LifecycleHandler 类,它继承自 ChannelInboundHandlerAdapter。我们覆盖了多个方法来监听 Channel 的生命周期事件,如 channelRegisteredchannelActivechannelInactivechannelUnregisteredchannelClosedchannelDestroyed

添加处理器

要使用这个处理器,你需要将其添加到 ChannelPipeline 中。这通常是在 ChannelInitializer 中完成的。下面是如何在 NettyServerInitializer 类中添加 LifecycleHandler 的示例:

java
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new LifecycleHandler()); // 添加 LifecycleHandler 到 ChannelPipeline
    }
}

:::

每个处理器都可以实现 ChannelInboundHandlerAdapterChannelInboundHandler 接口,并覆盖相应的生命周期方法来响应这些事件。

ChannelInitializer

ChannelInitializer 是一个抽象类,它的主要目的是在 Channel 被注册到 EventLoop 之后,但在 Channel 变为活跃状态之前,初始化 ChannelChannelPipeline。这样可以确保 Channel 在开始接收或发送数据之前已经配置好了所有必要的处理器。

==ChannelInitializer 是一个抽象类,你需要继承它并实现 initChannel 方法==。initChannel 方法会在每个新连接的 Channel 上被调用一次。

ChannelInitializer 的工作流程:

  1. 创建 ChannelInitializer:创建一个继承自 ChannelInitializer 的类,并实现 initChannel 方法。

  2. 注册 ChannelInitializer: 将 ChannelInitializer 注册到 ServerBootstrapBootstrapchildHandler 方法中。

  3. 初始化 ChannelPipeline: 当新的 Channel 被创建并注册到 EventLoop 上时,ChannelInitializerinitChannel 方法会被调用。在 initChannel 方法中,你可以添加处理器到 ChannelPipeline

  4. 处理数据:一旦 Channel 被激活,ChannelPipeline 中的处理器就会开始处理数据。

下面是一个使用 ChannelInitializer 的示例,展示了如何初始化 ChannelChannelPipeline

::: details 使用 ChannelInitializer 的示例

java

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new NettyServerHandler());
    }
}

class NettyServerHandler extends io.netty.channel.SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Received message: " + msg);
        ctx.writeAndFlush("Hello, " + msg);
    }

    @Override
    public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在这个例子中,我们创建了一个 NettyServerInitializer 类,它继承自 ChannelInitializer<SocketChannel>。我们在 initChannel 方法中向 ChannelPipeline 添加了三个处理器:StringDecoderStringEncoderNettyServerHandler。 :::

ChannelInitializer 是一个抽象类,它定义了一个 initChannel 方法。当你继承 ChannelInitializer 并实现 initChannel 方法时,你可以做以下事情:

  1. 添加处理器到 ChannelPipeline: 使用 pipeline.addLast(String name, ChannelHandler handler) 方法来添加处理器。

  2. 配置处理器:在添加处理器之前或之后,你可以根据需要配置处理器。

  3. 添加多个处理器: 你可以添加多个处理器到 ChannelPipeline,以实现不同的功能。

  4. 访问 Channel 属性:你可以在 initChannel 方法中访问 Channel 的属性,如 ChannelConfig

ChannelHandler

ChannelHandler 是一个接口,它定义了处理网络事件的方法。Netty中的 ChannelHandler 主要有两种类型:InboundHandlerOutboundHandler

  • InboundHandler 处理入站事件,即处理从网络到达的数据。入站事件通常包括数据读取、连接建立、连接关闭等。入站处理器负责接收数据,并可以对其进行解码、转换或进一步处理。

  • OutboundHandler 处理出站事件,即处理发送到网络的数据。出站事件通常包括数据写入、连接请求等。出站处理器负责编码数据,并将其发送到网络。

除了 InboundHandlerOutboundHandler 之外, ChannelDuplexHandler 可以同时实现 ChannelInboundHandlerChannelOutboundHandler 的处理器,可以同时处理入站和出站事件。

==ChannelPipeline 是一系列 ChannelHandler 的有序集合==。当一个 Channel 接收或发送数据时,数据会经过 ChannelPipeline 中的一系列处理器。ChannelPipeline 提供了一种插件式的架构,使得开发者可以轻松地添加、移除或修改处理器,从而实现不同的功能。

ByteBuf

ByteBuf 是 Netty 中用于存储和操作二进制数据的主要抽象类。与 Java NIO 的 ByteBuffer 相比,ByteBuf 提供了更多的功能和更好的性能。ByteBuf 是线程安全的,可以用于跨线程的数据传输。下面是几种常用的创建 ByteBuf 的方法:

  • 使用内存池PooledByteBufAllocator.DEFAULT 明确使用了内存池机制来优化性能。
  • 不使用内存池:直接使用 Unpooled
  • 自动决定:使用 ByteBufAllocator.DEFAULT,让Netty自动选择。
java
// 创建一个可变 ByteBuf,可指定最大容量(注意:可变指的是内容可以修改,可调用写入相关的方法)
ByteBuf unpoolBuf = Unpooled.buffer(1024);
// 将多个 ByteBuf 实例组合成一个只读的 CompositeByteBuf
ByteBuf compositeBuf = Unpooled.wrappedBuffer(buf1, buf2);

// 使用 PooledByteBufAllocator 创建 可变的 ByteBuf
ByteBuf pooledDirectBuf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
ByteBuf pooledHeapBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);


// 使用 ByteBufAllocator 创建 ByteBuf
ByteBuf allocatorBuf = ByteBufAllocator.DEFAULT.buffer(1024);
  • Unpooled:适用于不需要频繁分配和释放缓冲区的场景,例如处理少量数据或进行测试。
  • PooledByteBufAllocator:适用于需要频繁分配和释放缓冲区的高性能场景,例如处理大量数据或实时系统。
  • ByteBufAllocator:提供了一个统一的接口来创建 ByteBuf,适用于需要灵活切换内存管理策略的场景。

ByteBuf 提供了许多用于操作二进制数据的方法。下面是一些常见的操作方法:

  1. 读写索引

    • readerIndex(): 获取当前读索引位置。
    • writerIndex(): 获取当前写索引位置。
    • setReaderIndex(int index): 设置读索引位置。
    • setWriterIndex(int index): 设置写索引位置。
  2. 读写数据

    • getByte(int index): 读取指定位置的字节。
    • setByte(int index, int value): 在指定位置设置字节。
    • readByte(): 读取当前读索引位置的字节并移动读索引。
    • writeByte(int value): 在当前写索引位置写入字节并移动写索引。
    • readInt(): 读取当前读索引位置的整数并移动读索引。
    • writeInt(int value): 在当前写索引位置写入整数并移动写索引。
    • readBytes(ByteBuf dst, int length): 读取指定长度的字节到目标 ByteBuf
    • writeBytes(ByteBuf src, int length): 从源 ByteBuf 写入指定长度的字节。
  3. 容量和限制

    • capacity(): 获取 ByteBuf 的总容量。
    • maxCapacity(): 获取 ByteBuf 的最大容量。
    • isReadable(): 判断是否还有可读数据。
    • isWritable(): 判断是否还有可写空间。
    • markReaderIndex(): 标记当前读索引位置。
    • resetReaderIndex(): 重置读索引到标记的位置。
    • markWriterIndex(): 标记当前写索引位置。
    • resetWriterIndex(): 重置写索引到标记的位置。
  4. 释放资源

    • release(): 释放 ByteBuf 占用的资源。

下面是一个使用 ByteBuf 的示例,展示如何在 Netty 中读写二进制数据:

java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ByteBufExampleHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buffer = Unpooled.buffer(); // 创建一个 ByteBuf
        buffer.writeInt(12345); // 写入整数
        buffer.writeChar('A'); // 写入字符
        buffer.writeByte(65); // 写入字节

        // 发送到网络
        ctx.writeAndFlush(buffer);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buffer = (ByteBuf) msg;

        // 读取整数
        int intValue = buffer.readInt();
        System.out.println("Read integer: " + intValue);

        // 读取字符
        char charValue = buffer.readChar();
        System.out.println("Read character: " + charValue);

        // 读取字节
        byte byteValue = buffer.readByte();
        System.out.println("Read byte: " + byteValue);

        // 释放 ByteBuf
        buffer.release();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

尽管 StringDecoderStringEncoder 可以简化字符串数据的处理,但 ByteBuf 在 Netty 中仍然非常重要,原因如下:

以下是一些可能需要直接使用 ByteBuf 的场景:

  1. 自定义协议处理:如果您正在开发一个使用自定义协议的应用程序,您可能需要直接操作二进制数据来处理这些协议。

  2. 多协议栈:当您的应用程序需要支持多种不同的协议时,直接使用 ByteBuf 可以更灵活地处理这些协议。

  3. 多媒体数据处理:处理图片、音频或视频等多媒体数据时,通常需要直接操作二进制数据。

  4. 性能敏感的应用:对于那些对性能有极高要求的应用程序,直接使用 ByteBuf 可以更好地控制内存分配和数据处理流程。

  5. 低级别数据处理:当您需要处理特定的数据格式,如网络数据包、加密数据等,直接使用 ByteBuf 可以更灵活地进行数据操作。

ChannelFuture

ChannelFuture 是 Netty 中的一个接口,它代表了一个异步操作的未来结果,提供了一种机制来检查异步操作的状态、监听操作的完成以及获取操作的结果。

异步操作:在 Netty 中,许多 I/O 操作是异步的,这意味着操作被发起后立即返回一个 ChannelFuture,而不是等待操作完成。如:Channel.write()Channel.close()Channel.bind() 等方法都是异步的。

总结:

  • ChannelFuture:在 Netty 中代表异步操作的未来结果。
  • 异步操作:发起后立即返回一个 ChannelFuture,而不是等待操作完成。
  • 结果处理
    • sync() 方法:阻塞当前线程直到异步操作完成。
    • addListener() 方法:注册监听器来处理异步操作完成时的事件。
  • 与 AIO 的比较:Netty 提供了一个更高级别的抽象,适用于构建高性能的网络应用;AIO 提供了一个更低级别的 API,适用于需要直接控制 I/O 操作的应用场景。

通过使用 ChannelFuture 和其提供的方法,您可以有效地处理 Netty 中的异步操作。使用 addListener() 方法来处理异步操作的结果是生产环境中推荐的做法,它可以避免阻塞主线程,提高程序的响应性和效率。

高级特性与性能优化

ChannelOption

ChannelOption 是 Netty 中的一个枚举类型,它定义了一系列可以应用于 Channel 的配置选项。这些选项可以用于调整 Channel 的行为,包括 TCP 参数和其他配置,主要通过 BootstrapServerBootstrapoption()childOption() 方法来配置。

Netty 提供了一系列 ChannelOption 枚举值来设置 TCP 参数。下面是一些常用的 TCP 参数及其对应的 ChannelOption 值:

java
// 启用 SO_REUSEADDR 标志,允许在短时间内重新绑定到相同的地址(由IP地址和端口号组成)
// 当一个 Socket 被关闭后,它所绑定的地址通常会进入一个 TIME_WAIT 状态
// (在这段时间内,不启用SO_REUSEADDR则新的Socket无法绑定到相同的地址)
b.option(ChannelOption.SO_REUSEADDR, true);

//  启用TCP保活机制(当网络连接空闲一段时间后,会自动发送探测数据包来确认连接是否仍然活跃)
b.option(ChannelOption.SO_KEEPALIVE, true);

// 设置接收缓冲区为 1MB (操作系统层面的接收缓冲区大小,并非ByteBuf)
b.option(ChannelOption.SO_RCVBUF, 1024 * 1024); 
b.option(ChannelOption.SO_SNDBUF, 1024 * 1024); // 设置发送缓冲区为 1MB

b.option(ChannelOption.SO_LINGER, 10); // 设置关闭连接前等待 10 秒

b.option(ChannelOption.SO_TIMEOUT, 5000); // 设置读取超时时间为 5 秒

b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // 设置连接超时时间为 5 秒

b.option(ChannelOption.IP_TOS, 0x10); // 设置 IP 类型服务字段

// 设置多播接口 (仅适用于 IP 多播相关的 Channel)
b.option(ChannelOption.IP_MULTICAST_IF, InetAddress.getByName("0.0.0.0")); 
b.option(ChannelOption.IP_MULTICAST_TTL, 2); // 设置多播 TTL 为 2
b.option(ChannelOption.IP_MULTICAST_LOOP_DISABLED, false); // 禁用多播回环

对于客户端,使用 Bootstrapoption() 方法来设置 ChannelOption

java
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.SO_KEEPALIVE, true)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(new ClientHandler());
     }
 });

对于服务端,使用 ServerBootstrapoption() 方法来设置 ChannelOption

java
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 128) // 设置监听队列长度
 .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置子 Channel 的 SO_KEEPALIVE 选项
 .childOption(ChannelOption.TCP_NODELAY, true) // 设置子 Channel 的 TCP_NODELAY 选项
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(new ServerHandler());
     }
 });

ChannelGroup

ChannelGroup 是 Netty 中用于管理多个 Channel 的重要工具(实际上是一个接口)。它提供了一种方便的方式来管理多个 Channel。Netty 提供了几个 ChannelGroup 的实现类,包括 DefaultChannelGroupDefaultEventExecutor

  • DefaultChannelGroup:这是最常用的实现类,它使用一个 EventExecutor 来管理 Channel 的生命周期。
  • DefaultEventExecutor:用于执行 ChannelGroup 中的异步操作。

ChannelGroup常用方法

  1. 添加 Channel

    • add(Channel):将一个 Channel 添加到 ChannelGroup 中。
    • add(Iterable<Channel>):将多个 Channel 添加到 ChannelGroup 中。
    java
    channelGroup.add(channel);
    channelGroup.add(Arrays.asList(channel1, channel2, channel3));
  2. 移除 Channel

    • remove(Channel):从 ChannelGroup 中移除一个 Channel
    • remove(Iterable<Channel>):从 ChannelGroup 中移除多个 Channel
    java
    channelGroup.remove(channel);
    channelGroup.remove(Arrays.asList(channel1, channel2));
  3. 关闭 Channel

    • close():关闭 ChannelGroup 中的所有 Channel
    • close(Channel):关闭 ChannelGroup 中的特定 Channel
    java
    channelGroup.close().sync(); // 关闭所有 Channel
    channelGroup.close(channel).sync(); // 关闭特定 Channel
  4. 检查 Channel 是否存在于 ChannelGroup 中

    • contains(Channel):检查 Channel 是否存在于 ChannelGroup 中。
    java
    boolean contains = channelGroup.contains(channel);
  5. 获取 ChannelGroup 中的 Channel 数量

    • size():返回 ChannelGroup 中的 Channel 数量。
    java
    int size = channelGroup.size();
  6. 获取 ChannelGroup 中的所有 Channel

    • channels():返回一个包含所有 Channel 的迭代器。
    java
    Iterator<Channel> iterator = channelGroup.channels();
    while (iterator.hasNext()) {
        Channel channel = iterator.next();
        // 处理每个 Channel
    }
  7. 遍历 ChannelGroup 中的所有 Channel

    • forEach(Consumer<? super Channel>):对 ChannelGroup 中的每个 Channel 执行给定的操作。
    java
    channelGroup.forEach(channel -> {
        // 对每个 Channel 执行操作
    });
  8. 向 ChannelGroup 中的所有 Channel 写入消息

    • write(Object):向 ChannelGroup 中的所有 Channel 写入消息。
    • write(Channel, Object):向 ChannelGroup 中的特定 Channel 写入消息。
    java
    channelGroup.write(message).sync(); // 向所有 Channel 写入消息
    channelGroup.write(channel, message).sync(); // 向特定 Channel 写入消息
  9. 向 ChannelGroup 中的所有 Channel 写入并刷新消息

    • writeAndFlush(Object):向 ChannelGroup 中的所有 Channel 写入消息并立即刷新。
    • writeAndFlush(Channel, Object):向 ChannelGroup 中的特定 Channel 写入消息并立即刷新。
    java
    channelGroup.writeAndFlush(message).sync(); // 向所有 Channel 写入并刷新消息
    channelGroup.writeAndFlush(channel, message).sync(); // 向特定 Channel 写入并刷新消息

在实际开发中,尤其是像聊天室这样的应用场景中,通常会创建多个 ChannelGroup 来管理不同的群组或频道。这样可以更加灵活地管理各个群组中的成员,并针对不同的群组执行特定的操作,如广播消息等。这种方式的优点包括:

  1. 分组管理:可以轻松地将用户分组到不同的聊天室或频道。
  2. 广播消息:可以向特定的群组广播消息,而不影响其他群组。
  3. 权限控制:可以根据群组来实施权限控制,例如只允许管理员发送特定类型的消息。
  4. 资源管理:可以更好地管理资源,例如限制每个群组的最大成员数。

下面是一个简单的示例,展示了如何使用多个 ChannelGroup 来管理聊天室的不同群组:

  • 创建多个 ChannelGroup:在聊天室应用中,每个群组或频道可以对应一个 ChannelGroup
  • 实现:使用 Map<String, DefaultChannelGroup> 来存储每个群组名称和对应的 ChannelGroup
  • 使用:通过 computeIfAbsent 方法来获取或创建 ChannelGroup,通过 channelGroup.add() 添加 Channel,通过 channelGroup.writeAndFlush() 广播消息。
java
public class ChatRoomServer {
    private final ConcurrentHashMap<String, DefaultChannelGroup> channelGroups = 
                                                        new ConcurrentHashMap<>();

    public void startServer(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 128)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new ChatRoomServerHandler(channelGroups));
                 }
             });

            // 绑定端口
            ChannelFuture f = b.bind(port).sync();
            System.out.println("Chat Room Server started and listening for connections on port: " + port);

            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } finally {
            // 关闭 EventLoopGroup,释放所有资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // 关闭所有 ChannelGroup 中的 Channel
            for (DefaultChannelGroup group : channelGroups.values()) {
                group.close().sync();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new ChatRoomServer().startServer(port);
    }
}

// ChatRoomServerHandler 类负责处理客户端连接
class ChatRoomServerHandler extends ChannelInboundHandlerAdapter {
    private final ConcurrentHashMap<String, DefaultChannelGroup> channelGroups;

    public ChatRoomServerHandler(ConcurrentHashMap<String, DefaultChannelGroup> channelGroups) {
        this.channelGroups = channelGroups;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("Client connected: " + incoming.remoteAddress());

        // 加入到默认群组
        String defaultGroupName = "default";
        DefaultChannelGroup defaultGroup = channelGroups.computeIfAbsent(defaultGroupName, 
                    s -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        defaultGroup.add(incoming);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 解析消息,假设消息格式为 "join:groupName" 或 "message:text"
        String message = (String) msg;
        if (message.startsWith("join:")) {
            String groupName = message.substring(5);
            DefaultChannelGroup group = channelGroups.computeIfAbsent(groupName, 
                        s -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
            group.add(ctx.channel());
            System.out.println("Client joined group: " + groupName);
        } else {
            // 向当前所在的群组广播消息
            String currentGroupName = getCurrentGroupName(ctx);
            if (currentGroupName != null) {
                DefaultChannelGroup group = channelGroups.get(currentGroupName);
                if (group != null) {
                    group.writeAndFlush(msg);
                }
            }
        }
    }

    private String getCurrentGroupName(ChannelHandlerContext ctx) {
        Channel incoming = ctx.channel();
        for (Map.Entry<String, DefaultChannelGroup> entry : channelGroups.entrySet()) {
            if (entry.getValue().contains(incoming)) {
                return entry.getKey();
            }
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 关闭发生异常的 Channel
        cause.printStackTrace();
        ctx.close();
    }
}

通过使用多个 ChannelGroup,可以更加灵活地管理聊天室中的不同群组,并轻松实现群聊功能。

自定义Codec

在 Netty 中,数据在发送之前需要被编码成二进制形式,而在接收时需要被解码回原始数据类型。因此,编解码器(Codec)就是用来完成这两个过程的组件。

  • 编码器(Encoder):负责将应用程序的数据对象转换为适合在网络上传输的字节流。在发送数据之前,将Java对象转换为字节序列。
  • 解码器(Decoder):将从网络上接收到的字节流转换回应用程序的数据对象。在接收数据之后,将字节序列转换为Java对象。

要自定义一个编码器,通常需要继承 MessageToByteEncoder<T> 类,其中 T 是要编码的数据类型。下面是一个简单的示例:

java
public class CustomEncoder extends MessageToByteEncoder<MyMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) throws Exception {
        // 将 MyMessage 对象编码为 ByteBuf
        out.writeInt(msg.getIntValue()); // 假设 MyMessage 包含一个整数值
        out.writeBytes(msg.getBytes());  // 假设 MyMessage 包含一个字节数组
    }
}

要自定义一个解码器,通常需要继承 ByteToMessageDecoder 类。下面是一个简单的示例:

java
public class CustomDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 检查是否有足够的数据来解码
        if (in.readableBytes() < 4) {
            return; // 如果数据不足,则返回
        }

        // 读取整数值
        int intValue = in.readInt();

        // 读取字节数组
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);

        // 创建 MyMessage 对象
        MyMessage message = new MyMessage(intValue, bytes);

        // 将解码后的消息添加到输出列表
        out.add(message);
    }
}

可以通过 ChannelInitializer 添加自定义的编码器和解码器到 ChannelPipeline

通过自定义编解码器,您可以更灵活地处理复杂的数据结构和协议。

Netty性能优化

使用Netty框架进行开发时,其性能优化主要考虑三个方向:线程模型,零拷贝和内存管理

注:内存泄漏检测级别为 PARANOID(最严格的检测级别),将会记录详细的调用栈信息并在检测到潜在泄漏时打印警告信息。

实际开发中,使用 PooledByteBufAllocator.DEFAULT.directBuffer 是一个很好的选择,特别是在需要处理大量数据和高并发请求的应用中。

  • 性能优化:==PooledByteBufAllocator.DEFAULT.directBuffer 结合了内存池和堆外内存的优点==,适用于需要高性能和低延迟的应用场景。
  • 内存泄漏风险:内存池有助于减少内存泄漏的风险,而且堆外内存可以避免垃圾收集器的影响。
  • 内存效率:堆外内存可以减少内存拷贝,提高 I/O 效率。

Netty实战项目示例