Netty是互联网互联网中间件领域使用最广泛最核心的网络通信框架。比如RocketMQ、dubbo、spring5(webflux)和ElasticSearch
思维导图

Netty是什么
Netty 是一个异步基于事件驱动的高性能网络通信框架
提供 统一的 IO 读写 API 以及强大的 pipeline 来编写业务处理逻辑
异步体现在那?
一是异步事件处理,Event被放入EventQueue即可返回,后续再从Queue里消费处理;
二是异步IO,包括Bind、Write等操作会返回一个ChannelFuture,进而异步拿到结果,不会造成线程block。
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("success, port: " + port);
} else {
System.out.println("failed, port: " + port);
}
});
概念
Event Netty用细分的events来通知我们状态的变化或者操作的状况
- 打开或者关闭到远端的连接
- 写或者刷数据到一个socket
Channel 是数据的载体 ,ChannelHandler负责Channel中的逻辑处理
-Netty提供了大量你可以马上拿来用的预定义handler,包括HTTP和SSL/TLS等协议的handler。
ChannelPipeline 可以理解为ChannelHandler的容器
ChannelPipeline 里面每个节点都是一个 ChannelHandlerContext 对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler。
一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来
channelHandler 分为 inBound 和 outBound 两种类型的接口,分别是处理数据读与数据写的逻辑
默认实现ChannelInboundHandlerAdapter
传统IO
服务端
serverSocket来监听 8000 端口,然后创建一个线程,线程里面不断调用阻塞方法
serversocket.accept();获取新的连接- 当获取到新的连接之后,给每条连接创建一个新的线程,这个线程负责从该连接中读取数据
- 然后读取数据是以字节流的方式
问题
连接过多的时候,I/O模型就不合适了
-
线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
-
线程切换效率低下:单机 CPU 核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
-
除了以上两个问题,IO 编程中,我们看到数据读写是以字节流为单位。
为了解决这三个问题,JDK 在 1.4 之后提出了 NIO。
NIO编程
基于IO多路复用技术的“非阻塞同步”IO模型。简单来说,内核将可读可写事件通知应用,由应用主动发起读写操作
线程模型Reactor
reactor:基于NIO技术,可读可写时通知应用;
线程过多解决
NIO 编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责
一个线程,一个while死循环怎么检测多个连接的数据的读写?
一条连接来了之后,现在不创建一个 while 死循环去监听是否有数据可读了,而是直接把这条连接注册到 selector 上,然后,通过检查这个 selector,就可以批量监测出有数据可读的连接,进而读取数据,
实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少
IO读写面向流
IO 读写是面向流的,一次性只能从流中读取一个或者多个字节,并且读完之后流无法再读取,你需要自己缓存数据。 而 NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。
Java NIO缺点
-
API复杂
-
没有线程模型 (Reactor单线程 多线程),就连简单的自定义协议拆包都要你自己实现
自行实现BUG高,不便于维护
-
JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%
Netty编程
Netty封装了JDK的NIO,更简单,功能更强。
仿微信聊天系统
功能
主要分为单聊和群聊
- 客户端登录
- 客户端之间收发消息
- 群聊的建立
- 群聊的成员管理
- 群聊成员收发消息
- 客户端退出登录
单聊
单聊流程
- 登录授权
- 保存映射(标识+TCP连接)
- 服务器根据数据包里的标识,找到TCP连接发送
- 如果对方不在线,需将消息缓存,对方上线后发送
我们把客户端与服务端之间相互通信的数据包称为指令数据包,指令数据包分为指令和数据,每一种指令对应客户端或者服务端的一种操作,数据部分对应的是指令处理需要的数据。
单聊指令
登录请求 登陆响应 客户端发送消息 服务器发送消息 登出请求 登出响应
群聊
群聊流程
- 登录授权
- 保存映射(标识+TCP连接)
- 创建群聊,把各个成员标识表示发送给服务器,服务器保存各个成员标识,并生成聊天室ID
- 发送消息时,把聊天室ID发送到服务器,服务器遍历成员标识,找到TCP连接,发送
群聊指令
创建群聊请求 群聊创建成功通知 加入群聊请求 群聊加入通知
发送群聊消息 接收群聊消息 退出群聊请求 退出群聊通知
Netty实战核心
核心知识点
- 服务端客户端启动
- 自动重连
- 数据载体ByteBuf
- 客户端与服务端通信协议编解码
- 粘包拆包原理与实践
- Pipeline 与 channelHandler
- 心跳与空闲检测
逻辑结构
客户端

服务端

拆包粘包 – 根据协议编码解码 对象二进制对象转化 – 指令处理 响应的逻辑处理器处理
功能实现
服务端启动
创建一个引导类,然后给他指定线程模型,IO模型,连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
public class NettyServer {
public static void main(String[] args) {
//bossGroup表示监听端口,accept 新连接的线程组,
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//workerGroup表示处理每一条连接的数据读写的线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
//引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
//线程模型
.group(bossGroup, workerGroup)
//IO模型
.channel(NioServerSocketChannel.class)
//每条连接数据读写,业务处理逻辑
//channelInitializer这个类中,我们注意到有一个泛型参数NioSocketChannel,这个类呢,就是 Netty 对 NIO 类型的连接的抽象,而我们前面NioServerSocketChannel也是对 NIO 类型的连接的抽象,NioServerSocketChannel和NioSocketChannel的概念可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
}
});
serverBootstrap.bind(8000);
}
}
自动绑定递增端口
serverBootstrap.bind(8000);这个方法呢,它是一个异步的方法,调用之后是立即返回的,他的返回值是一个ChannelFuture
服务端启动其他方法
handler() 方法
handler()方法呢,可以和我们前面分析的childHandler()方法对应起来,childHandler()用于指定处理新连接数据的读写处理逻辑,handler()用于指定在服务端启动过程中的一些逻辑,通常情况下呢,我们用不着这个方法。
attr() 方法
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")
attr()方法可以给服务端的 channel,也就是NioServerSocketChannel指定一些自定义属性,然后我们可以通过channel.attr()取出这个属性,其实说白了就是给NioServerSocketChannel维护一个map而已
childAttr() 方法
childAttr可以给每一条连接指定自定义属性,然后后续我们可以通过channel.attr()取出该属性。
childOption() 方法
serverBootstrap
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
childOption()可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种TCP属性,其中
ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开启ChannelOption.TCP_NODELAY表示是否开启Nagle算法,true表示关闭,false表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。
option() 方法
除了给每个连接设置这一系列属性之外,我们还可以给服务端channel设置一些属性,最常见的就是so_backlog,如下设置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)
表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数

客户段启动
public class NettyClient {
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 1.指定线程模型
.group(workerGroup)
// 2.指定 IO 类型为 NIO
.channel(NioSocketChannel.class)
// 3.IO 处理逻辑
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
}
});
// 4.建立连接
bootstrap.connect("juejin.im", 80).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else {
System.err.println("连接失败!");
}
});
}
}
失败重连
在网络情况差的情况下,客户端第一次连接可能会连接失败,这个时候我们可能会尝试重新连接
通常情况下,连接建立失败不会立即重新连接,而是会通过一个指数退避的方式
connect(bootstrap, "juejin.im", 80, MAX_RETRY);
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
客户端启动其他方法
attr() 方法
bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")
attr() 方法可以给客户端 Channel,也就是NioSocketChannel绑定自定义属性,然后我们可以通过channel.attr()取出这个属性,比如,上面的代码我们指定我们客户端 Channel 的一个clientName属性,属性值为nettyClient,其实说白了就是给NioSocketChannel维护一个 map 而已,后续在这个 NioSocketChannel 通过参数传来传去的时候,就可以通过他来取出设置的属性,非常方便。
option() 方法
Bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
option() 方法可以给连接设置一些 TCP 底层相关的属性,比如上面,我们设置了三种 TCP 属性,其中
ChannelOption.CONNECT_TIMEOUT_MILLIS表示连接的超时时间,超过这个时间还是建立不上的话则代表连接失败ChannelOption.SO_KEEPALIVE表示是否开启 TCP 底层心跳机制,true 为开启ChannelOption.TCP_NODELAY表示是否开始 Nagle 算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为 true 关闭,如果需要减少发送次数减少网络交互,就设置为 false 开启
客户端与服务端双向通信
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FirstClientHandler());
}
});
ch.pipeline()返回的是和这条连接相关的逻辑处理链,采用了责任链模式- 然后再调用
addLast()方法 添加一个逻辑处理器
逻辑处理器继承自 ChannelInboundHandlerAdapter
channelActive()方法,这个方法会在客户端连接建立成功之后被调用
channelRead(),这个方法在接收到客户端发来的数据之后被回调。
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端写出数据");
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx);
// 2. 写数据
ctx.channel().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 获取二进制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 2. 准备数据,指定字符串的字符集为 utf-8
byte[] bytes = "你好,闪电侠!".getBytes(Charset.forName("utf-8"));
// 3. 填充数据到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
ByteBuf
二进制数据的抽象 ByteBuf 的结构,本质上它的原理就是,它引用了一段内存,这段内存可以是堆内也可以是堆外的,然后用引用计数来控制这段内存是否需要被释放,使用读写指针来控制对 ByteBuf 的读写,可以理解为是外观模式的一种使用
只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法
客户端与服务端通信协议编解码
通信协议
无论是使用 Netty 还是原始的 Socket 编程,基于 TCP 通信的数据包格式均为二进制,协议指的就是客户端与服务端事先商量好的,每一个二进制数据包中每一段字节分别代表什么含义的规则。
- Java 对象按照通信协议转换成二进制数据包
- 据的传输过程由 TCP/IP 协议负责数据的传输
- 按照协议取出二进制数据包中的相应字段,包装成 Java 对象
通信协议的设计

魔术 : 任何数据包传递到服务器,服务器都会根据自定义协议来进行处理 可以直接关闭连接节省资源
版本号:通常情况下是预留字段,用于协议升级的时候用到
登录

实现客户端与服务端收发消息
判断客户端是否已登陆
登陆成功后,给Channel绑定一个登陆成功的标识位
public static void markAsLogin(Channel channel) {
channel.attr(Attributes.LOGIN).set(true);
}
public static boolean hasLogin(Channel channel) {
Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);
return loginAttr.get() != null;
}
pipeline与channelHandler
责任链设计模式来组织代码逻辑,
Netty 能够支持各类协议的扩展,比如 HTTP,Websocket,Redis,靠的就是 pipeline 和 channelHandler

Netty 内置的 ChannelHandler
ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
他的作用就是接收上一个 handler 的输出
ChannelOutboundHandlerAdapte
ByteToMessageDecoder(解码器)
过解码器把二进制数据转换到 Java 对象
ByteBuf默认是堆外内存,需要释放。自动释放,可以防止内存泄漏
SimpleChannelInboundHandler
类型判断和对象传递的活都自动帮我们实现了,而我们可以专注于处理我们所关心的指令即可