分享

RPC 工具

 码农书馆 2018-10-09

Thrift异步模式

我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式 。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。

thrift 有提供一套异步模式模式供我们使用,我们跟往常一样来编写一个thrift 协议文件。


namespace cpp example

service Twitter {
   string sendString(1:string data);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

不同的是,我们需要加入cpp:cob_type 来生成代码。

thrift -r -strict –gen cpp:cob_style -o ./ test.thrift

生成的代码文件表和之前的基本相同,但在Twitter.cpp 和Twitter.h 文件中增加了异步客户端和异步服务器使用的类。

$ tree gen-cpp
|– Test_constants.cpp
|– Test_constants.h
|– Test_types.cpp
|– Test_types.h
|– Twitter.cpp
|– Twitter.h
|–Twitter_server.skeleton.cpp
|-Twitter_async_server.skeleton.cpp

用户只要关心在Twitter.h 中的TwitterCobClient、TwitterCobSvIf和TwitterAsyncProcessor这三个类。
  • 1

Thrift 异步Client

异步客户端代码有TwitterCobClient 以及它继承的类。

class TwitterCobClient : virtual public TwitterCobClIf {
 public:
  TwitterCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
    channel_(channel),
    itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    piprot_(protocolFactory->getProtocol(itrans_)),
    poprot_(protocolFactory->getProtocol(otrans_)) {
    iprot_ = piprot_.get();
    oprot_ = poprot_.get();
  }
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
    return channel_;
  }
  virtual void completed__(bool /* success */) {}
  void sendString(tcxx::function<void(TwitterCobClient* client)> cob, const std::string& data);
  void send_sendString(const std::string& data);
  void recv_sendString(std::string& _return);
 protected:
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

从源文件上看,通过类实现发现:

  1. completed__(bool /* success */)是虚函数,用于通知用户数据接收完成;
  2. sendString函数带有回调参数 function <void(TwitterCobClient* client)> cob,用于数据接收时回调,这是异步的特点;

  3. send_sendString和recv_sendString分别用于写数据到输出缓存和从输入缓存读数据

  4. 列表内容拥有TAsyncChannel,异步功能的核心在于TAsyncChannel,它是用于回调函数注册和异步收发数据;

  5. Transport采用TMemoryBuffer,TMemoryBuffer是用于程序内部之间通信用的,在这里起到读写缓存作用

下面看看关键函数 sendString的实现

void  TwitterCobClient::sendString(t cxx::function<v  oid(TwitterCobClient* client)> cob, const std::string& data)
{
  send_sendString(data);
  channel_->sendAndRecvMessage(tcxx::bind(cob, this), otrans_.get(), itrans_.get());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

send_sendString函数是想缓冲区(TMemoryBuffer)写入数据, 而sendString 则通过调用TAsyncChannel的sendAndRecvMessage接口注册回调函数。

TAsyncChannel作为接口类定义了三个接口函数。

  /**
   * Send a message over the channel.
   */
  virtual void sendMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* message) = 0;

  /**
   * Receive a message from the channel.
   */
  virtual void recvMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* message) = 0;

  /**
   * Send a message over the channel and receive a response.
   */
  virtual void sendAndRecvMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

TAsyncChannel目前为止(0.9.1版本)只有一种客户端实现类TEvhttpClientChannel,顾名思义它是基于libeventhttp协议实现的。 使用libevent的方法就不在这里累赘了,主要看下sendAndRecvMessage的实现。

void TEvhttpClientChannel::sendAndRecvMessage(
    const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf) {
  cob_ = cob;// 绑定回调函数
  recvBuf_ = recvBuf;

  struct evhttp_request* req = evhttp_request_new(response, this);

  uint8_t* obuf;
  uint32_t sz;
  sendBuf->getBuffer(&obuf, &sz);
  rv = evbuffer_add(req->output_buffer, obuf, sz);

  rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());// 发送http 请求

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

从sendAndRecvMessage实现可看出,TEvhttpClientChannel是用采用http协议来与服务器通信,后面介绍异步server时会发现,同样采用是http协议,它们使用的http库是libevent库的evhttp。

通过向evhttp_request中注册相应回调函数respones和传入回调实例本身的指针,在相应时候回调函数中调用TEvhttpClientChannel实例的finish接口完成数据接收,并写入缓存中,供应用层获取使用。
看下回调函数response 的实现:

/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
  TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
  try {
    self->finish(req);
  } catch (std::exception& e) {
    // don't propagate a C++ exception in C code (e.g. libevent)
    std::cerr << "TEvhttpClientChannel::response exception thrown (ignored): " << e.what()
              << std::endl;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

Thrift 异步server

异步server关心另外两个类:TwitterCobSvIf和TwitterAsyncProcessor。很明显TwitterCobSvIf是要用户继承实现的,它与同步TwitterSvIf不同的地方是成员函数多一个cob回调函数,在实现TwitterSvIf时,需要调用cob。示例如下:

class TwitterCobSvNull : virtual public TwitterCobSvIf {
 public:
  virtual ~TwitterCobSvNull() {}
  void sendString(tcxx::function<void(std::string const& _return)> cob, const std::string& /* data */) {
    std::string _return;
    return cob(_return);
  }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

那么这个cob是什么函数,哪里注册的?这在thrift lib库里的TEvhttpServer和TAsyncProtocolProcessor类里可找到答案,其中TEvhttpServer是异步server,传输是采用http协议,与异步client对上。

先看看TEvhttpServer实现,同样采用event_base来异步收发数据,收到数据时,回调request函数。

void TEvhttpServer::request(struct evhttp_request* req, void* self) {
  try {
    static_cast<TEvhttpServer*>(self)->process(req);
  } catch(std::exception& e) {
    evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0);
  }
}
void TEvhttpServer::process(struct evhttp_request* req) {
  RequestContext* ctx = new RequestContext(req);
  return processor_->process(     // 这里的processor_正是TAsyncProtocolProcessor
      std::tr1::bind(
        &TEvhttpServer::complete,   // 注册complete
        this,
        ctx,
        std::tr1::placeholders::_1),
      ctx->ibuf,
      ctx->obuf);
}

void TEvhttpServer::complete(RequestContext* ctx, bool success) {
  std::auto_ptr<RequestContext> ptr(ctx);

  int code = success ? 200 : 400;
  const char* reason = success ? "OK" : "Bad Request";

  int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");

  struct evbuffer* buf = evbuffer_new();
    uint8_t* obuf;
    uint32_t sz;
    ctx->obuf->getBuffer(&obuf, &sz);   // 从输出缓冲读数据
    int ret = evbuffer_add(buf, obuf, sz);

  evhttp_send_reply(ctx->req, code, reason, buf);   // 发送数据
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

接着看TAsyncProtocolProcessor的process实现

void TAsyncProtocolProcessor::process(
    std::tr1::function<void(bool healthy)> _return,
    boost::shared_ptr<TBufferBase> ibuf,
    boost::shared_ptr<TBufferBase> obuf) {
  boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
  boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
  return underlying_->process(     // underlying_是生成代码里的TwitterAsyncProcessor
      std::tr1::bind(
        &TAsyncProtocolProcessor::finish,   
        _return,  // compere函数
        oprot,
        std::tr1::placeholders::_1),
      iprot, oprot);
}

/* static */ void TAsyncProtocolProcessor::finish(
    std::tr1::function<void(bool healthy)> _return,
    boost::shared_ptr<TProtocol> oprot,
    bool healthy) {
  (void) oprot;
  // This is a stub function to hold a reference to oprot.
  return _return(healthy);  // 回调compere函数
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
最后看TwitterAsyncProcessor::process,它先写fname,mtype, seqid然后调用process_fn,process_fn选择调用合理的处理函数(如process_sendString),看process_sendString实现:
  • 1
 void TwitterAsyncProcessor::process_sendString(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)

void (TwitterAsyncProcessor::*return_fn)(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, const std::string& _return)    =
    &TwitterAsyncProcessor::return_sendString;              // return_sendString正是我们要找的cob函数
    iface_->sendString(                      // iface_TwitterCobSvIf的具体类,用户实现的
      std::tr1::bind(return_fn, this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1),    // cob 是 finish函数
      args.data);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

上面return_sendString是我们要找的cob函数,该函数将用户处理的结果写入输出冲缓,并发送给client。

下面实现了一个异步客户端和异步服务端
采用异步时,必须采用http 传输层。

异步客户端的实现

demo_async_client.cc

#include <string>
#include "boost/shared_ptr.hpp"
#include <thrift/Thrift.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/server/TServer.h>
#include <thrift/async/TAsyncChannel.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include "common/thrift/Twitter.h"
#include "boost/function.hpp"
#include "boost/bind.hpp"
#include <event.h>
#include <stdio.h>
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using std::string;
using boost::shared_ptr;
using namespace example;
using namespace apache::thrift::async;


class testClient : public TwitterCobClient
{
public:
  testClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, TProtocolFactory* protocolFactory)
      : TwitterCobClient(channel, protocolFactory)
  { };

  virtual void completed__(bool success)
  {
    if (success)
    {
                printf("respone : %s \n", res.c_str());   // 输出返回结果
    }
    else
    {
      printf("failed to respone\n");
    }
    fflush(0);
  };

   string res;
};

//callback function
static void my_recv_sendString(TwitterCobClient *client){
  client->recv_sendString(dynamic_cast<testClient*>(client)->res);

}

static void sendString(testClient & client){
printf("snedstring start\n");
std::function<void(TwitterCobClient*client)>cob = bind(&my_recv_sendString,_1);
client.sendString(cob,"Hello");
printf("sendstring end\n");

}

static void DoSimpleTest(const std::string & host, int port){
 printf("running SimpleTset(%s, %d)..\n", host.c_str(),port);
 event_base* base = event_base_new();
   boost::shared_ptr< ::apache::thrift::async::TAsyncChannel>  channel1( new TEvhttpClientChannel( host, "/", host.c_str(), port, base  ) );

  testClient client1( channel1,  new TBinaryProtocolFactory() );

  sendString(client1);   // 发送第一个请求

  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel>  channel2( new TEvhttpClientChannel( host, "/", host.c_str(), port, base  ) );

  testClient client2( channel2,  new TBinaryProtocolFactory() );

  sendString(client2);  // 发送第二个请求

  event_base_dispatch(base);

  event_base_free(base);

  printf( "done DoSimpleTest().\n" );
}

int main( int argc, char* argv[] )
{
  DoSimpleTest( "localhost", 14488 );
  return 0;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

异步服务端的实现

demo_async_serv.cc

#include <string>
#include "boost/shared_ptr.hpp"
#include <thrift/Thrift.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/server/TServer.h>
#include <thrift/async/TAsyncChannel.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include "common/thrift/Twitter.h"
#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/async/TEvhttpServer.h>





using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using std::string;
using namespace boost;
using namespace example;
using namespace apache::thrift::async;

class TwitterAsyncHandler : public TwitterCobSvIf {
 public:
  TwitterAsyncHandler() {
    // Your initialization goes here
  }

  void sendString(std::function<void(std::string const& _return)> cob, const std::string& data) {
    printf("sendString rec:%s\n", data.c_str());  // 输出收到的数据
    std::string _return = "world";   // 返回world字符串给客户端
    return cob(_return);
  }

};

int main(int argc, char **argv) {
  shared_ptr<TAsyncProcessor> underlying_pro(new TwitterAsyncProcessor( shared_ptr<TwitterCobSvIf>(new TwitterAsyncHandler()) ) );
  shared_ptr<TAsyncBufferProcessor> processor( new TAsyncProtocolProcessor( underlying_pro, shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()) ) );

  TEvhttpServer server(processor, 14488);
  server.serve();
  return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

参考

http://blog.csdn.net/whycold/article/details/10973
http://tech.uc.cn/?p=2668553

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多