这篇文章将讲述如何使用 gRPC 异步/非阻塞 C++ API 编写一个简单的服务端(server)和客户端(client)。在读取这篇文章之前,你需要先了解 Protocol Buffer 和 gRPC 基础,你可在本博客搜索到它们的相关文章。 这篇文章将围绕 gRPC 的官方例子 Greeter 展开学习,你可在 grpc/examples/protos/helloworld.proto 中查看服务的定义,在 grpc/examples/cpp/helloworld/ 中查看完整代码。 概览gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下: - 在 RPC 调用上绑定一个
CompletionQueue - 做一些事情,例如读或写这样的事情,用一个唯一的
void* 标记表示 - 调用
CompletionQueue::Next 等待操作完成,如果返回之前的一个标记,则表示对应的操作已经完成。
异步客户端要使用异步客户端调用远程方法,首先要创建 channel 和 stub,这个过程和 grpc/examples/cpp/helloworld/greeter_client.cc 中差不多(同步示例)。一旦你创建了 stub,就可以做以下事情来进行异步调用了: 初始化 RPC 并为其创建句柄。将 RPC 绑定到一个 CompletionQueue 上。 CompletionQueue cq;
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq));
使用一个唯一的标记(这里用的是 (void*)1 ),请求回复和最终状态 Status status;
rpc->Finish(&reply, &status, (void*)1);
等待完成队列返回标记(tag)。一旦返回了之前对应 Finish() 函数中传递的标记,应答和状态就可以被返回了。 void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)1) {
// check reply and status
}
你可以在 grpc/examples/cpp/helloworld/greeter_async_client.cc 中看到完整的客户端示例。 异步服务端服务器实现一个带有标记(tag)的 RPC 调用请求,然后等待完成队列返回标记。异步处理 RPC 的基本流程是: 构建一个 server 并导出异步服务 helloworld::Greeter::AsyncService service;
ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
builder.RegisterAsyncService(&service);
auto cq = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();
请求一个 RPC,并提供唯一的标记(tag) ServerContext context;
HelloRequest request;
ServerAsyncResponseWriter<HelloReply> responder;
service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
等待完成队列返回标记。一旦检索到标记,context 、request 和 responder 就准备好了。 HelloReply reply;
Status status;
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)1) {
// set reply and status
responder.Finish(reply, status, (void*)2);
}
等待完成队列返回标记。RPC 在标记返回时完成。 void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)2) {
// clean up
}
但是,这个基本流程没有考虑到服务端同时处理多个请求。要解决这个问题,完整的异步服务端示例使用一个CallData 对象来维护每个 RPC 的状态,并使用该对象的地址作为调用的唯一标记。 class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
responder_.Finish(reply_, Status::OK, this);
status_ = FINISH;
} else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
}
为简单起见,服务端对所有事件只使用一个完成队列,并在 HandleRpcs 中运行一个主循环来查询队列: void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
cq_->Next(&tag, &ok);
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
关闭服务端我们在使用一个完成队列来获取异步通知,在服务端被关闭之后,也必须小心地关闭它。 记住,我们在 ServerImpl::Run() 函数中通过运行 cq_ = builder.AddCompletionQueue() 来获得完成队列实例cq_ ,看看 ServerBuilder::AddCompletionQueue 的文档,我们可以看到: … Caller is required to shutdown the server prior to shutting down the returned completion queue.
有关详细信息,请参考 ServerBuilder::AddCompletionQueue 的完整文档字符串。在我们的示例中,ServerImpl 的析构函数如下所示: ~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
你可以在 grpc/examples/cpp/helloworld/greeter_async_server.cc 中看到完整的服务端示例。 实现多个服务前面官方提到的示例中只实现了一个 SayHello RPC 服务,如果想要实现多个 RPC服务该怎么办呢?下面的将讲述如何对示例中的代码进行修改,使他再支持一个名为 SayBye 的服务。 这个方法就是为每个 RPC 服务都实现一个不同的 CallData 类。但是,当你从 cq_->Next() 获取标记时,你知道它是指向这些类之一的对象的指针,但是你不知道它的确切类型。 为了克服这个问题,你可以让它们都继承一个具有 virtual Proceed() 成员函数的类,再根据需要在每个子类中实现它,当您获得一个标记时,将其转换为 CallData 并调用 Proceed() 。 class CallData {
public:
virtual void Proceed() = 0;
};
class HelloCallData final : public CallData {...};
class ByeCallData final : public CallData {...};
...
new HelloCallData(...);
new ByeCallData(...);
cq_->Next(&tag, &ok);
static_cast<CallData*>(tag)->Proceed();
...
多线程对于如何在多线程中使用异步 RPC API 完成队列,官方的的文档说明是: Right now, the best performance trade-off is having numcpu's threads and one completion queue per thread.
当前,最好的权衡性能的方法是使用创建 cpu 个数的线程数,并在每个线程中都使用一个完成队列。
|