11.netty入门(十一)任务队列 TaskQueue
11.netty入门(十一)任务队列 TaskQueue
1.任务队列中的Task有3种典型使用场景
- 用户程序自定义的普通任务;
- 用户自定义定时任务;
- 非当前Reactor线程调用Channel的各种方法;
例如,在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景。最终的write会提交到任务队列中后被异步消费。
2.【用户程序自定义的普通任务】代码示例
2.1问题分析
首先我们 在 NettyServerHandler 的 channelRead() 方法中模拟一下耗时的业务:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 读取数据实现(这里我们可以读取客户端发送的消息)
/**
* 1. ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道channel,地址
* 2. Object msg:就是客户端发送的数据 默认Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如这里有一个非常耗费时间的业务 -> 异步任务 -> 提交到该channel对应的 NIOEventLoop 的 taskQueue中
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));
System.out.println("go on........");
}
// 数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// writeAndFlush 是 write + flush
// 将数据写入到缓存,并刷新
// 一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
}
// 处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
此时客户端必然是等待 10秒 后才能收到消息:hello,客户端~ 喵2;然后再收到消息:hello,客户端~。
2.2解决方案一:用户程序自定义的普通任务
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如这里有一个非常耗费时间的业务 -> 异步任务 -> 提交到该channel对应的 NIOEventLoop 的 taskQueue中
// 解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常 " + e.getMessage());
e.printStackTrace();
}
}
});
System.out.println("go on........");
}
这是我们启动程序发现:首先收到消息:hello,客户端...;然后过 5秒 收到消息:hello,客户端~ 喵2
验证也很简单,Debug查看:ctx --> pipeline --> channel --> eventLoop --> taskQueue,看一下 taskQueue 的size就行了。
2.3解决方案二:用户自定义定时任务
该任务是提交到 scheduleTaskQueue中
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 比如这里有一个非常耗费时间的业务 -> 异步任务 -> 提交到该channel对应的 NIOEventLoop 的 taskQueue中
// 解决方案2 用户自定义的定时任务 -> 该任务是提交到 scheduleTaskQueue 中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("发生异常 " + e.getMessage());
e.printStackTrace();
}
}
}, 5, TimeUnit.SECONDS);
System.out.println("go on........");
}
验证也很简单,Debug查看:ctx --> pipline --> channel --> eventLoop --> scheduleTaskQueue,看一下 scheduleTaskQueue 的 size 就行了。
方案在说明
- Netty抽象出两组线程池,BoosGroup专门负责接收客户端连接,WorkerGroup专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop都有一个selector用于监听绑定在其上的socket网络通道。
- NioEventLoop内部采用串行化设计,从消息的读取 -> 解码 -> 处理 -> 编码 -> 发送,始终由IO线程 NioEventLoop负责。
- NioEventLoopGroup 下包含多个NioEventLoop
- 每个NioEventLoop中包含有一个Selector,一个taskQueue
- 每个NioEventLoop的Selector上可以注册监听多个NioChannel
- 每个NioChannel只会绑定在唯一的NioEventLoop上
- 每个NioChannel都绑定有一个自己的ChannelPipeline
来源:https://blog.csdn.net/zhufei463738313/article/details/129254893