netty架构图、api例子

架构图

Boss Group

负责接收客户端的连接

NioEventLoopGroup 类对象,是一个线程池,线程池中的线程是 NioEventLoop

Boss Group 中的 NioEventLoop

BossGroup中有多个 NioEventLoop 线程,每个 NioEventLoop 都绑定了一个 Selector,多个Channel(客户端连接) 注册到 Selector,循环接收 accept 事件(接收客户端连接事件)。并将 channel 注册到 Worker Group 中 NioEventGroupSelector

每个 NioEventLoop 绑定一个端口

也就是说,如果程序只需要监听1个端口的话,只需要有 一个 NioEventLoop 线程就行了。

在Netty的线程模型中,是由多个 Selecotr 在监听IO就绪事件。

Worker Group

轮询处理 read、write 事件

NioEventLoopGroup 类对象,是一个线程池,线程池中的线程是 NioEventLoop

Worker Group 中的 NioEventLoop

一个Channel绑定一个 NioEventLoop,相当于一个连接绑定一个线程,这个连接所有的 ChannelHandler 都是在一个线程中执行的,避免了多线程干扰。

服务器端

ServerBootStrap

ServerBootStrap是Netty提供的一个创建服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类

//创建服务端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();

线程组

netty使用 主从Reactor多线程 ,所以要使用两个线程组:

  • bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。
  • workerGroup用于处理每一个连接发生的读写事件。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

group()

//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)

例子

pom.xml 依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

服务端启动类

public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型    
                .channel(NioServerSocketChannel.class)
                //设置线程队列得到连接个数    
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置保持活动连接状态    
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //使用匿名内部类的形式初始化通道对象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//给workerGroup的EventLoop对应的管道设置处理器
            System.out.println("服务端已经启动!");
            //绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端处理器

/**
 * 自定义的Handler需要继承Netty规定好的HandlerAdapter
 * 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //发送消息给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端收到消息,并给你发送消息", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常,关闭通道
        ctx.close();
    }
}

客户端

pom.xml 依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

Bootstrap

Bootstrap是Netty提供的一个创建客户端的工厂类,使用这个工厂类非常便利地创建启动类

//创建客户端对象
Bootstrap bootstrap = new Bootstrap();

线程组

NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

group()

//设置线程组
bootstrap.group(eventExecutors)

例子

客户端启动类

public class MyClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                //设置客户端的通道实现类型    
                .channel(NioSocketChannel.class)
                //使用匿名内部类初始化通道
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端启动成功!");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}

客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("土豆土豆,我是茄子!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

测试

先启动服务端,再启动客户端

参考:
https://zhuanlan.zhihu.com/p/181239748
https://zhuanlan.zhihu.com/p/389034303


原文出处:http://malaoshi.top/show_1IX4ungddUTt.html