Netty仿写微信即时通讯系统

2019/11/11

Netty是互联网互联网中间件领域使用最广泛最核心的网络通信框架。比如RocketMQ、dubbo、spring5(webflux)和ElasticSearch

思维导图

img

Netty是什么

Netty 是一个异步基于事件驱动的高性能网络通信框架

提供 统一的 IO 读写 API 以及强大的 pipeline 来编写业务处理逻辑

异步体现在那?

一是异步事件处理,Event被放入EventQueue即可返回,后续再从Queue里消费处理;

二是异步IO,包括Bind、Write等操作会返回一个ChannelFuture,进而异步拿到结果,不会造成线程block。

serverBootstrap.bind(port).addListener(future -> {
       if (future.isSuccess()) {
           System.out.println("success, port: " + port);
       } else {
           System.out.println("failed, port: " + port);
       }
   });

概念

Event Netty用细分的events来通知我们状态的变化或者操作的状况

- 打开或者关闭到远端的连接

- 写或者刷数据到一个socket

Channel 是数据的载体 ,ChannelHandler负责Channel中的逻辑处理

-Netty提供了大量你可以马上拿来用的预定义handler,包括HTTP和SSL/TLS等协议的handler。

ChannelPipeline 可以理解为ChannelHandler的容器

ChannelPipeline 里面每个节点都是一个 ChannelHandlerContext 对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler

一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来

channelHandler 分为 inBound 和 outBound 两种类型的接口,分别是处理数据读与数据写的逻辑

默认实现ChannelInboundHandlerAdapter

Netty源码解读(三)Channel与Pipeline

传统IO

服务端

serverSocket来监听 8000 端口,然后创建一个线程,线程里面不断调用阻塞方法

  1. serversocket.accept();获取新的连接
  2. 当获取到新的连接之后,给每条连接创建一个新的线程,这个线程负责从该连接中读取数据
  3. 然后读取数据是以字节流的方式

问题

连接过多的时候,I/O模型就不合适了

  1. 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起

  2. 线程切换效率低下:单机 CPU 核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。

  3. 除了以上两个问题,IO 编程中,我们看到数据读写是以字节流为单位。

    为了解决这三个问题,JDK 在 1.4 之后提出了 NIO。

NIO编程

基于IO多路复用技术的“非阻塞同步”IO模型。简单来说,内核将可读可写事件通知应用,由应用主动发起读写操作

线程模型Reactor

reactor:基于NIO技术,可读可写时通知应用;

线程过多解决

NIO 编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责

一个线程,一个while死循环怎么检测多个连接的数据的读写?

一条连接来了之后,现在不创建一个 while 死循环去监听是否有数据可读了,而是直接把这条连接注册到 selector 上,然后,通过检查这个 selector,就可以批量监测出有数据可读的连接,进而读取数据,

实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少

IO读写面向流

IO 读写是面向流的,一次性只能从流中读取一个或者多个字节,并且读完之后流无法再读取,你需要自己缓存数据。 而 NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。

Java NIO缺点

  • API复杂

  • 没有线程模型 (Reactor单线程 多线程),就连简单的自定义协议拆包都要你自己实现

    自行实现BUG高,不便于维护

    Reactor 模式

  • JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%

Netty编程

Netty封装了JDK的NIO,更简单,功能更强。

仿微信聊天系统

功能

主要分为单聊和群聊

  1. 客户端登录
  2. 客户端之间收发消息
  3. 群聊的建立
  4. 群聊的成员管理
  5. 群聊成员收发消息
  6. 客户端退出登录

单聊

单聊流程

  1. 登录授权
  2. 保存映射(标识+TCP连接)
  3. 服务器根据数据包里的标识,找到TCP连接发送
  4. 如果对方不在线,需将消息缓存,对方上线后发送

我们把客户端与服务端之间相互通信的数据包称为指令数据包,指令数据包分为指令和数据,每一种指令对应客户端或者服务端的一种操作,数据部分对应的是指令处理需要的数据。

单聊指令

登录请求 登陆响应 客户端发送消息 服务器发送消息 登出请求 登出响应

群聊

群聊流程

  1. 登录授权
  2. 保存映射(标识+TCP连接)
  3. 创建群聊,把各个成员标识表示发送给服务器,服务器保存各个成员标识,并生成聊天室ID
  4. 发送消息时,把聊天室ID发送到服务器,服务器遍历成员标识,找到TCP连接,发送

群聊指令

创建群聊请求 群聊创建成功通知 加入群聊请求 群聊加入通知

发送群聊消息 接收群聊消息 退出群聊请求 退出群聊通知

Netty实战核心

核心知识点

  • 服务端客户端启动
  • 自动重连
  • 数据载体ByteBuf
  • 客户端与服务端通信协议编解码
  • 粘包拆包原理与实践
  • Pipeline 与 channelHandler
  • 心跳与空闲检测

逻辑结构

客户端

image-20201101181857378

服务端

image-20201101182147794

拆包粘包 – 根据协议编码解码 对象二进制对象转化 – 指令处理 响应的逻辑处理器处理

功能实现

服务端启动

创建一个引导类,然后给他指定线程模型,IO模型,连接读写处理逻辑,绑定端口之后,服务端就启动起来了。

public class NettyServer {
    public static void main(String[] args) {
    	//bossGroup表示监听端口,accept 新连接的线程组,
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //workerGroup表示处理每一条连接的数据读写的线程组
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
		//引导类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
        		//线程模型
                .group(bossGroup, workerGroup)
                //IO模型
                .channel(NioServerSocketChannel.class)
                //每条连接数据读写,业务处理逻辑
                //channelInitializer这个类中,我们注意到有一个泛型参数NioSocketChannel,这个类呢,就是 Netty 对 NIO 类型的连接的抽象,而我们前面NioServerSocketChannel也是对 NIO 类型的连接的抽象,NioServerSocketChannel和NioSocketChannel的概念可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                    }
                });

        serverBootstrap.bind(8000);
    }
}

自动绑定递增端口

serverBootstrap.bind(8000);这个方法呢,它是一个异步的方法,调用之后是立即返回的,他的返回值是一个ChannelFuture

服务端启动其他方法

handler() 方法

handler()方法呢,可以和我们前面分析的childHandler()方法对应起来,childHandler()用于指定处理新连接数据的读写处理逻辑,handler()用于指定在服务端启动过程中的一些逻辑,通常情况下呢,我们用不着这个方法。

attr() 方法
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")

attr()方法可以给服务端的 channel,也就是NioServerSocketChannel指定一些自定义属性,然后我们可以通过channel.attr()取出这个属性,其实说白了就是给NioServerSocketChannel维护一个map而已

childAttr() 方法

childAttr可以给每一条连接指定自定义属性,然后后续我们可以通过channel.attr()取出该属性。

childOption() 方法
serverBootstrap
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)

childOption()可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种TCP属性,其中

  • ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开启
  • ChannelOption.TCP_NODELAY表示是否开启Nagle算法,true表示关闭,false表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。
option() 方法

除了给每个连接设置这一系列属性之外,我们还可以给服务端channel设置一些属性,最常见的就是so_backlog,如下设置

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)

表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数

image-20201102013300018

客户段启动

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // 1.指定线程模型
                .group(workerGroup)
                // 2.指定 IO 类型为 NIO
                .channel(NioSocketChannel.class)
                // 3.IO 处理逻辑
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                    }
                });
        // 4.建立连接
        bootstrap.connect("juejin.im", 80).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("连接成功!");
            } else {
                System.err.println("连接失败!");
            }

        });
    }
}

失败重连

在网络情况差的情况下,客户端第一次连接可能会连接失败,这个时候我们可能会尝试重新连接

通常情况下,连接建立失败不会立即重新连接,而是会通过一个指数退避的方式

connect(bootstrap, "juejin.im", 80, MAX_RETRY);

private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
    bootstrap.connect(host, port).addListener(future -> {
        if (future.isSuccess()) {
            System.out.println("连接成功!");
        } else if (retry == 0) {
            System.err.println("重试次数已用完,放弃连接!");
        } else {
            // 第几次重连
            int order = (MAX_RETRY - retry) + 1;
            // 本次重连的间隔
            int delay = 1 << order;
            System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
            bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                    .SECONDS);
        }
    });
}

客户端启动其他方法

attr() 方法
bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")

attr() 方法可以给客户端 Channel,也就是NioSocketChannel绑定自定义属性,然后我们可以通过channel.attr()取出这个属性,比如,上面的代码我们指定我们客户端 Channel 的一个clientName属性,属性值为nettyClient,其实说白了就是给NioSocketChannel维护一个 map 而已,后续在这个 NioSocketChannel 通过参数传来传去的时候,就可以通过他来取出设置的属性,非常方便。

option() 方法
Bootstrap
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)

option() 方法可以给连接设置一些 TCP 底层相关的属性,比如上面,我们设置了三种 TCP 属性,其中

  • ChannelOption.CONNECT_TIMEOUT_MILLIS 表示连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
  • ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制,true 为开启
  • ChannelOption.TCP_NODELAY 表示是否开始 Nagle 算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为 true 关闭,如果需要减少发送次数减少网络交互,就设置为 false 开启

客户端与服务端双向通信

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new FirstClientHandler());
    }
});
  • ch.pipeline() 返回的是和这条连接相关的逻辑处理链,采用了责任链模式
  • 然后再调用 addLast() 方法 添加一个逻辑处理器

逻辑处理器继承自 ChannelInboundHandlerAdapter

channelActive()方法,这个方法会在客户端连接建立成功之后被调用

channelRead(),这个方法在接收到客户端发来的数据之后被回调。

public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客户端写出数据");

        // 1. 获取数据
        ByteBuf buffer = getByteBuf(ctx);

        // 2. 写数据
        ctx.channel().writeAndFlush(buffer);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 获取二进制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        
        // 2. 准备数据,指定字符串的字符集为 utf-8
        byte[] bytes = "你好,闪电侠!".getBytes(Charset.forName("utf-8"));

        // 3. 填充数据到 ByteBuf
        buffer.writeBytes(bytes);

        return buffer;
    }
}

ByteBuf

二进制数据的抽象 ByteBuf 的结构,本质上它的原理就是,它引用了一段内存,这段内存可以是堆内也可以是堆外的,然后用引用计数来控制这段内存是否需要被释放,使用读写指针来控制对 ByteBuf 的读写,可以理解为是外观模式的一种使用

只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法

客户端与服务端通信协议编解码

通信协议

无论是使用 Netty 还是原始的 Socket 编程,基于 TCP 通信的数据包格式均为二进制,协议指的就是客户端与服务端事先商量好的,每一个二进制数据包中每一段字节分别代表什么含义的规则。

  • Java 对象按照通信协议转换成二进制数据包
  • 据的传输过程由 TCP/IP 协议负责数据的传输
  • 按照协议取出二进制数据包中的相应字段,包装成 Java 对象

通信协议的设计

image-20201102183935664

魔术 : 任何数据包传递到服务器,服务器都会根据自定义协议来进行处理 可以直接关闭连接节省资源

版本号:通常情况下是预留字段,用于协议升级的时候用到

登录

image-20201102233310395

实现客户端与服务端收发消息

判断客户端是否已登陆

登陆成功后,给Channel绑定一个登陆成功的标识位

    public static void markAsLogin(Channel channel) {
        channel.attr(Attributes.LOGIN).set(true);
    }

    public static boolean hasLogin(Channel channel) {
        Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);

        return loginAttr.get() != null;
    }

pipeline与channelHandler

责任链设计模式来组织代码逻辑,

Netty 能够支持各类协议的扩展,比如 HTTP,Websocket,Redis,靠的就是 pipeline 和 channelHandler

image-20201102233542232

Netty 内置的 ChannelHandler

ChannelInboundHandlerAdapter

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

他的作用就是接收上一个 handler 的输出

ChannelOutboundHandlerAdapte

ByteToMessageDecoder(解码器)

过解码器把二进制数据转换到 Java 对象

ByteBuf默认是堆外内存,需要释放。自动释放,可以防止内存泄漏

SimpleChannelInboundHandler

类型判断和对象传递的活都自动帮我们实现了,而我们可以专注于处理我们所关心的指令即可

MessageToByteEncoder

参考

Netty 异步和事件驱动

彻底搞懂 netty 线程模型

Netty 入门与实战:仿写微信 IM 即时通讯系统

Post Directory