概述
ChannelHandler
ChannelHandler 是 Netty 中的一个接口,它定义了处理 I/O 事件的方法。ChannelHandler 可以处理各种类型的事件,包括连接事件、读写事件、异常事件等。
方法
入站(Inbound)方法:
- channelRead(ChannelHandlerContext ctx, Object msg):当从通道接收到数据时调用。
- channelReadComplete(ChannelHandlerContext ctx):当所有数据都已读取完毕时调用。
- exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当发生异常时调用。
出站(Outbound)方法:
- write(ChannelHandlerContext ctx, Object msg, Promise promise):当有数据需要写出时调用。
- flush(ChannelHandlerContext ctx):当需要刷新写出缓冲区时调用。
ChannelPipeline
ChannelPipeline 是 Netty 中的核心组件之一,它管理了一系列的 ChannelHandler。ChannelPipeline 可以理解为一个责任链模式的实现,它按照顺序处理事件,每个 ChannelHandler 负责处理特定类型的事件,并可以选择将事件传递给下一个 ChannelHandler 或者停止处理。
特点
- 有序性:ChannelPipeline 中的 ChannelHandler 是按顺序排列的,可以插入、删除或替换。
- 双向性:ChannelPipeline 支持双向处理,即可以处理入站(Inbound)事件,也可以处理出站(Outbound)事件。
- 灵活性:可以动态地添加、删除或替换 ChannelHandler,而无需重新启动应用程序。
总结
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。
代码示例
服务器端
在服务器端中添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class PipelineTest {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1. 通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2. 添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
//addLast()会加在tail之前,而不是最后。底层是双向链表
//入站处理器
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
//处理加工msg
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
//将处理好的字符串传递给h2
super.channelRead(ctx, name);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
Student student = new Student(name.toString());
super.channelRead(ctx, student);
// 调用super.channelRead()将数据传递给下个 handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);
}
});
//出战处理器。注意:ctx.writeAndFlush()和ch.writeAndFlush()都会走该出站处理器
pipeline.addLast("h2.5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("2.5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3,h2传递的数据为{},class为{}",msg,msg.getClass());
//super.channelRead(ctx, msg);//channelRead()是将控制权交给pipeline中下一个入站处理器
//这里h3后面已经没有入站处理器了,所以可以不用调用channelRead()
//向channel中写入数据触发出站处理器(ch.writeAndFlush()会从尾节点向前找出站处理器)
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
//ctx.writeAndFlush()是从当前处理器向前寻找出站处理器
// ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
//出站处理器 p2.25
//注意:出站处理器只有向channel中写入数据才会触发
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student {
private String name;
}
}
客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author qf
* @since 2024/09/12 19:44
*/
public class Client {
public static void main(String[] args) {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.addListener((ChannelFutureListener) future -> {
future.channel().writeAndFlush("hello,world");
});
}
}
服务器端输出
18:45:18.287 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 1
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 3,h2传递的数据为PipelineTest.Student(name=hello,world),class为class com.qf.netty.Pipeline.PipelineTest$Student
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 6
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 5
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 4
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2.5
以上代码可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表。
-
入站处理器中,super.channelRead(ctx, name)或使用ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 h1中 super.channelRead(ctx, name)代码,则仅会打印 1
- 如果注释掉 h2中 super.channelRead(ctx, student)代码,则仅会打印 1 2
-
h3 处的 ch.writeAndFlush() 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 h3 处 ch.writeAndFlush() 代码,则仅会打印 1 2 3
-
类似的,出站处理器中,super.write(ctx, msg, promise)或ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 h6 处 super.write(ctx, msg, promise) 代码,则仅会打印 1 2 3 6
-
ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己(死循环)
服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
EmbeddedChannel
EmbeddedChannel 是 netty提供的专门用来测试的channel。
使用EmbeddedChannel进行测试可以不启动服务器和客户端了。
EmbeddedChannel 的方法
- writeInbound(Object msg):将入站消息写到 EmbeddedChannel 中。
- writeOutbound(Object msg):将出站消息写到 EmbeddedChannel 中。
- readInbound():从 EmbeddedChannel 中读取入站消息。
- readOutbound():从 EmbeddedChannel 中读取出站消息。
- finish():完成所有未完成的写操作,并关闭 EmbeddedChannel
@Slf4j
public class Test06EmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
// channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
/*
[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 1
[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 2
*/
// 模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
/*
[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 4
[DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 3
*/
}
}
转载自CSDN-专业IT技术社区
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_46425661/article/details/142177575