架构图
Boss Group
负责接收客户端的连接
是 NioEventLoopGroup
类对象,是一个线程池,线程池中的线程是 NioEventLoop
。
Boss Group 中的 NioEventLoop
BossGroup中有多个 NioEventLoop
线程,每个 NioEventLoop
都绑定了一个 Selector
,多个Channel
(客户端连接) 注册到 Selector
,循环接收 accept
事件(接收客户端连接事件)。并将 channel
注册到 Worker Group 中 NioEventGroup
的 Selector
每个
NioEventLoop
绑定一个端口也就是说,如果程序只需要监听1个端口的话,只需要有 一个
NioEventLoop
线程就行了。在Netty的线程模型中,是由多个
Selecotr
在监听IO就绪事件。
Worker Group
轮询处理 read、write 事件
是 NioEventLoopGroup
类对象,是一个线程池,线程池中的线程是 NioEventLoop
。
Worker Group 中的 NioEventLoop
一个Channel绑定一个 NioEventLoop
,相当于一个连接绑定一个线程,这个连接所有的 ChannelHandler
都是在一个线程中执行的,避免了多线程干扰。
服务器端
ServerBootStrap
ServerBootStrap是Netty提供的一个创建服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类
//创建服务端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
线程组
netty使用 主从Reactor多线程 ,所以要使用两个线程组:
- bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。
- workerGroup用于处理每一个连接发生的读写事件。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
group()
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
例子
pom.xml 依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
服务端启动类
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名内部类的形式初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("服务端已经启动!");
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端处理器
/**
* 自定义的Handler需要继承Netty规定好的HandlerAdapter
* 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端收到消息,并给你发送消息", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
ctx.close();
}
}
客户端
pom.xml 依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
Bootstrap
Bootstrap是Netty提供的一个创建客户端的工厂类,使用这个工厂类非常便利地创建启动类
//创建客户端对象
Bootstrap bootstrap = new Bootstrap();
线程组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
group()
//设置线程组
bootstrap.group(eventExecutors)
例子
客户端启动类
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端通道的处理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客户端启动成功!");
//连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}
客户端处理器
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("土豆土豆,我是茄子!", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服务端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
测试
先启动服务端,再启动客户端
参考:
https://zhuanlan.zhihu.com/p/181239748
https://zhuanlan.zhihu.com/p/389034303