原文地址:http:///blog/2016/05/01/finagle2/
在上篇文章中我介绍了Finagle中的Future/Service/Filter. 这篇文章里, 我们将构建一个基于Http协议的echo服务端和客户端, 下篇文章将构建一个基于thrift协议的客户端和服务端. 这两篇文章对应的源代码地址在Github.
代码中有Java和Scala版本两套版本的实现, 但是这里我只会介绍Java版本.
首先来看echo应用的Server端代码, 打开java-finagle-example/src/main/java/com/akkafun/finagle/Server.java :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class Server extends Service<Request, Response> { //1
@Override
public Future<Response> apply(Request request) { //2
System.out.println("request: " + request.getContentString());
Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok());
response.setContentString(request.getContentString());
return Future.value(response);
}
public static void main(String[] args) throws Exception {
Server service = new Server();
ListeningServer server = Http.server(). //3
withLabel("echo-server").
withTracer(ZipkinTracer.mk("192.168.99.100",
9410, DefaultStatsReceiver$.MODULE$, 1.0f)).
serve(new InetSocketAddress(8081), service);
Await.result(server);
}
}
|
-
在Finagle中, 实现一个RPC服务非常简单. 只需要继承Service抽象类, 实现它的apply方法. Service抽象类有两个类型参数, 第一个类型参数代表的是请求对象, 第二个类型参数代表的是返回对象. 这两个对象的具体类型与Service实现类使用的具体协议有关. 例如我们在echo服务中使用Http协议, 对应的Request类就是com.twitter.finagle.http.Request ,
对应的Response类是com.twitter.finagle.http.Response .
如果是thrift协议, 则这两个类型参数在Service实现类中都是scala.Array<scala.Byte> (Array和Byte都是scala中的类,
对应Java中的数组与byte).
-
apply方法中, 我们首先使用Response的工厂方法构造一个Response对象. 然后将Request中的请求内容原封不动的设置到Response中, 再将Response设置到Future中返回. 需要最后一步的原因是apply方法的返回值类型是Future<Response> ,
但是我们在这个方法中不需要进行异步操作, 所以可以直接使用Future.value(response) 将对象包装成Future返回.
另外, 细心的你应该发现了一行比较碍眼的代码: Response.apply(Version.Http11$.MODULE$,
Status.Ok()) , 其中Version的用法很古怪. 这是Java调用Scala伴生对象的副作用, Scala有一些语法和特性在Java中没有对应的概念, 这种情况下Java调用Scala的代码就会比较晦涩.
-
为了启动Service实例, 我们需要构造一个com.twitter.finagle.ListeningServer . withLabel 设置服务名称, withTracer 设置监控信息,
这个等后面介绍zipkin的时候在解释. 最后指定端口启动服务.
现在来看echo应用的Client端代码, 打开java-finagle-example/src/main/java/com/akkafun/finagle/Client.java :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
import static scala.compat.java8.JFunction.*;
public class Client {
public static void main(String[] args) throws TimeoutException, InterruptedException {
Service<Request, Response> service = Http.client(). //1
withLabel("echo-client").
withTracer(ZipkinTracer.mk("192.168.99.100",
9410, DefaultStatsReceiver$.MODULE$, 1.0f)).
newService("127.0.0.1:8081");
//create a "Greetings!" request.
Reader data = Reader$.MODULE$.fromStream( //2
new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8)));
Request request = Request.apply(Version.Http11$.MODULE$,
Method.Post$.MODULE$, "/", data);
Future<Response> responseFuture = Await.ready(service.apply(request)); //3
responseFuture.onSuccess(func(response -> { //4
System.out.println(String.format("response status: %s, response string: %s",
response.status().toString(), response.contentString()));
return BoxedUnit.UNIT;
}));
responseFuture.onFailure(func(e -> {
System.out.println("error: " + e.toString());
return BoxedUnit.UNIT;
}));
responseFuture.ensure(func(() -> {
service.close();
//IDE may complain here, just ignore
return BoxedUnit.UNIT;
}));
}
}
|
-
这部分代码和我们之前的Server类代码很像. 在Server类中, 我们创建了一个Service实例并监听了8081端口, 现在客户端通过newService创建了一个Service的stub.
-
这部分代码用来构造一个消息内容为Greetings的Http请求.
-
service.apply(request) 就是一次客户端到服务端的RPC调用.
这个调用的返回值是Future<Response> .
而service.apply(request) 是一个异步操作,
主线程调用这个方法并不会阻塞, 有可能主线程退出了实际调用还没有完成. 所以这里就要用到Await.ready 了. Await.ready 的作用是等待一个Future执行完成再返回,
是一个同步操作. 通过调用Await.ready 我们就能将一个异步操作转化成一个同步操作.
-
接下来我们在Future上注册请求成功与失败的回调函数. 请求成功的回调函数中只是简单的打印出响应的消息内容.
这里有个细节需要说明一下. Future的onSuccess方法需要传入一个Scala的函数特质: scala.Function1[Response,
BoxedUnit] . 如果是Java6或7, 我们可以这样实现这个特质:
1
2
3
4
5
6
7
8
|
responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){
@Override
public BoxedUnit apply(Response response) {
System.out.println(String.format("response status: %s, response string: %s",
response.status().toString(), response.contentString()));
return BoxedUnit.UNIT;
}
});
|
在Java8中, 这种匿名类我们一般会使用Lambda代替, 理想情况下写法是这样:
1
2
3
4
5
|
responseFuture.onSuccess(response -> {
System.out.println(String.format("response status: %s, response string: %s",
response.status().toString(), response.contentString()));
return BoxedUnit.UNIT;
});
|
可惜的是这种写法编译不会通过, 因为只有符合FunctionalInterface 定义的接口才能使用Lambda表达式(什么是FunctionalInterface ,
请参考Javadoc),
而在Scala2.11中, scala.Function1 不是一个FunctionalInterface (Scala2.12会兼容Java8).
为了在这里使用Lambda, 我们使用了scala-java8-compat这个库,
调用scala.compat.java8.JFunction.func 方法将一个FunctionalInterface 转化成scala.Function1 .
可以看出, 在Java中调用Finagle的API不是很方便. 所以Finagle适合以Scala为主, Java为辅的项目. 如果项目全是Java, 则值得为Finagle主要的API写一层Java的适配层, 来屏蔽Java调用Scala代码会出现的一些晦涩代码.
现在我们启动服务端和客户端来看看运行结果. 首先启动Server类, 然后启动Client. Client运行完毕自动结束, 你应该能在Client的控制台看到如下输出:
1
|
response status: Status(200), response string: Greetings!
|
Server控制台的输出:
Http协议比较适合用于对外提供服务, 并且一般会使用REST. 在Finagle中使用REST可以使用Finch库.
这个库轻量小巧, API简单, 提供了一套很方便的对Http消息进行操作的DSL. 如果是内网服务调用, 一般推荐使用结构紧凑, 传输效率高的协议. 比如protocol buffer, thrift或Avro. Finagle对thrift有很好的支持, 下篇文章我将介绍在Finagle中如何开发thrift应用.
|