案例介绍 在我们实现RPC框架的时候,需要选择socket的通信方式。我们知道,一般情况下socket通信类似于QQ聊天:消息发过去之后,对方什么时候回复都可以。但是RPC框架的通信,从使用感受上更类似于HTTP调用,需要在一定时间内返回结果,否则就会发生超时断开。
这里我们选择netty作为我们的socket框架,采用future方式进行通信。
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不会带来可维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制、文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
环境准备 1、jdk 1.8.0 2、IntelliJ IDEA Community Edition 2018.3.1 x64
代码示例
itstack-demo-rpc-02
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.rpc.network
    │           ├── client
    │           │   ├── ClientSocket.java
    │           │   └── MyClientHandler.java
    │           ├── codec
    │           │   ├── RpcDecoder.java
    │           │   └── RpcEncoder.java
    │           ├── future
    │           │   ├── SyncWrite.java
    │           │   ├── SyncWriteFuture.java
    │           │   ├── SyncWriteMap.java
    │           │   └── WriteFuture.java
    │           ├── msg
    │           │   ├── Request.java
    │           │   └── Response.java
    │           ├── server
    │           │   ├── MyServerHandler.java
    │           │   └── ServerSocket.java
    │           └── util
    │               └── SerializationUtil.java
    └── test
        └── java
            └── org.itstack.demo.test
                ├── client
                │   └── StartClient.java
                └── server
                    └── StartServer.java
ClientSocket.java
package org. itstack. demo. rpc. network. client;
import io. netty. bootstrap. Bootstrap;
import io. netty. channel. ChannelFuture;
import io. netty. channel. ChannelInitializer;
import io. netty. channel. ChannelOption;
import io. netty. channel. EventLoopGroup;
import io. netty. channel. nio. NioEventLoopGroup;
import io. netty. channel. socket. SocketChannel;
import io. netty. channel. socket. nio. NioSocketChannel;
import org. itstack. demo. rpc. network. codec. RpcDecoder;
import org. itstack. demo. rpc. network. codec. RpcEncoder;
import org. itstack. demo. rpc. network. msg. Request;
import org. itstack. demo. rpc. network. msg. Response;
/**
 * Netty client that connects to the RPC server and publishes the resulting
 * {@link ChannelFuture} so that other threads can write requests on the channel.
 *
 * <p>{@link #run()} blocks until the channel is closed, so an instance is meant
 * to be executed on its own thread.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class ClientSocket implements Runnable {

    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 7397;

    private final String host;
    private final int port;

    /**
     * Assigned by the thread executing {@link #run()} and polled by other threads
     * through {@link #getFuture()}; volatile so the assignment becomes visible to them.
     */
    private volatile ChannelFuture future;

    /** Creates a client for the default endpoint 127.0.0.1:7397. */
    public ClientSocket() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Creates a client for the given endpoint.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public ClientSocket(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, publishes the connect future, and then blocks until
     * the channel closes. The event loop group is always shut down on exit.
     */
    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Inbound: bytes -> Response; outbound: Request -> bytes.
                    ch.pipeline().addLast(
                            new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            this.future = f;
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so whoever owns this thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * @return the connect future, or {@code null} while the connection has not
     *         been established yet
     */
    public ChannelFuture getFuture() {
        return future;
    }

    /**
     * Replaces the published connect future.
     *
     * @param future the future to expose through {@link #getFuture()}
     */
    public void setFuture(ChannelFuture future) {
        this.future = future;
    }
}
MyClientHandler.java
package org. itstack. demo. rpc. network. client;
import io. netty. channel. ChannelHandlerContext;
import io. netty. channel. ChannelInboundHandlerAdapter;
import org. itstack. demo. rpc. network. future. SyncWriteFuture;
import org. itstack. demo. rpc. network. future. SyncWriteMap;
import org. itstack. demo. rpc. network. msg. Response;
/**
 * Client-side inbound handler that hands each decoded {@link Response} to the
 * future registered under the same request id in {@link SyncWriteMap}.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Looks up the pending future for the response's request id and completes it.
     * A response with no registered future is dropped.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response reply = (Response) obj;
        SyncWriteFuture pending = (SyncWriteFuture) SyncWriteMap.syncKey.get(reply.getRequestId());
        if (pending == null) {
            return;
        }
        pending.setResponse(reply);
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
RpcDecoder.java
package org. itstack. demo. rpc. network. codec;
import io. netty. buffer. ByteBuf;
import io. netty. channel. ChannelHandlerContext;
import io. netty. handler. codec. ByteToMessageDecoder;
import org. itstack. demo. rpc. network. util. SerializationUtil;
import java. util. List;
/**
 * Decodes length-prefixed frames (a 4-byte length followed by that many payload
 * bytes) into instances of the configured class via {@link SerializationUtil}.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Size in bytes of the length prefix read with {@code readInt()}. */
    private static final int LENGTH_FIELD_BYTES = 4;

    private final Class<?> genericClass;

    /**
     * @param genericClass the type every decoded payload is deserialized into
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Emits one deserialized message once a complete frame is buffered; otherwise
     * leaves the buffer untouched so decoding can resume when more bytes arrive.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < LENGTH_FIELD_BYTES) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length can only come from a corrupt or hostile peer. Allocating
            // the array would throw NegativeArraySizeException and the stream would stay
            // out of sync, so discard what is buffered and drop the connection.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Incomplete frame: rewind to before the length prefix and wait.
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }
}
RpcEncoder.java
package org. itstack. demo. rpc. network. codec;
import io. netty. buffer. ByteBuf;
import io. netty. channel. ChannelHandlerContext;
import io. netty. handler. codec. MessageToByteEncoder;
import org. itstack. demo. rpc. network. util. SerializationUtil;
/**
 * Encodes messages of the configured class as a 4-byte length followed by the
 * bytes produced by {@link SerializationUtil#serialize(Object)}.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    /**
     * @param genericClass the only message type this encoder writes
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Writes the length-prefixed payload for {@code in}. Objects that are not
     * instances of the configured class produce no output.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
        if (!genericClass.isInstance(in)) {
            return;
        }
        byte[] payload = SerializationUtil.serialize(in);
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }
}
SyncWrite.java
package org. itstack. demo. rpc. network. future;
import io. netty. channel. Channel;
import io. netty. channel. ChannelFuture;
import io. netty. channel. ChannelFutureListener;
import org. itstack. demo. rpc. network. msg. Request;
import org. itstack. demo. rpc. network. msg. Response;
import java. util. UUID;
import java. util. concurrent. TimeUnit;
import java. util. concurrent. TimeoutException;
public class SyncWrite {
public Response writeAndSync ( final Channel channel, final Request request, final long timeout) throws Exception {
if ( channel == null) {
throw new NullPointerException ( "channel" ) ;
}
if ( request == null) {
throw new NullPointerException ( "request" ) ;
}
if ( timeout <= 0 ) {
throw new IllegalArgumentException ( "timeout <= 0" ) ;
}
String requestId = UUID. randomUUID ( ) . toString ( ) ;
request. setRequestId ( requestId) ;
WriteFuture< Response> future = new SyncWriteFuture ( request. getRequestId ( ) ) ;
SyncWriteMap. syncKey. put ( request. getRequestId ( ) , future) ;
Response response = doWriteAndSync ( channel, request, timeout, future) ;
SyncWriteMap. syncKey. remove ( request. getRequestId ( ) ) ;
return response;
}
private Response doWriteAndSync ( final Channel channel, final Request request, final long timeout, final WriteFuture< Response> writeFuture) throws Exception {
channel. writeAndFlush ( request) . addListener ( new ChannelFutureListener ( ) {
public void operationComplete ( ChannelFuture future) throws Exception {
writeFuture. setWriteResult ( future. isSuccess ( ) ) ;
writeFuture. setCause ( future. cause ( ) ) ;
//失败移除
if ( ! writeFuture. isWriteSuccess ( ) ) {
SyncWriteMap. syncKey. remove ( writeFuture. requestId ( ) ) ;
}
}
} ) ;
Response response = writeFuture. get ( timeout, TimeUnit. MILLISECONDS) ;
if ( response == null) {
if ( writeFuture. isTimeout ( ) ) {
throw new TimeoutException ( ) ;
} else {
// write exception
throw new Exception ( writeFuture. cause ( ) ) ;
}
}
return response;
}
}
SyncWriteFuture.java
package org. itstack. demo. rpc. network. future;
import org. itstack. demo. rpc. network. msg. Response;
import java. util. concurrent. CountDownLatch;
import java. util. concurrent. ExecutionException;
import java. util. concurrent. TimeUnit;
import java. util. concurrent. TimeoutException;
public class SyncWriteFuture implements WriteFuture < Response> {
private CountDownLatch latch = new CountDownLatch ( 1 ) ;
private final long begin = System. currentTimeMillis ( ) ;
private long timeout;
private Response response;
private final String requestId;
private boolean writeResult;
private Throwable cause;
private boolean isTimeout = false ;
public SyncWriteFuture ( String requestId) {
this . requestId = requestId;
}
public SyncWriteFuture ( String requestId, long timeout) {
this . requestId = requestId;
this . timeout = timeout;
writeResult = true ;
isTimeout = false ;
}
public Throwable cause ( ) {
return cause;
}
public void setCause ( Throwable cause) {
this . cause = cause;
}
public boolean isWriteSuccess ( ) {
return writeResult;
}
public void setWriteResult ( boolean result) {
this . writeResult = result;
}
public String requestId ( ) {
return requestId;
}
public Response response ( ) {
return response;
}
public void setResponse ( Response response) {
this . response = response;
latch. countDown ( ) ;
}
public boolean cancel ( boolean mayInterruptIfRunning) {
return true ;
}
public boolean isCancelled ( ) {
return false ;
}
public boolean isDone ( ) {
return false ;
}
public Response get ( ) throws InterruptedException, ExecutionException {
latch. wait ( ) ;
return response;
}
public Response get ( long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if ( latch. await ( timeout, unit) ) {
return response;
}
return null;
}
public boolean isTimeout ( ) {
if ( isTimeout) {
return isTimeout;
}
return System. currentTimeMillis ( ) - begin > timeout;
}
}
SyncWriteMap.java
package org. itstack. demo. rpc. network. future;
import java. util. Map;
import java. util. concurrent. ConcurrentHashMap;
public class SyncWriteMap {
public static Map< String, WriteFuture> syncKey = new ConcurrentHashMap < String, WriteFuture> ( ) ;
}
WriteFuture.java
package org. itstack. demo. rpc. network. future;
import org. itstack. demo. rpc. network. msg. Response;
import java. util. concurrent. Future;
/**
 * A {@link Future} for one written request that additionally records the outcome
 * of the write and the response received for it.
 *
 * @param <T> type of the value produced by this future
 */
public interface WriteFuture<T> extends Future<T> {

    /** @return id of the request this future belongs to */
    String requestId();

    /** @return whether the write is recorded as successful */
    boolean isWriteSuccess();

    /** Records whether the write succeeded. */
    void setWriteResult(boolean result);

    /** @return the recorded write failure cause, if any */
    Throwable cause();

    /** Records the cause of a write failure. */
    void setCause(Throwable cause);

    /** @return the response held by this future, if any */
    T response();

    /** Supplies the response for the request. */
    void setResponse(Response response);

    /** @return whether this future is considered timed out */
    boolean isTimeout();
}
Request.java
package org. itstack. demo. rpc. network. msg;
/**
 * Message sent from client to server: a request id plus an arbitrary payload.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class Request {

    /** Correlates this request with its response. */
    private String requestId;
    /** Payload carried by the request. */
    private Object result;

    /** @return the request id, or {@code null} if none has been set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @return the payload, or {@code null} if none has been set */
    public Object getResult() {
        return this.result;
    }

    /** @param requestId the id used to correlate the response */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @param result the payload to carry */
    public void setResult(Object result) {
        this.result = result;
    }
}
Response.java
package org. itstack. demo. rpc. network. msg;
/**
 * Message sent from server to client: the id of the request being answered plus
 * a text payload.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class Response {

    /** Id of the request this response answers. */
    private String requestId;
    /** Text payload of the response. */
    private String param;

    /** @return the request id, or {@code null} if none has been set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @return the text payload, or {@code null} if none has been set */
    public String getParam() {
        return this.param;
    }

    /** @param requestId id of the request being answered */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @param param the text payload */
    public void setParam(String param) {
        this.param = param;
    }
}
MyServerHandler.java
package org. itstack. demo. rpc. network. server;
import io. netty. channel. ChannelHandlerContext;
import io. netty. channel. ChannelInboundHandlerAdapter;
import io. netty. util. ReferenceCountUtil;
import org. itstack. demo. rpc. network. msg. Request;
import org. itstack. demo. rpc. network. msg. Response;
/**
 * Server-side inbound handler that answers every decoded {@link Request} with a
 * {@link Response} carrying the same request id.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Builds the reply from the incoming request, writes and flushes it, then
     * releases the incoming message.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request incoming = (Request) obj;

        // Reply: echo the request id and append a confirmation text to the payload.
        Response reply = new Response();
        reply.setRequestId(incoming.getRequestId());
        String text = incoming.getResult() + " 请求成功,反馈结果请接受处理。";
        reply.setParam(text);
        ctx.writeAndFlush(reply);

        // Release the inbound message.
        ReferenceCountUtil.release(incoming);
    }

    /** Flushes any pending writes once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
ServerSocket.java
package org. itstack. demo. rpc. network. server;
import io. netty. bootstrap. ServerBootstrap;
import io. netty. channel. ChannelFuture;
import io. netty. channel. ChannelInitializer;
import io. netty. channel. ChannelOption;
import io. netty. channel. EventLoopGroup;
import io. netty. channel. nio. NioEventLoopGroup;
import io. netty. channel. socket. SocketChannel;
import io. netty. channel. socket. nio. NioServerSocketChannel;
import org. itstack. demo. rpc. network. codec. RpcDecoder;
import org. itstack. demo. rpc. network. codec. RpcEncoder;
import org. itstack. demo. rpc. network. msg. Request;
import org. itstack. demo. rpc. network. msg. Response;
/**
 * Netty server that accepts RPC connections and answers requests through
 * {@link MyServerHandler}.
 *
 * <p>{@link #run()} blocks until the server channel is closed, so an instance is
 * meant to be executed on its own thread.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class ServerSocket implements Runnable {

    private static final int DEFAULT_PORT = 7397;

    private final int port;

    /**
     * Bind future of the server channel. Assigned by the thread executing
     * {@link #run()}; volatile so other threads can read it via {@link #getFuture()}.
     */
    private volatile ChannelFuture f;

    /** Creates a server listening on the default port 7397. */
    public ServerSocket() {
        this(DEFAULT_PORT);
    }

    /**
     * @param port TCP port to bind
     */
    public ServerSocket(int port) {
        this.port = port;
    }

    /**
     * Binds the port, then blocks until the server channel closes. Both event loop
     * groups are always shut down on exit.
     */
    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // Inbound: bytes -> Request; outbound: Response -> bytes.
                            ch.pipeline().addLast(
                                    new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });
            // Assign the field itself; a same-named local here would shadow it and
            // leave the field permanently null.
            ChannelFuture bound = b.bind(port).sync();
            this.f = bound;
            bound.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so whoever owns this thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * @return the bind future, or {@code null} while the port has not been bound yet
     */
    public ChannelFuture getFuture() {
        return f;
    }
}
SerializationUtil.java
package org. itstack. demo. rpc. network. util;
import com. dyuproject. protostuff. LinkedBuffer;
import com. dyuproject. protostuff. ProtostuffIOUtil;
import com. dyuproject. protostuff. Schema;
import com. dyuproject. protostuff. runtime. RuntimeSchema;
import org. objenesis. Objenesis;
import org. objenesis. ObjenesisStd;
import java. util. Map;
import java. util. concurrent. ConcurrentHashMap;
/**
 * Protostuff-based serialization helpers with a per-class schema cache.
 *
 * <p>Created by fuzhengwei1 on 2016/10/20.
 */
public class SerializationUtil {

    /** Schemas already created, keyed by the class they describe. */
    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /** Used to instantiate target objects during deserialization. */
    private static final Objenesis objenesis = new ObjenesisStd();

    private SerializationUtil() {
    }

    /**
     * Serializes an object to a byte array.
     *
     * @param obj the object to serialize; must not be null
     * @return the serialized bytes
     * @throws NullPointerException  if {@code obj} is null
     * @throws IllegalStateException if serialization fails (the cause is attached)
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        if (obj == null) {
            throw new NullPointerException("obj");
        }
        // Safe: the runtime class of a T instance is a Class<? extends T>.
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserializes a byte array into a new instance of {@code cls}.
     *
     * @param data the serialized bytes
     * @param cls  the target class
     * @param <T>  the target type
     * @return the populated instance
     * @throws IllegalStateException if instantiation or merging fails (the cause is attached)
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Returns the cached schema for {@code cls}, creating and caching it on first use.
     * When two threads create a schema concurrently, both end up using whichever
     * instance was stored first.
     */
    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        // Safe: entries are only ever stored under the class they were created from.
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            Schema<T> created = RuntimeSchema.createFrom(cls);
            Schema<?> existing = cachedSchema.putIfAbsent(cls, created);
            schema = existing != null ? (Schema<T>) existing : created;
        }
        return schema;
    }
}
StartClient.java
package org. itstack. demo. test. client;
import com. alibaba. fastjson. JSON;
import io. netty. channel. ChannelFuture;
import org. itstack. demo. rpc. network. client. ClientSocket;
import org. itstack. demo. rpc. network. future. SyncWrite;
import org. itstack. demo. rpc. network. msg. Request;
import org. itstack. demo. rpc. network. msg. Response;
/**
 * Demo entry point for the client: starts {@link ClientSocket} on its own thread
 * and then issues one synchronous request per second, printing each response.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();
        for (;;) {
            try {
                if (future != null) {
                    // Build the request payload and perform a blocking call.
                    Request request = new Request();
                    request.setResult("查询用户信息");
                    SyncWrite writer = new SyncWrite();
                    Response response = writer.writeAndSync(future.channel(), request, 1000);
                    System.out.println("调用结果:" + JSON.toJSON(response));
                    Thread.sleep(1000);
                } else {
                    // The connection is established on another thread; poll for the
                    // future and give it some time before checking again.
                    future = client.getFuture();
                    Thread.sleep(500);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
StartServer.java
package org. itstack. demo. test. server;
import org. itstack. demo. rpc. network. server. ServerSocket;
/**
 * Demo entry point for the server: launches {@link ServerSocket} on a new thread.
 *
 * <p>Created by fuzhengwei on 2019/5/6.
 */
public class StartServer {

    public static void main(String[] args) {
        System.out.println("启动服务端开始");
        Runnable server = new ServerSocket();
        Thread serverThread = new Thread(server);
        serverThread.start();
        // Printed right after the thread is started, not after the port is bound.
        System.out.println("启动服务端完成");
    }
}
测试结果
启动StartServer
启动服务端开始
启动服务端完成
log4j: WARN No appenders could be found for logger ( io. netty. util. internal. logging. InternalLoggerFactory) .
log4j: WARN Please initialize the log4j system properly.
log4j: WARN See http: / / logging. apache. org/ log4j/ 1.2 / faq. html#noconfig for more info.
启动StartClient
log4j: WARN No appenders could be found for logger ( io. netty. util. internal. logging. InternalLoggerFactory) .
log4j: WARN Please initialize the log4j system properly.
log4j: WARN See http: / / logging. apache. org/ log4j/ 1.2 / faq. html#noconfig for more info.
调用结果:{ "param" : "查询用户信息 请求成功,反馈结果请接受处理。" , "requestId" : "3380f061-2501-49b5-998b-21b5956fe60a" }
调用结果:{ "param" : "查询用户信息 请求成功,反馈结果请接受处理。" , "requestId" : "81c51815-4d92-482c-bd05-e4b6dfa4d3b6" }
调用结果:{ "param" : "查询用户信息 请求成功,反馈结果请接受处理。" , "requestId" : "7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7" }
调用结果:{ "param" : "查询用户信息 请求成功,反馈结果请接受处理。" , "requestId" : "86e38bb1-eccc-4d45-b976-c3b67999e3ab" }
调用结果:{ "param" : "查询用户信息 请求成功,反馈结果请接受处理。" , "requestId" : "7f72002c-3b38-43d9-8452-db8797298899" }
调用结果:{ "param" : "查询用户信息 请求成功,反馈结果请接受处理。" , "requestId" : "d566a7d4-4b0d-426b-8c09-c535ccf8eb09" }
. . .
关注公众号,回复源码 ,获取java代码