netty架构图、api例子 作者:马育民 • 2023-02-06 14:53 • 阅读:10076 # 架构图 [](/upload/0/0/1IX4unQ0v1E1.jpg) ## Boss Group 负责接收客户端的连接 是 `NioEventLoopGroup` 类对象,是一个线程池,线程池中的线程是 `NioEventLoop`。 ### Boss Group 中的 NioEventLoop BossGroup中有多个 `NioEventLoop` 线程,每个 `NioEventLoop` 都绑定了一个 `Selector`,多个`Channel`(客户端连接) 注册到 `Selector`,循环接收 `accept` 事件(接收客户端连接事件)。并将 `channel` 注册到 Worker Group 中 `NioEventGroup` 的 `Selector` >每个 `NioEventLoop` **绑定一个端口** > 也就是说,如果程序只需要监听1个端口的话,只需要有 **一个** `NioEventLoop` 线程就行了。 > 在Netty的线程模型中,是由多个 `Selecotr` 在监听IO就绪事件。 ## Worker Group 轮询处理 read、write 事件 是 `NioEventLoopGroup` 类对象,是一个线程池,线程池中的线程是 `NioEventLoop`。 ### Worker Group 中的 NioEventLoop 一个Channel绑定一个 `NioEventLoop`,相当于一个连接绑定一个线程,这个连接所有的 `ChannelHandler` 都是在一个线程中执行的,避免了多线程干扰。 # 服务器端 ### ServerBootStrap ServerBootStrap是Netty提供的一个创建服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类 ``` //创建服务端的启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); ``` ### 线程组 netty使用 [主从Reactor多线程](https://www.malaoshi.top/show_1IX4ukPvVFlC.html "主从Reactor多线程") ,所以要使用两个线程组: - bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。 - workerGroup用于处理每一个连接发生的读写事件。 ``` EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ``` ### group() ``` //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) ``` ### 例子 #### pom.xml 依赖 ``` io.netty netty-all 4.1.20.Final ``` #### 服务端启动类 ``` public class MyServer { public static void main(String[] args) throws Exception { //创建两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务端的启动对象,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) //设置服务端通道实现类型 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //使用匿名内部类的形式初始化通道对象 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道设置处理器 socketChannel.pipeline().addLast(new MyServerHandler()); } });//给workerGroup的EventLoop对应的管道设置处理器 System.out.println("服务端已经启动!"); //绑定端口号,启动服务端 ChannelFuture channelFuture = bootstrap.bind(6666).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } ``` #### 服务端处理器 ``` /** * 自定义的Handler需要继承Netty规定好的HandlerAdapter * 才能被Netty框架所关联,有点类似SpringMVC的适配器模式 **/ public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送过来的消息 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送消息给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("服务端收到消息,并给你发送消息", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常,关闭通道 ctx.close(); } } ``` # 客户端 #### pom.xml 依赖 ``` io.netty netty-all 4.1.20.Final ``` ### Bootstrap Bootstrap是Netty提供的一个创建客户端的工厂类,使用这个工厂类非常便利地创建启动类 ``` //创建客户端对象 Bootstrap bootstrap = new Bootstrap(); ``` ### 线程组 ``` NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); ``` ### group() ``` //设置线程组 bootstrap.group(eventExecutors) ``` ### 例子 #### 客户端启动类 ``` public class MyClient { public static void main(String[] args) throws Exception { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { //创建bootstrap对象,配置参数 Bootstrap bootstrap = new Bootstrap(); //设置线程组 bootstrap.group(eventExecutors) //设置客户端的通道实现类型 .channel(NioSocketChannel.class) //使用匿名内部类初始化通道 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加客户端通道的处理器 ch.pipeline().addLast(new MyClientHandler()); } }); System.out.println("客户端启动成功!"); //连接服务端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); //对通道关闭进行监听 channelFuture.channel().closeFuture().sync(); } finally { //关闭线程组 eventExecutors.shutdownGracefully(); } } } ``` #### 客户端处理器 ``` public class MyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送消息到服务端 ctx.writeAndFlush(Unpooled.copiedBuffer("土豆土豆,我是茄子!", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收服务端发送过来的消息 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); } } ``` # 测试 先启动服务端,再启动客户端 参考: https://zhuanlan.zhihu.com/p/181239748 https://zhuanlan.zhihu.com/p/389034303 原文出处:http://malaoshi.top/show_1IX4ungddUTt.html