一、异步模型同步I/O : 需要进程去真正的去操作I/O; 异步I/O:内核在I/O操作完成后再通知应用进程操作结果。 怎么去理解同步和异步?
比如上一篇关于 二、异步模型存在的问题使用流程
创建一个处理器 public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {try { AttributeKey<RpcResponse> key = AttributeKey.valueOf(msg.getRequestId()); ctx.channel().attr(key).set(msg); ctx.channel().close(); } finally { ReferenceCountUtil.release(msg); } } } 记得将该 处理器 加入到 客户端 创建服务器和客户端,这里不再赘述,这篇文章对刚入门的帮助不大,可到文章最后取经拿服务端和客户端。
客户端这里 @Overridepublic Object sendRequest(RpcRequest rpcRequest) throws RpcException {// 通过 host 和 port 获取 channel//省略// 写入 channel 让 服务端 去 读 requestchannel.writeAndFlush(rpcRequest);// 获取 k-v 对RpcResponse rpcResponse = channel.attr(key).get(); } 相信你们当中有一部分发觉了异样, 最后测试到,客户端拿不到值,总是为 那怎么保持使用异步操作,并且可以顺利拿到值呢? 那么就得通过 三、使用CompletableFuture 解决异步问题
CompletableFuture<RpcResponse> resultFuture = new CompletableFuture<>();/**complete 执行结束后,状态发生改变,则 说明 值已经传到了,complete 是 (被观察者) 通知类的通知方法,通知 观察者 ,get 方法将 不再阻塞,可以获取到值 */resultFuture .complete(msg);/**获取 正确结果,get 是阻塞操作,所以 先把 resultFuture 作为 返回值 返回,再 get 获取值 */RpcResponse rpcResponse = resultFuture.get();// 获取 错误结果, 抛 异常 处理resultFuture.completeExceptionally(future.cause()); 所以我们要做的就是在 简单来说就是通过使用这个 需要注意的是,在 客户端的 public class SingleFactory {private static Map<Class, Object> objectMap = new HashMap<>();private SingleFactory() {}/** * 使用 双重 校验锁 实现 单例模式 * @param clazz * @param <T> * @return*/public static <T> T getInstance(Class<T> clazz) {Object instance = objectMap.get(clazz);if (instance == null) {synchronized (clazz) {if (instance == null) {try { instance = clazz.newInstance(); } catch (InstantiationException | IllegalAccessException e) {throw new RuntimeException(e.getMessage(), e); } } } }return clazz.cast(instance); } } 下面这样实现是因为涉及到多个客户端并发访问同一个服务器,设计的原因如下:
public class UnprocessedRequests {/** * k - request id * v - 可将来获取 的 response */private static ConcurrentMap<String, CompletableFuture<RpcResponse>> unprocessedResponseFutures = new ConcurrentHashMap<>();/** * @param requestId 请求体的 requestId 字段 * @param future 经过 CompletableFuture 包装过的 响应体 */public void put(String requestId, CompletableFuture<RpcResponse> future) { System.out.println("put" + future); unprocessedResponseFutures.put(requestId, future); }/** * 移除 CompletableFuture<RpcResponse> * @param requestId 请求体的 requestId 字段 */public void remove(String requestId) { unprocessedResponseFutures.remove(requestId); }public void complete(RpcResponse rpcResponse) { CompletableFuture<RpcResponse> completableFuture = unprocessedResponseFutures.remove(rpcResponse.getRequestId()); completableFuture.complete(rpcResponse); System.out.println("remove" + completableFuture); } } 传送门: 设计模式:https:///fyphome/git-res/tree/master/design-patterns |
|