rpc_connection.cc 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "rpc_engine.h"
  19. #include "RpcHeader.pb.h"
  20. #include "ProtobufRpcEngine.pb.h"
  21. #include "IpcConnectionContext.pb.h"
  22. #include "common/util.h"
  23. #include <asio/read.hpp>
  24. #include <google/protobuf/io/coded_stream.h>
  25. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  26. namespace hdfs {
  27. namespace pb = ::google::protobuf;
  28. namespace pbio = ::google::protobuf::io;
  29. using namespace ::hadoop::common;
  30. using namespace ::std::placeholders;
  31. static void
  32. ConstructPacket(std::string *res,
  33. std::initializer_list<const pb::MessageLite *> headers,
  34. const std::string *request) {
  35. int len = 0;
  36. std::for_each(
  37. headers.begin(), headers.end(),
  38. [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
  39. if (request) {
  40. len += pbio::CodedOutputStream::VarintSize32(request->size()) +
  41. request->size();
  42. }
  43. int net_len = htonl(len);
  44. res->reserve(res->size() + sizeof(net_len) + len);
  45. pbio::StringOutputStream ss(res);
  46. pbio::CodedOutputStream os(&ss);
  47. os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
  48. uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
  49. assert(buf && "Cannot allocate memory");
  50. std::for_each(
  51. headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
  52. buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
  53. buf = v->SerializeWithCachedSizesToArray(buf);
  54. });
  55. if (request) {
  56. buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
  57. buf = os.WriteStringToArray(*request, buf);
  58. }
  59. }
  60. static void SetRequestHeader(RpcEngine *engine, int call_id,
  61. const std::string &method_name,
  62. RpcRequestHeaderProto *rpc_header,
  63. RequestHeaderProto *req_header) {
  64. rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
  65. rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
  66. rpc_header->set_callid(call_id);
  67. rpc_header->set_clientid(engine->client_name());
  68. req_header->set_methodname(method_name);
  69. req_header->set_declaringclassprotocolname(engine->protocol_name());
  70. req_header->set_clientprotocolversion(engine->protocol_version());
  71. }
  72. RpcConnection::~RpcConnection() {}
  73. RpcConnection::Request::Request(RpcConnection *parent,
  74. const std::string &method_name,
  75. const std::string &request, Handler &&handler)
  76. : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
  77. handler_(std::move(handler)) {
  78. RpcRequestHeaderProto rpc_header;
  79. RequestHeaderProto req_header;
  80. SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
  81. &req_header);
  82. ConstructPacket(&payload_, {&rpc_header, &req_header}, &request);
  83. }
  84. RpcConnection::Request::Request(RpcConnection *parent,
  85. const std::string &method_name,
  86. const pb::MessageLite *request,
  87. Handler &&handler)
  88. : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
  89. handler_(std::move(handler)) {
  90. RpcRequestHeaderProto rpc_header;
  91. RequestHeaderProto req_header;
  92. SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
  93. &req_header);
  94. ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr);
  95. }
  96. void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is,
  97. const Status &status) {
  98. handler_(is, status);
  99. }
  100. RpcConnection::RpcConnection(RpcEngine *engine)
  101. : engine_(engine), resp_state_(kReadLength), resp_length_(0) {}
  102. ::asio::io_service &RpcConnection::io_service() {
  103. return engine_->io_service();
  104. }
  105. void RpcConnection::Start() {
  106. io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
  107. ::asio::error_code(), 0));
  108. }
  109. void RpcConnection::FlushPendingRequests() {
  110. io_service().post([this]() {
  111. if (!request_over_the_wire_) {
  112. OnSendCompleted(::asio::error_code(), 0);
  113. }
  114. });
  115. }
  116. void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
  117. /* assumed to be called from a context that has already acquired the
  118. * engine_state_lock */
  119. pbio::ArrayInputStream ar(&data[0], data.size());
  120. pbio::CodedInputStream in(&ar);
  121. in.PushLimit(data.size());
  122. RpcResponseHeaderProto h;
  123. ReadDelimitedPBMessage(&in, &h);
  124. auto it = requests_on_fly_.find(h.callid());
  125. if (it == requests_on_fly_.end()) {
  126. // TODO: out of line RPC request
  127. assert(false && "Out of line request with unknown call id");
  128. }
  129. auto req = it->second;
  130. requests_on_fly_.erase(it);
  131. Status stat;
  132. if (h.has_exceptionclassname()) {
  133. stat =
  134. Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
  135. }
  136. req->OnResponseArrived(&in, stat);
  137. }
  138. std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
  139. static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
  140. RpcEngine::kRpcVersion, 0, 0};
  141. auto res =
  142. std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader));
  143. RpcRequestHeaderProto h;
  144. h.set_rpckind(RPC_PROTOCOL_BUFFER);
  145. h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
  146. h.set_callid(RpcEngine::kCallIdConnectionContext);
  147. h.set_clientid(engine_->client_name());
  148. IpcConnectionContextProto handshake;
  149. handshake.set_protocol(engine_->protocol_name());
  150. ConstructPacket(res.get(), {&h, &handshake}, nullptr);
  151. return res;
  152. }
  153. void RpcConnection::AsyncRpc(
  154. const std::string &method_name, const ::google::protobuf::MessageLite *req,
  155. std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) {
  156. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  157. auto wrapped_handler =
  158. [resp, handler](pbio::CodedInputStream *is, const Status &status) {
  159. if (status.ok()) {
  160. ReadDelimitedPBMessage(is, resp.get());
  161. }
  162. handler(status);
  163. };
  164. auto r = std::make_shared<Request>(this, method_name, req,
  165. std::move(wrapped_handler));
  166. pending_requests_.push_back(r);
  167. FlushPendingRequests();
  168. }
  169. void RpcConnection::AsyncRawRpc(const std::string &method_name,
  170. const std::string &req,
  171. std::shared_ptr<std::string> resp,
  172. Callback &&handler) {
  173. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  174. auto wrapped_handler =
  175. [this, resp, handler](pbio::CodedInputStream *is, const Status &status) {
  176. if (status.ok()) {
  177. uint32_t size = 0;
  178. is->ReadVarint32(&size);
  179. auto limit = is->PushLimit(size);
  180. is->ReadString(resp.get(), limit);
  181. is->PopLimit(limit);
  182. }
  183. handler(status);
  184. };
  185. auto r = std::make_shared<Request>(this, method_name, req,
  186. std::move(wrapped_handler));
  187. pending_requests_.push_back(r);
  188. FlushPendingRequests();
  189. }
  190. }