🌟个人博客:www.hellocode.top
🏰Java知识导航:Java-Navigate
🔥CSDN:HelloCode.
🌞知乎:HelloCode
🌴掘金:HelloCode
⚡如有问题,欢迎指正,一起学习~~
NIO
non-blocking io(new io) 非阻塞的IO
三大组件
Channel & Buffer
Channel:双向的数据传输通道
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Buffer:内存缓冲区
- ByteBuffer:MappedByteBuffer、DirectByteBuffer、HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Selector
多线程版本
在NIO出现之前,服务器处理多客户端连接时采用多线程版设计,每个客户端即为一个socket,服务器会启动一个线程为socket提供服务,每个线程专管一个socket连接
- 内存占用过高
- 线程上下文切换成本高
- 只适合连接数少的场景
线程池版本
通过线程池来复用线程资源,控制最大线程数
- 阻塞模式下,线程仅能处理一个socket连接
- 仅适合短连接场景
Selector版本
配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景。(在线程和连接中添加一个selector层来监控channel上的读写事件)
ByteBuffer
进行文件操作,获取FileChannel,读取文件时,需要有一个暂存缓冲区存放对应内容(ByteBuffer)
ByteBuffer.allocate()
获取Bytebuffer
获取channel:
- 通过输入输出流获取(getChannel方法)
- RandomAccessFile(getChannel方法)
ByteBuffer结构:
类似数组,核心参数:
- capacity:容量
- position:读写指针
- limit:读写限制
在读写模式切换时,其实就是position和limit指针的变换过程,切换后才能读取到正确的数据
clean
转换写模式时,如果有未读取完毕的数据,会直接覆盖compact
意为压缩,转换写模式时,如果有未读取完的数据,会保留,将position移至未读取完的元素后继续写入
常用buffer方法:
allocate(int n)
:为ByteBuffer分配空间(堆内存)- allocateDirect:分配空间(直接内存)
- 堆内存读写效率较低,受到 垃圾回收机制 (GC)的影响
- 直接内存使用的是系统内存,效率会高一些(少一次数据拷贝),不受 GC 影响,分配内存时效率低,使用不当会造成内存泄漏
channel.read(buffer)
:通过channel向buffer写入内容buffer.put((byte) 127)
:buffer自己的写入方法
buffer.get()
:buffer自己的获取字节方法channel.write(buffer)
:从buffer读向channel写- get可以让position指针后移,如果想重复读取数据,可以调用rewind方法将position重新置为0,或者调用
get(int i)
方法获取索引i的内容,它不会移动指针 - mark & reset:mark是做一个标记,记录positiopn位置,reset是跳转到mark记录的位置
hasRemaining()
:是否还有剩余未读数据flip()
:切换为读模式clear()
或compact()
:切换为写模式(清空buffer)
字符串与ByteBuffer互相转换:
字符串转换为ByteBuffer
ByteBuffer.allocate()
:为ByteBuffer分配空间buffer.put("hello".getBytes())
:向buffer填入字节数组
使用Charset
StandardCharsets.UTF_8.encode("hello")
:获得指定字符串的ByteBufferStandardCharsets.UTF_8.decode(Bytebuffer buffer).toString()
:将ByteBuffer转为字符串
wrap
ByteBuffer.wrap(byte[] bytes)
第一种方式在写入完毕后,还是写模式,position指针还是指向末尾;而后两种方法在写入后会自动切换为读模式,将position指向0位置
分散读、集中写
Scattering Reads(分散读取),将一个文件分散读取到多个ByteBuffer中
- 同样是ByteBuffer的read方法
- 传入一个ByteBuffer数组,每个ByteBuffer分配好空间,读满就会把后续内容读入后面的ByteBuffer
Gathering Writes(集中写入),将多个ByteBuffer写入到一个文件中
- 使用ByteBuffer的write方法传入一个BufferByte数组
减少数据在ByteBuffer之间的数据拷贝
黏包、半包分析
网络上多条数据发送给服务器时,假设数据使用 \n
进行分割
Hello,word\n
I am Zhangsan\n
How are you\n
可能在接收时,被进行了重新组合,如下:
Hello,word\nI am Zhangsan\nHo
w are you\n
这就是出现了黏包和半包问题
public static void split(ByteBuffer source){
source.flip();
for(int i = 0; i < source.limit(); i++){
// 找到一条完整消息
if(source.get(i) == '\n'){
// 计算消息长度
int length = i + 1 - source.position();
ByteBuffer target = ByteBuffer.allocate(length);
for(int j = 0; j < length; j++){
target.put(source.get());
}
// 打印target
}
}
source.compact();
}
文件编程
FileChannel
FileChannel智能工作在阻塞模式下
获取:
- 通过FileInputStream的getchannel获取,只能读
- 通过FileOutputStream的getchannel获取,只能写
- 通过RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
读取:
channel.read
:读取到的数据暂存到ByteBuffer中- 返回值代表读取到的字节数,-1表示读取到末尾了
写入:
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式
while(buffer.hasRemaining()){
channel.write(buffer);
}
write不能保证一次将buffer中的内容全部写入channel,FileChannel可以,但是SocketChannel不行,推荐上面的规范写法
关闭:
推荐使用try-with-source关闭
位置:
- 获取当前位置:
channel.position()
设置当前位置:
channel.position(newPos)
- 如果设置为文件的末尾,读取时会返回-1
- 设置为文件的末尾,写入会追加数据
- 如果超过了文件末尾,再写入时会有空洞(00)
大小:
使用size方法获取文件的大小
强制写入:
操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,可以调用 force(true)
方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
两个Channel传输数据
channel.transferTo(起始位置,传输的数据量,目标channel)
- 一次最多传输2G的数据,如果数据过大,可以多次传输
使用了操作系统的零拷贝进行优化,效率高
Path
JDK7 引入了 Path 和 Paths 类
- Path 用来表示文件路径
- Paths 是工具类,用来获取Path实例
Path source = Paths.get("1.txt"); // 相对路径,使用user.dir 环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt"); // 绝对路径,代表了 d:\1.txt
Path source = Paths.get("d:/1.txt"); // 绝对路径,代表了 d:\1.txt
Path source = Paths.get("d:\\data","projects"); // 代表了 d:\data\projects
.
代表了当前路径..
代表了上一级路径
Files
- 检查文件是否存在:
Files.exists(Path)
创建一级目录:
Files.createDirectory(Path)
- 如果目录已经存在,抛异常
- 不能一次创建多级目录,否则会抛异常
- 创建多级目录:
Files.createDircetories(Path)
拷贝文件:
Files.copy(Path source,Path target)
- 如果文件已存在,会抛异常
- 如果希望source覆盖掉target,需要用 StandardCopyOption 来控制:
Files.copy(source, target, StandardCoptyOption.REPLACE_EXISTING)
移动文件:
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE)
- StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性
- 删除文件:
Files.delete(target)
,文件不存在,抛异常 - 删除目录:
Files.delete(target)
,如果目录还有内容,抛异常 遍历目录
使用
Files.walkFileTree(Path 起始目录, SimpleFileVistor<> visitor)
重写匿名内部类的方法完成遍历
删除操作执行后,被删除的文件不会进入回收站
网络编程
非阻塞 VS 阻塞
阻塞模式
/**
* @blog: <a href="https://www.hellocode.top">...</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-19 15:10
* @Description: TODO
*/
public class Server {
public static void main(String[] args) throws IOException {
// 使用nio,单线程
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ByteBuffer buffer = ByteBuffer.allocate(10);
// 2. 指定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 建立客户端连接,SocketChannel与客户端之间通信
List<SocketChannel> channels = new ArrayList<>();
while (true){
System.out.println("Connecting ...");
SocketChannel sc = ssc.accept(); // 阻塞,直到有客户端连接
channels.add(sc);
for(SocketChannel channel : channels){
System.out.println("Before Read ...");
channel.read(buffer); // 阻塞,直到客户端发送了数据
buffer.flip();
while(buffer.hasRemaining()){
System.out.print((char) buffer.get());
}
buffer.clear();
System.out.println("After Read ...");
}
}
}
}
/**
* @blog: https://www.hellocode.top
* @Author: HelloCode.
* @CreateTime: 2023-10-19 15:27
* @Description: TODO
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
System.out.println("waiting....");
}
}
阻塞模式下,很多方法都会导致线程阻塞
非阻塞模式
- 在服务器绑定端口前,通过
serverSocketChannel.configureBlocking(false)
可以设置非阻塞模式,在该配置下,accept
建立客户端连接时即为非阻塞,线程会继续向下运行,如果没有连接建立,accept
方法的返回值为null - SocketChannel也可以设置为非阻塞模式,方法名一致,配置非阻塞后当使用
channel.read()
时,也为非阻塞模式,如果没有读到数据,read返回0
非阻塞模式通过while-true循环,可以让单线程监控多个连接,但是会导致CPU空转,太过繁忙
Selector方式实现非阻塞
- 创建selector,管理多个channel:
Selector.open()
将channel注册到selector中:
channel.register(selector, 关注事件, 附件)
,返回SelectionKey- SelectionKey,事件发生后可以通过它知道事件和哪个channel的事件
事件类型:
accept:会在有连接请求时触发
connect:在客户端连接后触发
read:可读事件,表示有数据到了
write:可写事件
- 附件一般即为各自channel的附属Buffer
- SelectionKey对象调用
interestOps()
方法设置感兴趣的事件;ServerSocketChannel一般只需要对accept处理,ServerSocket一般对read和write处理
Selector何时不阻塞:
- 事件发生时
- 调用
selector.wakeup()
- 调用
selector.close()
- selector所在线程interrupt
public static void main(String[] args) throws IOException {
// 创建selector,管理多个channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 将ssc注册到selector
SelectionKey sscKey = ssc.register(selector, 0, null);
// 声明关注事件(accept)
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
while(true){
// 调用selector的select方法,没有实现非阻塞,当有事件发生时,线程才会恢复运行
selector.select();
// 处理事件,selectionKeys集合内部包含了所有发生的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
System.out.println(key);
// 区分事件类型
if (key.isAcceptable()) {
// accept事件
ServerSocketChannel channel =(ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
System.out.println(sc);
} else if (key.isReadable()) {
try {
// read事件,读取数据
SocketChannel channel = (SocketChannel) key.channel();
// 正常断开时,read返回值为-1
int read = channel.read(buffer);
if(read == -1){
// 正常断开时,需要手动取消事件
key.cancel();
}else{
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
}
} catch (IOException e) {
// 当客户端关闭时(断开连接),会走read事件
// 需要手动取消对应的key
e.printStackTrace();
key.cancel();
}
}
// 处理完后删除对应的key(重要)
iterator.remove();
}
}
}
- Select在事件未处理时不会阻塞(使用cancle取消事件也可以),不能置之不理
- Selector会在发生事件后,向selectedKeys中添加key,但是不会自动删除,在处理完后需要我们手动删除
- 当客户端断开连接时,会触发read事件,我们需要通过try...catch处理异常,手动取消事件(cancel),从Selector的key集合中真正的删除 key
- 此外,当客户端通过close方法断开时,不会触发catch中的代码,但是当断开时触发的read事件,read方法返回值为-1
处理消息的边界
当ByteBuffer分配的内存不足以一次读取完客户端发送的数据时,会分多次进行读取,可能会出现半包问题(乱码)
解决方法
- 一种思想是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
- 另一种思想是按分隔符拆分,缺点是效率低
TLV格式,即 Type类型、Length长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer 需要提前分配,如果内容过大,则影响server吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
public static void split(ByteBuffer source){
source.flip();
for(int i = 0; i < source.limit(); i++){
// 找到一条完整消息
if(source.get(i) == '\n'){
// 计算消息长度
int length = i + 1 - source.position();
ByteBuffer target = ByteBuffer.allocate(length);
for(int j = 0; j < length; j++){
target.put(source.get());
}
target.flip();
// 打印target
System.out.println(Charset.defaultCharset().decode(target));
}
}
source.compact();
}
public static void main(String[] args) throws IOException {
// 创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 将ssc注册到selector
SelectionKey sscKey = ssc.register(selector, 0, null);
// 声明关注事件(accept)
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
while(true){
// 调用selector的select方法,没有实现非阻塞,当有事件发生时,线程才会恢复运行
selector.select();
// 处理事件,selectionKeys集合内部包含了所有发生的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
System.out.println(key);
// 区分事件类型
if (key.isAcceptable()) {
// accept事件
ServerSocketChannel channel =(ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
// 将Buffer作为附件关联到SelectionKey 上
ByteBuffer buffer = ByteBuffer.allocate(5);
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
System.out.println(sc);
} else if (key.isReadable()) {
try {
// read事件,读取数据
SocketChannel channel = (SocketChannel) key.channel();
// 获取 SelectionKey 附件中的 Buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer);
if(read == -1){
key.cancel();
}else{
split(buffer);
if(buffer.position() == buffer.limit()){
// buffer满了,需要扩容了
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() << 1);
buffer.flip();
newBuffer.put(buffer);
//替换附件
key.attach(newBuffer);
}
}
} catch (IOException e) {
// 当客户端关闭时(断开连接),会走read事件
// 需要手动取消对应的key
e.printStackTrace();
key.cancel();
}
}
// 处理完后删除对应的key(重要)
iterator.remove();
}
}
}
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
sc.write(Charset.defaultCharset().encode("hello\nword\n"));
System.out.println("waiting....");
}
}
ByteBuffer大小分配
- 每个channel 都需要记录可能被切分的消息,因为ByteBuffer 不是线程安全的,因此需要为每个 channel 维护一个独立的 ByteBuffer
ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb的话,要支持百万连接就需要1 TB内存,因此需要设计大小可变的 ByteBuffer
- 一种思路是首先分配一个较小的 buffer,例如4k,如果发现数据不够,再分配 8k 的buffer,将4k buffer 内容拷贝至 8 k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
- 另一种思想是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
写入内容过多问题
/**
* @blog: https://www.hellocode.top
* @Author: HelloCode.
* @CreateTime: 2023-10-19 18:22
* @Description: TODO
*/
public class WriterServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
while (buffer.hasRemaining()){
int write = sc.write(buffer);
System.out.println(write);
}
}
}
}
}
}
/**
* @blog: https://www.hellocode.top
* @Author: HelloCode.
* @CreateTime: 2023-10-19 18:27
* @Description: TODO
*/
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
// 接收数据
int count = 0;
while(true){
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}
在发送很多数据时,一次发送不完,就会一直循环发送,反复尝试直到写完,不符合非阻塞模式
配合可写事件改进:
/**
* @blog: https://www.hellocode.top
* @Author: HelloCode.
* @CreateTime: 2023-10-19 18:22
* @Description: TODO
*/
public class WriterServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
// 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
// 判断是否有剩余内容
if(buffer.hasRemaining()){
// 关注一个可写事件(可能原来还关注了其他事件)
// 通过相加可以不覆盖原来关注的事件
// scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
// 把未写完的数据挂到sckey上
scKey.attach(buffer);
}
}else if(key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
int write = channel.write(buffer);
System.out.println(write);
// 清理操作
if(!buffer.hasRemaining()){
// 清除Buffer
key.attach(null);
// 取消可写事件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
多线程优化
现在都是多核CPU,设计时要充分考虑别让cpu 的力量被白白浪费
分两组选择器
- 单线程配一个选择器,专门处理accept事件
- 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-20 14:36
* @Description: TODO
*/
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT, null);
// 创建固定数量的worker
// Runtime.getRuntime().availableProcessors() 获取cpu核心数
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors() + 1];
for (int i = 0; i < 2; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger();
while(true){
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 关联 selector
// 轮询算法,负载均衡
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name){
this.name = name;
}
// 初始化线程和Selector
public void register(SocketChannel sc) throws IOException {
if(!start){
thread = new Thread(this,name);
selector = Selector.open();
thread.start();
start = true;
}
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
selector.wakeup(); // 唤醒select方法
}
@Override
public void run() {
// 监控读写事件
while(true){
try {
selector.select();
Runnable task = queue.poll();
if(task != null){
task.run(); // 执行了 sc.register(selector, SelectionKey.OP_READ, null);
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
如何拿到CPU个数
Runtime.getRuntime().availableProcessors()
- 如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理CPU个数,而不是容器申请时的个数
- 这个问题到jdk 10 才修复,使用jvm 参数 UseContainerSupport 配置,默认开启
概念剖析
NIO vs BIO
stream vs channel
- stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
- stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络channel 可以配合 selector 实现多路复用
- 二者均为全双工,即读写可以同时进行
IO 模型
同步阻塞、同步非阻塞、(同步)多路复用、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)
多路复用是同步的
异步都是非阻塞的
当调用一次 channel.read
或 stream.read
后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
- 等待数据阶段
- 复制数据阶段
- 阻塞 IO
- 非阻塞 IO
- 多路复用
- 信号驱动
- 异步 IO
零拷贝
传统的IO将一个文件通过socket 写出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.Tength()];
file.read(buf);
Socket socket =...;
socket.getOutputStream().write(buf);
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel) 的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,期间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 CPU 完成文件 IO
- 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
- 调用 write 方法,这时将数据从用户缓冲区(bytel[] buf)写入 socket 缓冲区,cpu 会参与拷贝
- 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能
力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 lO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统
来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
NIO 优化
通过DirectByteBuffer
ByteBuffer.allocate(10)
-> HeapByteBuffer:使用的还是Java 内存ByteBuffer.allocateDirect(10)
-> DirectByteBuffer:使用的是操作系统内存(操作系统和Java程序共享,都可以访问)
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuffer 将堆外内存映射到 jvm 内存中来直接访问使用
- 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
java 中的 DirectByteBuffer 对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用transferTo/transferFrom
方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到:
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
进一步优化(linux2.4)
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将内核缓冲区的数据写入网卡,不会使用 cpu
整个过程
- 仅仅只发生了一次用户态与内核态的切换
- 数据拷贝了 2 次
- 所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 ivm 内存中
零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输
上述的 transferTo 和 sendFile 都是零拷贝,并不是一次拷贝都没有,而是不会在Java中再进行数据拷贝
AIO
AIO 用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows 系统通过 IOCP 实现了真正的异步 IO
- Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
文件 AIO
先来看看 AsynchronousFileChannel
@Slf4j
public class AioDemo1 {
public static void main(String[] args) throws IOException {
try{
AsynchronousFileChannel s =
AsynchronousFileChannel.open(
Paths.get("1.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(2);
log.debug("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
buffer.flip();
debug(buffer);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.debug("read failed...");
}
});
} catch (IOException e) {
e.printStackTrace();
}
log.debug("do other things...");
System.in.read();
}
}
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+
可以看到
- 响应文件读取成功的是另一个线程 Thread-5
- 主线程并没有 IO 操作阻塞
守护线程
默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read()
以避免守护线程意外结束
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}
private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s closen", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s readn", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}
private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connectedn", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}
Nettey入门
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
这里的异步并不是指异步 IO ,Netty的 IO 模型还是基于 IO 多路复用的
Netty 在 Java 网络应用框架中的地位就类似于 Spring 在 JavaEE 开发的地位
Netty vs NIO
- NIO 工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- epoll 空轮询导致 CPU 100%
- Netty 对 API 进行增强,使之更易使用
Hello World
- 加入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
- 服务器代码
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-22 11:03
* @Description: TODO
*/
public class HelloServer {
public static void main(String[] args) {
// 启动器,负责组装netty 组件
new ServerBootstrap()
// 添加EventLoop组:NioEventLoopGroup(selector、thread)
.group(new NioEventLoopGroup())
// 选择一个服务器的ServerSocketChannel的实现
.channel(NioServerSocketChannel.class)
// boss负责处理连接,worker负责处理读写
// childHandler 就是负责指明 类似于 worker(child) 需要负责处理的事情(具体逻辑)
.childHandler(
// channel 代表和客户端进行数据读写的通道
// Initializer 初始化,负责添加其他 Handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加具体的 Handler
nioSocketChannel.pipeline().addLast(new StringDecoder()); // 负责解码的Handler
// 自定义的 Handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override // 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印上一步转换好的字符串
System.out.println(msg);
}
});
}
})
// 绑定监听端口
.bind(8080);
}
}
- 客户端
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-22 11:15
* @Description: TODO
*/
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 启动类
new Bootstrap()
// 添加组件
.group(new NioEventLoopGroup())
// 选择客户端channel 实现
.channel(NioSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加字符串编码器
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接到服务器
.connect(new InetSocketAddress("localhost",8080))
// 阻塞方法,直到连接建立才向下执行
.sync()
// 代表连接对象
.channel()
// 向服务器发送数据
.writeAndFlush("hello world");
}
}
- channel可以理解为数据通道
- msg理解为流动的数据,最开始是ByteBuf,但经过pipeline(流水线)的加工,会变成其他类型的对象,最后输出又变成了ByteBuf
handler理解为数据的处理工序
- 工序有多道,合在一起就是pipeline,pipeline负责发布时间(读、读取完成…)传播给每个handler
- handler对自己感兴趣的事件进行处理(重写了相应时间的处理方法)
- handler分Inbound(入站)和Outbound(出站)
eventLoop理解为处理数据的工人
- 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底
- 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以对方多个channel的待处理任务,任务分为普通任务、定时任务
- 工人按照pipeline顺序,依此按照handler的规划处理数据,可以为每道工序指定不同的工人
组件
EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
常用方法
普通任务 & 定时任务
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-22 12:11
* @Description: TODO
*/
public class TestEventLoop {
public static void main(String[] args) {
// EventLoopGroup 继承了线程池,拥有线程池的一些方法
// 可以处理 IO 事件、普通任务、定时任务(空参构造默认线程数是 CPU 核心数 * 2,可以指定)
// System.out.println(NettyRuntime.availableProcessors());
EventLoopGroup group = new NioEventLoopGroup();
// 可以处理 普通任务、定时任务
// EventLoopGroup group = new DefaultEventLoop();
// 常用方法
// 获取下一个事件循环对象
System.out.println(group.next());
// 执行普通任务
group.next().submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
}
});
// 执行定时任务
group.next().scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + ":ok");
}, 0, 1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName());
}
}
IO 事件
public static void main(String[] args) {
// IO 事件
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
可以看到两个工人轮流处理channel,但工人与 channel 之间进行了绑定
分工细化
netty还是推荐我们将 eventLoop划分的细一些,类似前面的 boss 和 worker
细分1:
- 在group方法中,可以传递两个EventLoop参数,一个负责accept事件,另一个负责读写事件
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
细分2:
- 当Handler中的事件耗时较长时,可能会影响NIO 线程的工作,此时还需细分
- 创建一个独立的EventLoopGroup(不需要处理IO 事件,DefaultEventLoopGroup即可)
- 在
pipeline.addLast()
方法中,可以传递三个参数:group、handler 的 name、handler - 当有多个handler时,需要交给下一handler处理时,调用
super.channelRead()
等方法
public static void main(String[] args) {
// IO 事件
EventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(group,"handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
// 将消息传递给下一个handler
ctx.fireChannelRead(msg);
}
}).addLast("handler2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
handler 执行中如何换人
关键代码:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor(); // 返回下一个handler 的 EventLoop
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
如果两个 handler 绑定的是同一个线程,那么就直接调用;否则,就把要调用的代码封装为一个任务对象,由下一个handler 的线程来调用
Channel
主要作用:
close()
可以用来关闭 channelcloseFuture()
用来处理channel 的关闭- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
pipeline()
方法添加处理器write()
方法将数据写入writeAndFlush()
方法将数据写入并刷出
ChannelFuture
方法一:使用sync同步结果
- 在sync处阻塞等待(直到nio线程连接建立完毕)
public static void main(String[] args) throws InterruptedException {
// 启动类
// 带有 Future、Promise 的类型 都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
// 添加组件
.group(new NioEventLoopGroup())
// 选择客户端channel 实现
.channel(NioSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加字符串编码器
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接到服务器
// 异步非阻塞 main发起了调用,真正执行 connect 的是nio 线程
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
// 如果没有sync,直接过去channel对话,连接还没有建立好,就无法发送数据
Channel channel = channelFuture.channel();
// 向服务器发送数据
channel.writeAndFlush("hello");
}
方法二:使用 addListener(回调对象) 方法异步处理结果
public static void main(String[] args) throws InterruptedException {
// 启动类
ChannelFuture channelFuture = new Bootstrap()
// 添加组件
.group(new NioEventLoopGroup())
// 选择客户端channel 实现
.channel(NioSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加字符串编码器
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接到服务器
.connect(new InetSocketAddress("localhost", 8080));
// 添加回调方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 当 nio 线程连接建立完成 之后调用该方法
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello");
}
});
}
关闭问题
- 调用
channel.close()
关闭channel时,也是异步操作,会交给nio 线程来关闭channel - 类似的,如果有的操作是需要在关闭之后进行操作,有同步和异步两种方法
- 需要使用
channel.closeFuture()
获取ChannelFuture 对象
同步处理关闭
- 调用
future.sync()
方法,进行同步阻塞 - 当channel 成功关闭时会解除阻塞
异步处理关闭
- 调用
future.addListener()
方法,传递回调参数 - 当nio 线程执行完关闭操作后,会调用对应的回调方法
当 channel 关闭时,对应的 Java 客户端程序并没有结束运行,是因为 NioEventLoopGroup 里面还有一些线程,并没有被结束
优雅关闭
- 在关闭 channel 连接后,调用 NioEventLoopGroup 中的
shutdownGracefully()
方法 - 不是立刻停止,会把手头的工作处理完后停止
服务器和客户端一样,在需要关闭时,不能忽略 NioEventLoopGroup 中资源的释放
为什么要异步
- 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
- 异步并没有缩短响应时间,反而有所增加
- 合理进行任务拆分,也是利用异步的关键
- 提高的是单位时间内处理请求的个数(吞吐量)
Future & Promise
在异步处理时,经常用到这两个接口
首先要说明 Netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。
- jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
- netty Promise:不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
先搞清楚顺序,服务端:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-23 21:17
* @Description: TODO
*/
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel socketChannel) throws Exception {
// 通过 channel 拿到 pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
// 向pipeline 添加处理器
// netty 会默认添加一个 head <-> tail(双向链表)
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
// 将数据传递给下一个 handler
// 内部就是 ctx.fireChannelRead(msg);
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("3");
super.channelRead(ctx, msg);
// 只有有写出的代码时,才会执行出站 handler
socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
// 出站 handler
pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("6");
super.write(ctx, msg, promise);
}
});
// head <-> h1 <-> h2 <-> h3 <-> h4 <-> h5 <-> h6 <-> tail
}
})
.bind(8080);
}
}
1
2
3
6
5
4
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
- 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 在出站处理器调用时,如果是使用
channel.write()
,则是从tail 往前找 出站处理器;如果使用的ctx.write()
,则是从当前 handler 往前找出站 handler
EmbeddedChannel
- 用来测试的channel
- 可以绑定多个 handler,在测试时就不需要去启动服务端和客户端了
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-23 21:41
* @Description: TODO
*/
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
// 模拟出站
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
}
}
ByteBuf
ByteBufAllocator.DEFAULT.buffer()
:创建一个默认的ByteBuf(池化基于直接内存的 ByteBuf)- 默认容量256,可以传参指定
- 支持动态扩容
优势
- 池化-可以重用 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如:slice、duplicate、CompositeByteBuf
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buffer);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes());
System.out.println(buffer);
}
}
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
直接内存 vs 堆内存
- 基于堆:
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
- 基于直接内存:
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 默认使用直接内存
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled | pooled}
- 4.1 之后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
组成
- 容量
- 最大容量:默认是整数最大值
- 读指针
- 写指针
最开始读写指针都在 0 位置
在 NIO 中读写共用一个指针,需要切换读写模式,Netty 采用双指针,简化操作
常用方法
写入:
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01/00 代表 true/false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian(大端),即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian(小端),即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
- 网络传输,默认习惯是 Big Endian(大端)
先写入 4 个字节:
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
再写进一个int整数,也就是四个字节
buffer.writeInt(5);
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置。
buffer.setByte(4,1);
扩容
再写进一个整数时,容量就不够了(初始容量为10),这个时候就会引发扩容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
log(buffer);
buffer.writeInt(6);
log(buffer);
具体的扩容规则:
- 如果写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
- 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10 = 1024(2^9=512 已经不够了)
- 扩容不能超过 max capacity 会报错
读取:
例如读了 4 次,每次一个字节
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
log(buffer);
buffer.writeInt(6);
log(buffer);
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
这时要重复读取的话,重置到标记位置 reset
buffer.resetReaderIndex();
log(buffer);
System.out.println(buffer.readInt());
log(buffer);
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index
内存释放
retain & release
由于堆外内存并不直接控制于JVM,因此只能等到full GC的时候才能垃圾回收
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
谁来负责 release 呢?
不是我们想象的(一般情况下)
ByteBuf buf = ...
try {
...
} finally {
buf.release();
}
请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下:
- 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
slice
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
原始 ByteBuf 进行一些初始操作
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write
ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果执行,会报 IndexOutOfBoundsException 异常
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
如果原始 ByteBuf 再次读操作(又读了一个字节)
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
这时的 slice 不受影响,因为它有独立的读写指针
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
如果 slice 的内容发生了更改
slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05 |... |
+--------+-------------------------------------------------+----------------+
这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存
System.out.println(ByteBufUtil.prettyHexDump(origin));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05 |.. |
+--------+-------------------------------------------------+----------------+
duplicate
【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
CompositeByteBuf
【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
有两个 ByteBuf 如下
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+
现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?
方法1:
ByteBuf buf3 = ByteBufAllocator.DEFAULT
.buffer(buf1.readableBytes()+buf2.readableBytes());
buf3.writeBytes(buf1);
buf3.writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
这种方法不太好,因为进行了数据的内存复制操作
方法2:
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buf3.addComponents(true, buf1, buf2);
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
- 缺点,复杂了很多,多次操作会带来性能的损耗
Unpooled
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
也可以用来包装普通字节数组,底层也不会有拷贝操作
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
class io.netty.buffer.CompositeByteBuf
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 |...... |
+--------+-------------------------------------------------+----------------+
双向通信
练习
实现一个 echo server
public class TestEchoService {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug((String) msg);
ch.writeAndFlush(msg);
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
}
客户端
public class TestEchoClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
.channel(NioSocketChannel.class)
.group(group)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug((String) msg);
super.channelRead(ctx, msg);
}
});
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
}
}
读和写的误解
我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
public class TestServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888);
Socket s = ss.accept();
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
// 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
客户端:
public class TestClient {
public static void main(String[] args) throws IOException {
Socket s = new Socket("localhost", 8888);
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
Nettey进阶
粘包与半包
TCP 传输中,客户端发送数据,实际是把数据写入到了 TCP 的缓存中,粘包和半包也就会在此时产生。客户端给服务端发送了两条消息ABC和DEF,服务端这边的接收会有多少种情况呢?
有可能是一次性收到了所有的消息ABCDEF,有可能是收到了三条消息AB、CD、EF。
粘包现象
- 上面所说的一次性收到了所有的消息ABCDEF,类似于粘包。
- 如果客户端发送的包的大小比 TCP 的缓存容量小,并且 TCP 缓存可以存放多个包,那么客户端和服务端的一次通信就可能传递了多个包,这时候服务端从 TCP 缓存就可能一下读取了多个包,这种现象就叫粘包。
服务端
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 13:58
* @Description: TODO
*/
public class HelloWorldServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class)
.group(boss,worker)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(((ByteBuf)msg).toString());
}
});
}
});
ChannelFuture future = serverBootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客户端:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 14:03
* @Description: TODO
*/
public class HelloWorldClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// 在channel 建立成功后, 触发 avtive 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
ctx.writeAndFlush(buf);
}
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 8080);
future.channel().closeFuture().sync();
}catch (Exception e){
System.out.println("client error:" + e);
}finally {
worker.shutdownGracefully();
}
}
}
PooledUnsafeDirectByteBuf(ridx: 0, widx: 160, cap: 1024)
上面代码一次性发送了160 的数据,按理说应该分10次,每次16,这就是粘包现象
半包现象
- 上面说的后面那种收到了三条消息AB、CD、EF,类似于半包。
- 如果客户端发送的包的大小比 TCP 的缓存容量大,那么这个数据包就会被分成多个包,通过 Socket 多次发送到服务端,服务端第一次从接受缓存里面获取的数据,实际是整个包的一部分,这时候就产生了半包(半包不是说只收到了全包的一半,是说收到了全包的一部分)。
在服务端加上一下配置,将接收缓冲区调小一些
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
对应的打印信息如下:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 36, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 40, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 40, cap: 512)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 40, cap: 512)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 496)
第一次出现了粘包(16正常,36粘包)
从第2次开始出现半包现象
在使用 TCP 协议时都会有粘包和半包问题,UDP没有该问题
TCP 滑动窗口
- TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差
为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 图中深色的部分即要发送的数据,高亮的部分即窗口
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
MSS限制
链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如
- 以太网的 MTU 是 1500
- FDDI(光纤分布式数据接口)的 MTU 是 4352
- 本地回环地址的 MTU 是 65535 - 本地测试不走网卡
- MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数
- ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
- TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
- MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS
解决方法
方法一:短链接
将 TCP 连接改成短连接,一个请求一个短连接。这样的话,建立连接到释放连接之间的消息即为传输的信息,消息也就产生了边界。这样的方法就是十分简单,不需要在我们的应用中做过多修改。但缺点也就很明显了,效率低下,TCP连接和断开都会涉及三次握手以及四次握手,每个消息都会涉及这些过程,十分浪费性能。 因此,并不推荐这种方式。
- 客户端在一次消息发送完毕后断开连接
能解决粘包问题,不能解决半包问题
- 调整系统的接收缓冲区(滑动窗口):
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)
调整netty 的接收缓冲区(bytebuf):
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16))
默认 1024;三个参数分别代表最小值、初始值、最大值
- 系统接收缓冲区是全局的,netty 的缓冲区是针对 channel 的
- 调整系统的接收缓冲区(滑动窗口):
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 14:03
* @Description: TODO
*/
public class HelloWorldClient {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send(){
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// 在channel 建立成功后, 触发 avtive 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
ctx.writeAndFlush(buf);
ctx.channel().close();
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 8080);
future.channel().closeFuture().sync();
}catch (Exception e){
System.out.println("client error:" + e);
}finally {
worker.shutdownGracefully();
}
}
}
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
方法二:封装成帧
定长解码器
- 在服务端添加 handler:
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
(表示固定长度为10) - 固定长度 这种方式下,消息边界也就是固定长度即可。 优点就是实现很简单,缺点就是空间有极大的浪费,如果传递的消息中大部分都比较短,这样就会有很多空间是浪费的。 因此,这种方式一般也是不推介的。
行解码器(分隔符)
netty 中提供了解码器 handler,用来实现行解码器
LineBasedFrameDecoder
:指定最大长度,以换行符为分割(\n
或者\r\n
)DelimiterBasedFrameDecoder
:指定最大长度和分隔符
- 需要指定最大长度,也就是到了最大长度还没有找到分隔符,就会抛一个异常(避免消息本身格式不对而无限寻找下去)
- 分隔符 这种方式下,消息边界也就是分隔符本身。优点是空间不再浪费,实现也比较简单。缺点是当内容本身出现分割符时需要转义,所以无论是发送还是接受,都需要进行整个内容的扫描。 因此,这种方式效率也不是很高,但可以尝试使用。
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 14:03
* @Description: TODO
*/
public class Client3 {
public static void main(String[] args) {
send();
}
private static void send(){
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// 在channel 建立成功后, 触发 avtive 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
StringBuilder sb = makeString(c, r.nextInt(256) + 1);
buf.writeBytes(sb.toString().getBytes());
System.out.println(sb.toString());
c++;
}
ctx.writeAndFlush(buf);
System.out.println("finish ...");
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 8080);
future.channel().closeFuture().sync();
}catch (Exception e){
System.out.println("client error:" + e);
}finally {
worker.shutdownGracefully();
}
}
private static StringBuilder makeString(char ch, int len){
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; i++) {
sb.append(ch);
}
sb.append("\n");
return sb;
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 13:58
* @Description: TODO
*/
public class Server3 {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class)
.group(boss,worker)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 行解码器(最大长度约定为10,分隔符为换行符)
nioSocketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
ByteBuf buf = (ByteBuf) msg;
System.out.println(((ByteBuf) msg).toString(Charset.defaultCharset()));
}
});
}
});
// serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));
ChannelFuture future = serverBootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
LTC解码器(基于长度字段的帧解码器)
netty 提供了
LengthFieldBasedFrameDecoder
解码器- maxFrameLength:帧的最大长度(超过该长度还没发现分割标准就失败,防止格式不正确)
- lengthFieldOffset:长度字段偏移量(长度这个字段从哪里开始)
- lengthFieldLength:长度字段长度
- lengthAdjustment:长度字段为基准,跳过几个字节到内容(除了长度和内容,可能还有header 这种附加内容)
- initialBytesToStrip:从头剥离几个字节(如果解析后的内容不需要长度等字段,可以剥离)
- 专门的 length 字段 这种方式,就有点类似 Http 请求中的 Content-Length,有一个专门的字段存储消息的长度。作为服务端,接受消息时,先解析固定长度的字段(length字段)获取消息总长度,然后读取后续内容。优点是精确定位用户数据,内容也不用转义。缺点是长度理论上有限制,需要提前限制可能的最大长度从而定义长度占用字节数。 因此,十分推介用这种方式。
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 16:33
* @Description: TODO
*/
public class TestLengthFieldDecoder {
public static void main(String[] args) {
// 4字节长度 1字节版本号 内容
EmbeddedChannel embeddedChannel = new EmbeddedChannel(
// 保留 版本号、内容(剥离了长度)
new LengthFieldBasedFrameDecoder(
1024,
0,
4,
1,
4),
new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(((ByteBuf)msg).toString(Charset.defaultCharset()));
}
}
);
// 4字节长度 实际内容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer,"Hello World");
send(buffer,"Hi");
embeddedChannel.writeInbound(buffer);
}
private static void send(ByteBuf buf,String content) {
byte[] bytes = content.getBytes(); // 实际内容
int length = bytes.length; // 实际内容长度
buf.writeInt(length); // int 4字节
buf.writeByte(1); // 版本号
buf.writeBytes(bytes);
}
}
协议设计与解析
Redis
- set key value
- Redis中把命令看成一个数组,首先要求传递一个数组长度
- 后续发送每个命令或者键值的长度
*3
$3
set
$4
name
$8
zhangsan
Java代码根据Redis 协议规则向Redis 服务器发送请求并接收响应:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 17:27
* @Description: TODO
*/
public class TestRedis {
public static void main(String[] args) {
// 换行的ASCII码:\n \r\n
final byte[] LINE = {13, 10};
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(group)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向Redis 发送命令
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$8".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("zhangsan".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
});
// 接收 Redis 服务器的响应
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", 6379);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully();
}
}
}
只要你按照对应的协议去编写内容,就可以和相应协议进行通信;Netty 提供了很多现成的协议,比如 Redis、Http等
HTTP
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-25 17:39
* @Description: TODO
*/
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// Netty 提供的 HTTP 协议编解码器(组合了编码和解码器)
// 在命名上,一般是Codec 就同时包括编码和解码
// 在HttpServerCodec解析后,会分为 HttpRequest和 HttpContent 两部分
// 既是入站处理器也是出站处理器
socketChannel.pipeline().addLast(new HttpServerCodec());
/* 处理方式一:
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof HttpRequest){
// 处理请求行,请求头
}else if(msg instanceof HttpContent){
// 请求体
}
}
});
*/
// 处理方式二:
// SimpleChannelInboundHandler:只会处理感兴趣的消息(泛型)
// 该处理器会跳过其他类型的消息
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
System.out.println(httpRequest.uri());
// 返回响应
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
httpRequest.protocolVersion(), // 协议版本
HttpResponseStatus.OK
);
byte[] bytes = "<h1>Hello, World !</h1>".getBytes();
// 响应头
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,bytes.length);
// 响应内容
response.content().writeBytes(bytes);
channelHandlerContext.writeAndFlush(response);
}
});
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
自定义协议
要素:
- 魔数,用来在第一时间判定是否是无效的数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊......跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
应用场景:聊天室
- 定义一个编解码器,需要继承
ByteTiMessageCodec
类,泛型表示需要将ByteBuf转换为什么类型 重写
encode
和decode
方法- encode:编码,将 自定义消息转换为 ByteBuf
- decode:解码,将 ByteBuf 转换为 自定义消息
Message(抽象父类,定义消息协议基本信息)
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 13:59
* @Description: TODO
*/
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType){
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
// 登录业务
public static final int LOGIN_REQUEST_MESSAGE = 0;
public static final int LOGIN_RESPONSE_MESSAGE = 1;
// 聊天业务
public static final int CHAT_REQUEST_MESSAGE = 2;
public static final int CHAT_RESPONSE_MESSAGE = 3;
// 群组业务
public static final int GROUP_CREATE_REQUEST_MESSAGE = 4;
public static final int GROUP_CREATE_RESPONSE_MESSAGE = 5;
public static final int GROUP_JOIN_REQUEST_MESSAGE = 6;
public static final int GROUP_JOIN_RESPONSE_MESSAGE = 7;
public static final int GROUP_QUIT_REQUEST_MESSAGE = 8;
public static final int GROUP_QUIT_RESPONSE_MESSAGE = 9;
public static final int GROUP_CHAT_REQUEST_MESSAGE = 10;
public static final int GROUP_CHAT_RESPONSE_MESSAGE = 11;
public static final int GROUP_MEMBERS_REQUEST_MESSAGE = 12;
public static final int GROUP_MEMBERS_RESPONSE_MESSAGE = 13;
private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
}
具体子类(根据业务类型定义)
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 14:11
* @Description: TODO
*/
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message{
private String username;
private String password;
private String nickname;
public LoginRequestMessage(){
}
public LoginRequestMessage(String username, String password, String nickname){
this.username = username;
this.password = password;
this.nickname = nickname;
}
@Override
public int getMessageType() {
return LOGIN_REQUEST_MESSAGE;
}
}
自定义Codec,指定具体编解码方法:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 13:59
* @Description: TODO
*/
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
// 编码器
// 将自定义消息包装到形参中的 ByteBuf 即可
// 1. 魔数(我们定义为4字节的 1234)
byteBuf.writeBytes(new byte[]{1,2,3,4});
// 2. 1字节的版本
byteBuf.writeByte(1);
// 3. 1字节的序列化算法(为了简单,采用jdk)
// 为了可扩展:0代表jdk方式、1 代表 json
byteBuf.writeByte(0);
// 4. 指令类型(和业务相关),比如:登录消息、注册消息、单聊、群聊等消息(1字节)
// 通过抽象父类 Message 定义了指令类型,由子类具体声明
byteBuf.writeByte(message.getMessageType());
// 5. 4字节请求序号
byteBuf.writeInt(message.getSequenceId());
byteBuf.writeByte(0xff); // 仅仅为了对齐填充(使协议长度为 2 的整数倍)
// 6. 获取消息内容的字节数组(父类 Message 实现了 Serializable,可以被序列化)
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
// 将message 消息写入对象输出流,对象输出流将对应消息写入 ByteArrayOutputStream
oos.writeObject(message);
byte[] bytes = bos.toByteArray();
// 7. 正文长度
byteBuf.writeInt(bytes.length);
// 8. 正文内容
byteBuf.writeBytes(bytes);
// 自定义协议:4 + 1 + 1 + 1 + 4 + 正文内容 = 15 + 正文内容
// 为了内容对其,需要是 2 的整数倍,因此在第 5 步后加 1 字节无意义的内容填充为 16 字节
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 解码器(和编码是逆过程)
// 和编码规定的字节数 一一对应
// 1. 魔数
int magicNum = byteBuf.readInt();
// 2. 版本
byte version = byteBuf.readByte();
// 3. 序列化方式
byte serializerType = byteBuf.readByte();
// 4. 指令类型
byte messageType = byteBuf.readByte();
// 5. 指令序号
int sequenceId = byteBuf.readInt();
// 跳过无意义的填充字节
byteBuf.readByte();
// 正文长度
int length = byteBuf.readInt();
// 读取正文内容
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes, 0, length);
// 反序列化
if(serializerType == 0){
// JDK 序列化方式
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 将解码对象存到List中(Netty 规定,便于后续 handler 使用)
list.add(message);
// 打印测试
System.out.println(magicNum + ":" + version + ":" + serializerType + ":" + messageType
+ ":" + sequenceId + ":" + length);
System.out.println(message);
}
}
}
测试:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 14:33
* @Description: TODO
*/
@Slf4j
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
new MessageCodec());
// 测试 encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan","123","张三");
channel.writeOutbound(message);
// 测试 decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
// 将 message 中的内容通过编码传输到 buf 中(buf 中的数据即为编码后的数据)
new MessageCodec().encode(null,message,buf);
// 入站
channel.writeInbound(buf);
}
}
15:16:43,944 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] WRITE: 257B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2e 74 6f 70 2e 68 65 6c 6c |....sr..top.hell|
|00000020| 6f 63 6f 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 |ocode.chat.messa|
|00000030| 67 65 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d |ge.LoginRequestM|
|00000040| 65 73 73 61 67 65 34 b8 1c 84 de 68 cc 48 02 00 |essage4....h.H..|
|00000050| 03 4c 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c |.L..nicknamet..L|
|00000060| 6a 61 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 |java/lang/String|
|00000070| 3b 4c 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 |;L..passwordq.~.|
|00000080| 01 4c 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 |.L..usernameq.~.|
|00000090| 01 78 72 00 22 74 6f 70 2e 68 65 6c 6c 6f 63 6f |.xr."top.helloco|
|000000a0| 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e |de.chat.message.|
|000000b0| 4d 65 73 73 61 67 65 45 13 8f 99 de 7c a1 0a 02 |MessageE....|...|
|000000c0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|000000d0| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|000000e0| 00 00 00 00 00 00 00 74 00 06 e5 bc a0 e4 b8 89 |.......t........|
|000000f0| 74 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 |t..123t..zhangsa|
|00000100| 6e |n |
+--------+-------------------------------------------------+----------------+
15:16:43,945 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] FLUSH
15:16:43,948 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] READ: 257B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2e 74 6f 70 2e 68 65 6c 6c |....sr..top.hell|
|00000020| 6f 63 6f 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 |ocode.chat.messa|
|00000030| 67 65 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d |ge.LoginRequestM|
|00000040| 65 73 73 61 67 65 34 b8 1c 84 de 68 cc 48 02 00 |essage4....h.H..|
|00000050| 03 4c 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c |.L..nicknamet..L|
|00000060| 6a 61 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 |java/lang/String|
|00000070| 3b 4c 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 |;L..passwordq.~.|
|00000080| 01 4c 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 |.L..usernameq.~.|
|00000090| 01 78 72 00 22 74 6f 70 2e 68 65 6c 6c 6f 63 6f |.xr."top.helloco|
|000000a0| 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e |de.chat.message.|
|000000b0| 4d 65 73 73 61 67 65 45 13 8f 99 de 7c a1 0a 02 |MessageE....|...|
|000000c0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|000000d0| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|000000e0| 00 00 00 00 00 00 00 74 00 06 e5 bc a0 e4 b8 89 |.......t........|
|000000f0| 74 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 |t..123t..zhangsa|
|00000100| 6e |n |
+--------+-------------------------------------------------+----------------+
16909060:1:0:0:0:241
LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123, nickname=张三)
15:16:43,983 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
此时还有一个问题,可能会出现粘包 或者 半包 的问题
可以配合 LengthFieldBasedFrameDecoder
解码器解决问题
改进:
EmbeddedChannel channel = new EmbeddedChannel(
// 最大长度1024,正文长度偏移量12,4个字节,不需要调整,保留所有内容
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new LoggingHandler(),
new MessageCodec());
如果出现半包问题,没有加帧解码器,就有可能出现异常,比如读到了正文长度是255,然后去读255个正文内容,但是内容不完整,就有异常
加上帧解码器后,判断如果消息不完整,则不会向后续 handler 传递,会等待消息
Sharable
如果要将Handler 抽取出来,复用,应该怎么做
如果直接抽取一个变量,多次使用
- 对于会保存内容的,如:帧解码器(半包时会记录内容,等待后续消息),在多线程下,就可能出现数据混乱的问题
- 对于不会保存内容的,如:LoggingHandler,只打印日志,不记录数据,就不会出现问题
在 Netty 中,通过 @Sharable
注解作为标记,能够被多线程使用的类,会使用该注解进行标记说明:
如果我们的自定义 Handler 需要添加 @Sharable
注解,标识可共享,步骤如下:
- 首先,
ByteToMessageCodec
类不允许子类添加@Sharable
注解,它认为 ByteBuf 到 Message 可能会出现半包、粘包问题,因此需要保存状态信息,即不能被多线程共享 - 因此,可以使用
MessageToMessageCodec
类,它认为 从 Message 到 Message,即已经接收到完整的消息了,不用再保存状态信息,因此可以添加@Sharable
注解,被多线程共享
聊天室案例
包结构:
Message
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 13:59
* @Description: TODO
*/
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType){
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
// 登录业务
public static final int LOGIN_REQUEST_MESSAGE = 0;
public static final int LOGIN_RESPONSE_MESSAGE = 1;
// 聊天业务
public static final int CHAT_REQUEST_MESSAGE = 2;
public static final int CHAT_RESPONSE_MESSAGE = 3;
// 群组业务
public static final int GROUP_CREATE_REQUEST_MESSAGE = 4;
public static final int GROUP_CREATE_RESPONSE_MESSAGE = 5;
public static final int GROUP_JOIN_REQUEST_MESSAGE = 6;
public static final int GROUP_JOIN_RESPONSE_MESSAGE = 7;
public static final int GROUP_QUIT_REQUEST_MESSAGE = 8;
public static final int GROUP_QUIT_RESPONSE_MESSAGE = 9;
public static final int GROUP_CHAT_REQUEST_MESSAGE = 10;
public static final int GROUP_CHAT_RESPONSE_MESSAGE = 11;
public static final int GROUP_MEMBERS_REQUEST_MESSAGE = 12;
public static final int GROUP_MEMBERS_RESPONSE_MESSAGE = 13;
// 心跳
public static final int PING_MESSAGE = 14;
public static final int PONG_MESSAGE = 15;
private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 14:11
* @Description: TODO
*/
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message{
private String username;
private String password;
public LoginRequestMessage(){
}
public LoginRequestMessage(String username, String password){
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LOGIN_REQUEST_MESSAGE;
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 18:02
* @Description: TODO
*/
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage{
public LoginResponseMessage(boolean success, String reason){
super(success, reason);
}
@Override
public int getMessageType() {
return LOGIN_RESPONSE_MESSAGE;
}
}
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
@Override
public int getMessageType() {
return CHAT_REQUEST_MESSAGE;
}
}
@Data
@ToString(callSuper = true)
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return CHAT_RESPONSE_MESSAGE;
}
}
......
protocol
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 15:43
* @Description: TODO
*/
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, List<Object> list) throws Exception {
// 编码器
// 将自定义消息包装到形参中的 ByteBuf 即可
ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
// 1. 魔数(我们定义为4字节的 1234)
byteBuf.writeBytes(new byte[]{1,2,3,4});
// 2. 1字节的版本
byteBuf.writeByte(1);
// 3. 1字节的序列化算法(为了简单,采用jdk)
// 为了可扩展:0代表jdk方式、1 代表 json
byteBuf.writeByte(0);
// 4. 指令类型(和业务相关),比如:登录消息、注册消息、单聊、群聊等消息(1字节)
// 通过抽象父类 Message 定义了指令类型,由子类具体声明
byteBuf.writeByte(message.getMessageType());
// 5. 4字节请求序号
byteBuf.writeInt(message.getSequenceId());
byteBuf.writeByte(0xff); // 仅仅为了对齐填充(使协议长度为 2 的整数倍)
// 6. 获取消息内容的字节数组(父类 Message 实现了 Serializable,可以被序列化)
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
// 将message 消息写入对象输出流,对象输出流将对应消息写入 ByteArrayOutputStream
oos.writeObject(message);
byte[] bytes = bos.toByteArray();
// 7. 正文长度
byteBuf.writeInt(bytes.length);
// 8. 正文内容
byteBuf.writeBytes(bytes);
list.add(byteBuf);
// 自定义协议:4 + 1 + 1 + 1 + 4 + 正文内容 = 15 + 正文内容
// 为了内容对其,需要是 2 的整数倍,因此在第 5 步后加 1 字节无意义的内容填充为 16 字节
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 解码器(和编码是逆过程)
// 和编码规定的字节数 一一对应
// 1. 魔数
int magicNum = byteBuf.readInt();
// 2. 版本
byte version = byteBuf.readByte();
// 3. 序列化方式
byte serializerType = byteBuf.readByte();
// 4. 指令类型
byte messageType = byteBuf.readByte();
// 5. 指令序号
int sequenceId = byteBuf.readInt();
// 跳过无意义的填充字节
byteBuf.readByte();
// 正文长度
int length = byteBuf.readInt();
// 读取正文内容
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes, 0, length);
// 反序列化
if(serializerType == 0){
// JDK 序列化方式
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 将解码对象存到List中(Netty 规定,便于后续 handler 使用)
list.add(message);
}
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 17:33
* @Description: 封装Decoder
*/
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder(){
this(1024,12,4,0,0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
Server
handler:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 21:31
* @Description: TODO
*/
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
String to = chatRequestMessage.getTo();
String from = chatRequestMessage.getFrom();
// 获取到接收者的 channel
Channel channel = SessionFactory.getSession().getChannel(to);
if(channel != null){
// 对方在线,可以发送消息
channel.writeAndFlush(new ChatResponseMessage(from, chatRequestMessage.getContent()));
}else{
// 对方不在线
channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"对方用户不存在或不在线"));
}
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 21:42
* @Description: 群聊处理器
*/
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
String groupName = groupChatRequestMessage.getGroupName();
String from = groupChatRequestMessage.getFrom();
String content = groupChatRequestMessage.getContent();
List<Channel> channels = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
for (Channel channel : channels) {
channel.writeAndFlush(new GroupChatResponseMessage(from,content));
}
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 21:43
* @Description: 创建群聊处理器
*/
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
String groupName = groupCreateRequestMessage.getGroupName();
Set<String> members = groupCreateRequestMessage.getMembers();
// 群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if(group == null){
// 创建成功
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true,groupName + "创建成功"));
// 向对应用户发送拉群消息
// 获取在线的群成员
List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
for (Channel channel : membersChannel) {
// 向群组每位成员通知进群消息
channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入" + groupName));
}
}else{
// 创建失败
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false,groupName + "创建失败,群名不能重复"));
}
}
}
/**
* @author: HelloCode.
* @date: 2023/10/29 21:28
* @description: 登录处理器
*/
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
UserService userService = UserServiceFactory.getUserService();
boolean login = userService.login(username, password);
// 响应消息
LoginResponseMessage resp;
if (login) {
// 登录成功
resp = new LoginResponseMessage(true, "登录成功");
// 保存Channel 信息
SessionFactory.getSession().bind(channelHandlerContext.channel(), username);
} else {
resp = new LoginResponseMessage(false, "用户名或密码错误");
}
channelHandlerContext.writeAndFlush(resp);
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 22:12
* @Description: 用户退出处理器
*/
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
@Override
/* 连接断开时会触发(正常断开,执行quit命令) */
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 已经断开",ctx.channel());
}
@Override
/* 连接断开时会触发(异常断开,捕捉到异常时会触发,比如客户端直接点击按钮终止程序) */
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 异常断开,异常是 {}",ctx.channel(),cause.getMessage());
}
}
......
service:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 18:01
* @Description: UserService工厂类,提升扩展性
*/
public class UserServiceFactory {
public static UserService getUserService(){
return new UserServiceMemoryImpl();
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 16:20
* @Description: 用户管理接口
*/
public interface UserService {
/**
* @author: HelloCode.
* @date: 2023/10/29 16:21
* @param: username
* @param: password
* @return: boolean
* @description: 登录成功返回 true,失败返回 false
*/
boolean login(String username, String password);
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 16:22
* @Description: TODO
*/
public class UserServiceMemoryImpl implements UserService{
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
allUserMap.put("zhangsan","123");
allUserMap.put("lisi","123");
allUserMap.put("wangwu","123");
allUserMap.put("zhaoliu","123");
allUserMap.put("qianqi","123");
}
@Override
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if(pass == null){
return false;
}
return pass.equals(password);
}
}
......
session:
public abstract class SessionFactory {
private static Session session = new SessionMemoryImpl();
public static Session getSession() {
return session;
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 16:24
* @Description: 会话管理接口
*/
public interface Session {
/**
* @author: HelloCode.
* @date: 2023/10/29 16:24
* @param: channel
* @param: username
* @return: void
* @description: 绑定会话
*/
void bind(Channel channel, String username);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:25
* @param: channel
* @return: void
* @description: 解绑会话
*/
void unbind(Channel channel);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:26
* @param: channel
* @param: name
* @return: java.lang.Object
* @description: 获取属性
*/
Object getAttribute(Channel channel, String name);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:27
* @param: channel
* @param: name
* @param: value
* @return: void
* @description: 设置属性
*/
void setAttribute(Channel channel, String name, Object value);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:27
* @param: username
* @return: java.nio.channels.Channel
* @description: 根据用户名获取 Channel
*/
Channel getChannel(String username);
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 16:28
* @Description: TODO
*/
public class SessionMemoryImpl implements Session{
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username,channel);
channelUsernameMap.put(channel,username);
channelAttributesMap.put(channel,new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
public Object getAttribute(Channel channel, String name) {
return channelAttributesMap.get(channel).get(name);
}
@Override
public void setAttribute(Channel channel, String name, Object value) {
channelAttributesMap.get(channel).put(name,value);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 16:33
* @Description: 群组业务
*/
@Data
public class Group {
// 聊天室名称
private String name;
// 聊天室成员
private Set<String> members;
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
}
public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 16:32
* @Description: 聊天组会话管理接口
*/
public interface GroupSession {
/**
* @author: HelloCode.
* @date: 2023/10/29 16:33
* @param: name
* @param: members
* @return: top.hellocode.chat.server.session.Group
* @description: 创建一个聊天组,如果不存在则创建成功,否则返回组对象 (名字不能重复)
*/
Group createGroup(String name, Set<String> members);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:34
* @param: name
* @param: member
* @return: top.hellocode.chat.server.session.Group
* @description: 加入聊天组,不存在返回null,否则返回组对象
*/
Group joinMember(String name, String member);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:36
* @param: name
* @param: member
* @return: top.hellocode.chat.server.session.Group
* @description: 移除组成员,组不存在返回null,否则返回组对象
*/
Group removeMember(String name, String member);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:36
* @param: name
* @return: top.hellocode.chat.server.session.Group
* @description: 移除聊天组,组不存在返回null,否则返回组对象
*/
Group removeGroup(String name);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:37
* @param: name
* @return: java.util.Set<java.lang.String>
* @description: 获取组成员,没有成员则返回空集合
*/
Set<String> getMembers(String name);
/**
* @author: HelloCode.
* @date: 2023/10/29 16:38
* @param: name
* @return: java.util.List<java.nio.channels.Channel>
* @description: 获取组成员的 channel 集合,只有在线的 channel 才会返回
*/
List<Channel> getMembersChannel(String name);
}
public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
public List<Channel> getMembersChannel(String name) {
//判断群聊存不存在
if(groupMap.get(name) == null){
return null;
}
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 17:28
* @Description: TODO
*/
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoginRequestMessageHandler loginHandler = new LoginRequestMessageHandler();
ChatRequestMessageHandler chatHandler = new ChatRequestMessageHandler();
GroupCreateRequestMessageHandler groupCreateHandler = new GroupCreateRequestMessageHandler();
GroupJoinRequestMessageHandler groupJoinHandler = new GroupJoinRequestMessageHandler();
GroupMembersRequestMessageHandler groupMembersHandler = new GroupMembersRequestMessageHandler();
GroupQuitRequestMessageHandler groupQuitHandler = new GroupQuitRequestMessageHandler();
GroupChatRequestMessageHandler groupChatHandler = new GroupChatRequestMessageHandler();
QuitHandler quitHandler = new QuitHandler();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
// 5s 内如果没有收到 channel 的数据,就会触发一个事件
.addLast(new IdleStateHandler(5, 0, 0))
// 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
.addLast(new ChannelDuplexHandler(){
@Override
// 用来触发特殊事件(一些自定义事件)
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 触发了读空闲事件
log.debug("已经 5 秒没有读到数据了......");
ctx.channel().close();
}
}
})
.addLast(new ProcotolFrameDecoder())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_CODEC);
// 自定义处理器(使用SimpleChannelInboundHandler,只针对泛型指定的消息做处理)
nioSocketChannel.pipeline()
// 登录业务
.addLast(loginHandler)
// 聊天业务
.addLast(chatHandler)
// 群聊业务
.addLast(groupCreateHandler)
.addLast(groupJoinHandler)
.addLast(groupMembersHandler)
.addLast(groupQuitHandler)
.addLast(groupChatHandler)
// 退出处理
.addLast(quitHandler);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
}catch (Exception e){
log.error("server err",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
......
client:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 17:25
* @Description: TODO
*/
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// 用于线程之间通信
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 登录状态标记
AtomicBoolean LOGIN = new AtomicBoolean(false);
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(group)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ProcotolFrameDecoder())
.addLast(MESSAGE_CODEC)
// 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
// 3s 内如果没有向服务器写数据,就会触发一个写空闲事件(写的频率要比服务器读时间间隔短)
.addLast(new IdleStateHandler(0, 3, 0))
// 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
.addLast(new ChannelDuplexHandler(){
@Override
// 用来触发特殊事件(一些自定义事件)
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// 触发了写空闲事件
// log.debug("3s 没有写数据了,发送一个心跳包......");
ctx.writeAndFlush(new PingMessage());
}
}
});
// 创建自定义 Handler(业务相关)
socketChannel.pipeline().addLast("client handler",
new ChannelInboundHandlerAdapter(){
@Override
// 连接建立后触发
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 负责接收用户在控制台的输入,向服务器发送各种消息
// 创建新线程,防止用户输入阻塞 NIO 线程
new Thread(() -> {
Scanner sc = new Scanner(System.in);
System.out.print("请输入用户名:");
String username = sc.nextLine();
System.out.print("请输入密码:");
String password = sc.nextLine();
// 构造消息对象进行登录(省略校验部分)
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送消息(入站处理器,写入之后就会进行出站操作)
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
// 线程同步操作,等待登录响应通知
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (!LOGIN.get()) {
// 登录失败,释放资源
ctx.channel().close();
return;
}
// 登录成功
while (true){
// 菜单
System.out.println("===============================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("===============================");
// 接收命令
String command = sc.nextLine();
// 解析命令(省略非法命令校验,假设命令都是正确格式)
String[] s = command.split(" ");
switch (s[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));
break;
case "gcreate":
Set<String> members = new HashSet<>(Arrays.asList(s[2].split(",")));
// 加入自己
members.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],members));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));
break;
case "quit":
ctx.channel().close();
return;
default:
System.out.println("指令输入错误,请重试");
break;
}
}
},"system in").start();
}
// 接收响应消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg: {}", msg);
if(msg instanceof LoginResponseMessage){
// 是登录响应
LoginResponseMessage resp = (LoginResponseMessage) msg;
if (resp.isSuccess()) {
// 设置登录标记
LOGIN.compareAndSet(false,true);
}
// 已经接收到响应,计数器减一,唤醒System in 线程
WAIT_FOR_LOGIN.countDown();
}
}
// 连接断开时触发
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("连接已断开,按任意键退出...");
ctx.channel().close();
}
// 出现异常时触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.debug("连接异常:{},按任意键退出",cause.getMessage());
ctx.channel().close();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
}catch (Exception e){
log.error("client error",e);
}finally {
group.shutdownGracefully();
}
}
}
连接假死
原因:
- 网络设备出现故障,例如网卡、机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
- 应用程序线程阻塞,无法进行数据读写
问题:
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
解决:
- Netty 提供了空闲状态检测器来解决该问题:
IdleStateHandler
- 参数:读最大空闲时间、写最大空闲时间、读写最大空闲时间
指定的最大空闲时间内没有收到或者写出数据,就会触发相应事件(事件类型为
IdleStateEvent
)IdleState.READER_IDLE
:读空闲事件IdleState.WRITER_IDLE
:写空闲事件IdleState.ALL_IDLE
:都空闲事件
nioSocketChannel.pipeline()
// 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
// 5s 内如果没有收到 channel 的数据,就会触发一个事件
.addLast(new IdleStateHandler(5, 0, 0))
// 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
.addLast(new ChannelDuplexHandler(){
@Override
// 用来触发特殊事件(一些自定义事件)
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 触发了读空闲事件
log.debug("已经 5 秒没有读到数据了......");
}
}
});
心跳机制
- 可以配合心跳机制解决连接假死问题
- 让客户端每 3 秒自动向服务器发送一个心跳包来打破 5 秒限制
- 这样可以防止正常用户被服务端断开的现象
socketChannel.pipeline()
// 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
// 3s 内如果没有向服务器写数据,就会触发一个写空闲事件(写的频率要比服务器读时间间隔短)
.addLast(new IdleStateHandler(0, 3, 0))
// 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
.addLast(new ChannelDuplexHandler(){
@Override
// 用来触发特殊事件(一些自定义事件)
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// 触发了写空闲事件
// log.debug("3s 没有写数据了,发送一个心跳包......");
ctx.writeAndFlush(new PingMessage());
}
}
});
场景参数及优化
扩展序列化算法
序列化、反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[] 或 json 等,最终都需要变成 byte[])
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
参数
连接超时
CONNECT_TIMEOUT_MILLIS
- 属于 SocketChannel 参数
- 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
- SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
客户端:
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
// 参数配置
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300)
.handler(new LoggingHandler());
服务端:
Bootstrap bootstrap = new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
// ServerSocket参数配置
.option()
// Socket参数配置
.childOption()
.handler(new LoggingHandler());
SO_BACKLOG
- 属于 ServerSocketChannel
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
- 在 Linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
sync queue
-半连接队列- 大小通过
/proc/sys/net/ipv4/tcp_max_syn_backlog
指定,在syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- 大小通过
accept queue
-全连接队列- 其大小通过
/proc/sys/net/core/somaxconn
指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值 - 如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
- 其大小通过
Bootstrap bootstrap = new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
// ServerSocket参数配置
.option(ChannelOption.SO_BACKLOG, 200) // 全连接队列大小
.handler(new LoggingHandler());
在 Netty 中,accept 的能力很强,只有在 accept 处理不了的时候,才会在队列中堆积;如果全连接队列满了,就会抛出异常
ulimit -n
- 属于操作系统参数
- 允许一个进程打开的最大文件(描述符)数量
TCP_NODELAY
- 属于 SocketChannel 参数
- 使用 childOption 进行设置
- nagle 算法的开关,Netty 中默认开启(数据量小的时候,等待多个凑成大的再统一发送,可能造成延迟)
- 建议设置为 true,不延迟
SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF 属于 SocketChannel 参数
- SO_RCVBUF 既可用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上)
- 决定了滑动窗口的上限,建议不要手动调整,由操作系统动态调整
ALLOCATOR
- 属于 SocketChannel 参数
- 用来分配 ByteBuf
- 调用
ctx.alloc()
就会拿到 ALLOCATOR 分配器对象
RCVBUF_ALLOCATOR
- 属于 SocketChannel 参数
- 控制 Netty 接收缓冲区(ByteBuf)大小和是否是 Direct 内存
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
RPC 框架
为了简化起见,在原来聊天项目的基础上新增 RPC 请求和响应消息
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 13:59
* @Description: TODO
*/
@Data
public abstract class Message implements Serializable {
// 其他内容省略.....
// RPC 消息
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
static {
// 其他内容省略.....
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
请求消息:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 14:11
* @Description: TODO
*/
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message{
// 调用的接口全限定名,服务端根据它找到实现
private String interfaceName;
// 调用接口中的方法名
private String methodName;
// 方法返回类型
private Class<?> returnType;
// 方法参数类型数组
private Class[] parameterTypes;
// 方法参数数值数组
private Object[] parameterValues;
public RpcRequestMessage(){
}
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName,
Class<?> returnType, Class[] parameterTypes, Object[] parameterValues){
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValues = parameterValues;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
响应消息:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-29 14:11
* @Description: TODO
*/
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message{
// 返回值
private Object returnValue;
// 异常值
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
服务端:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 21:32
* @Description: TODO
*/
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable codec = new MessageCodecSharable();
// rpc 消息处理器,待实现
RpcRequestMessageHandler rpcHandler = new RpcRequestMessageHandler();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ProcotolFrameDecoder())
.addLast(loggingHandler)
.addLast(codec)
.addLast(rpcHandler);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
}catch (Exception e){
log.error("sever error...",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 22:03
* @Description: TODO
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponseMessage rpcResponseMessage) throws Exception {
log.debug("{}",rpcResponseMessage);
}
}
客户端:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 21:37
* @Description: TODO
*/
@Slf4j
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable codec = new MessageCodecSharable();
// rpc 响应消息处理器,待实现
RpcResponseMessageHandler rpcHandler = new RpcResponseMessageHandler();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProcotolFrameDecoder())
.addLast(loggingHandler)
.addLast(codec)
.addLast(rpcHandler);
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
// 发送 RPC 消息
RpcRequestMessage message = new RpcRequestMessage(
1,
"top.hellocode.chat.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
);
channel.writeAndFlush(message);
channel.closeFuture().sync();
}catch (Exception e){
log.error("client error...",e);
}finally {
group.shutdownGracefully();
}
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 21:41
* @Description: Rpc 请求处理器
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequestMessage message){
RpcResponseMessage resp = new RpcResponseMessage();
try {
// 获取接口实现类
HelloService service = (HelloService) ServicesFactory.getService(
Class.forName(message.getInterfaceName())
);
// 获取方法
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
// 执行方法
Object ret = method.invoke(service, message.getParameterValues());
// 构造 Response
resp.setReturnValue(ret);
} catch (Exception e) {
resp.setExceptionValue(e);
throw new RuntimeException(e);
}
channelHandlerContext.writeAndFlush(resp);
}
// 反射测试
public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
RpcRequestMessage message = new RpcRequestMessage(
1,
"top.hellocode.chat.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
);
// 获取接口实现类
HelloService service = (HelloService) ServicesFactory.getService(
Class.forName(message.getInterfaceName())
);
// 获取方法
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
// 执行方法
Object ret = method.invoke(service, message.getParameterValues());
System.out.println(ret);
}
}
在使用 Gson 序列化和反序列化 Class 类型时(如 String.class等),会出现异常,需要自己指定序列化和反序列化方式:
static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>>{
@Override
public Class<?> deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
// json -> class
try {
return Class.forName(jsonElement.getAsString());
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override
public JsonElement serialize(Class<?> aClass, Type type, JsonSerializationContext jsonSerializationContext) {
// class -> json
return new JsonPrimitive(aClass.getName());
}
}
使用:
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
优化
RpcClientManager:
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 21:37
* @Description: TODO
*/
@Slf4j
public class RpcClientManager {
private static Channel channel = null;
// 测试
public static void main(String[] args) {
HelloService service = getProxyService(HelloService.class);
System.out.println(service.sayHello("张三"));
}
// 单例模式,获取唯一的channel对象
public static Channel getChannel() {
if (channel != null) {
return channel;
}
synchronized (RpcClientManager.class) {
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}
// 创建代理类
public static <T> T getProxyService(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
// 将方法调用转换为 消息对象
Object proxyObj = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
int sequenceId = SequenceIdGenerator.nextId();
RpcRequestMessage message = new RpcRequestMessage(
sequenceId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 发送消息
getChannel().writeAndFlush(message);
// 准备一个空 Promise 对象,接收结果 指定 promise 对象异步接收结果的线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 将 Promise 暂存起来,等待结果
RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
// 等待结果
promise.await();
if(promise.isSuccess()){
return promise.getNow();
}else{
// 抛出异常
throw new RuntimeException(promise.cause());
}
});
return (T) proxyObj;
}
// 初始化 channel 对象
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable codec = new MessageCodecSharable();
// rpc 响应消息处理器,待实现
RpcResponseMessageHandler rpcHandler = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProcotolFrameDecoder())
.addLast(loggingHandler)
.addLast(codec)
.addLast(rpcHandler);
}
});
try {
channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error...", e);
}
}
}
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 21:41
* @Description: Rpc 请求处理器
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequestMessage message){
RpcResponseMessage resp = new RpcResponseMessage();
resp.setSequenceId(message.getSequenceId());
try {
// 获取接口实现类
HelloService service = (HelloService) ServicesFactory.getService(
Class.forName(message.getInterfaceName())
);
// 获取方法
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
// 执行方法
Object ret = method.invoke(service, message.getParameterValues());
// 构造 Response
resp.setReturnValue(ret);
} catch (Exception e) {
e.printStackTrace();
resp.setExceptionValue(new RuntimeException("远程过程调用:" + e.getCause().getMessage()));
}
channelHandlerContext.writeAndFlush(resp);
}
}
获取结果:
调用方法是主线程,而获取结果是另一个线程干的,想要获取结果,就需要通过 Promise 来交换(通过SeqenceId),使用一个Map一一对应
/**
* @blog: <a href="https://www.hellocode.top">HelloCode.</a>
* @Author: HelloCode.
* @CreateTime: 2023-10-31 22:03
* @Description: TODO
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
// squenceId -> Promise
// 便于获取结果
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponseMessage rpcResponseMessage) throws Exception {
log.debug("{}",rpcResponseMessage);
// 向 Promise 填充结果
// 泛型如果使用 ? 的话,只能取,不能放(null 可以),所以这里用了 Object
// 使用 remove,避免无用 Promise 占用资源
Promise<Object> promise = PROMISES.remove(rpcResponseMessage.getSequenceId());
if(promise != null){
Object value = rpcResponseMessage.getReturnValue();
Exception exception = rpcResponseMessage.getExceptionValue();
if(exception != null){
// 失败消息
promise.setFailure(exception);
}else{
// 成功消息
promise.setSuccess(value);
}
}
}
}
源码分析
推荐:https://blog.csdn.net/yyuggjggg/article/details/126634821