123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "rpc_engine.h"
- #include "RpcHeader.pb.h"
- #include "ProtobufRpcEngine.pb.h"
- #include "IpcConnectionContext.pb.h"
- #include "common/util.h"
- #include <asio/read.hpp>
- #include <google/protobuf/io/coded_stream.h>
- #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
- namespace hdfs {
- namespace pb = ::google::protobuf;
- namespace pbio = ::google::protobuf::io;
- using namespace ::hadoop::common;
- using namespace ::std::placeholders;
- static void
- ConstructPacket(std::string *res,
- std::initializer_list<const pb::MessageLite *> headers,
- const std::string *request) {
- int len = 0;
- std::for_each(
- headers.begin(), headers.end(),
- [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
- if (request) {
- len += pbio::CodedOutputStream::VarintSize32(request->size()) +
- request->size();
- }
- int net_len = htonl(len);
- res->reserve(res->size() + sizeof(net_len) + len);
- pbio::StringOutputStream ss(res);
- pbio::CodedOutputStream os(&ss);
- os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
- uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
- assert(buf && "Cannot allocate memory");
- std::for_each(
- headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
- buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
- buf = v->SerializeWithCachedSizesToArray(buf);
- });
- if (request) {
- buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
- buf = os.WriteStringToArray(*request, buf);
- }
- }
- static void SetRequestHeader(RpcEngine *engine, int call_id,
- const std::string &method_name,
- RpcRequestHeaderProto *rpc_header,
- RequestHeaderProto *req_header) {
- rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
- rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
- rpc_header->set_callid(call_id);
- rpc_header->set_clientid(engine->client_name());
- req_header->set_methodname(method_name);
- req_header->set_declaringclassprotocolname(engine->protocol_name());
- req_header->set_clientprotocolversion(engine->protocol_version());
- }
- RpcConnection::~RpcConnection() {}
- RpcConnection::Request::Request(RpcConnection *parent,
- const std::string &method_name,
- const std::string &request, Handler &&handler)
- : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
- handler_(std::move(handler)) {
- RpcRequestHeaderProto rpc_header;
- RequestHeaderProto req_header;
- SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
- &req_header);
- ConstructPacket(&payload_, {&rpc_header, &req_header}, &request);
- }
- RpcConnection::Request::Request(RpcConnection *parent,
- const std::string &method_name,
- const pb::MessageLite *request,
- Handler &&handler)
- : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
- handler_(std::move(handler)) {
- RpcRequestHeaderProto rpc_header;
- RequestHeaderProto req_header;
- SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
- &req_header);
- ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr);
- }
- void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is,
- const Status &status) {
- handler_(is, status);
- }
- RpcConnection::RpcConnection(RpcEngine *engine)
- : engine_(engine), resp_state_(kReadLength), resp_length_(0) {}
- ::asio::io_service &RpcConnection::io_service() {
- return engine_->io_service();
- }
- void RpcConnection::Start() {
- io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
- ::asio::error_code(), 0));
- }
- void RpcConnection::FlushPendingRequests() {
- io_service().post([this]() {
- if (!request_over_the_wire_) {
- OnSendCompleted(::asio::error_code(), 0);
- }
- });
- }
- void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
- /* assumed to be called from a context that has already acquired the
- * engine_state_lock */
- pbio::ArrayInputStream ar(&data[0], data.size());
- pbio::CodedInputStream in(&ar);
- in.PushLimit(data.size());
- RpcResponseHeaderProto h;
- ReadDelimitedPBMessage(&in, &h);
- auto it = requests_on_fly_.find(h.callid());
- if (it == requests_on_fly_.end()) {
- // TODO: out of line RPC request
- assert(false && "Out of line request with unknown call id");
- }
- auto req = it->second;
- requests_on_fly_.erase(it);
- Status stat;
- if (h.has_exceptionclassname()) {
- stat =
- Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
- }
- req->OnResponseArrived(&in, stat);
- }
- std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
- static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
- RpcEngine::kRpcVersion, 0, 0};
- auto res =
- std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader));
- RpcRequestHeaderProto h;
- h.set_rpckind(RPC_PROTOCOL_BUFFER);
- h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
- h.set_callid(RpcEngine::kCallIdConnectionContext);
- h.set_clientid(engine_->client_name());
- IpcConnectionContextProto handshake;
- handshake.set_protocol(engine_->protocol_name());
- ConstructPacket(res.get(), {&h, &handshake}, nullptr);
- return res;
- }
- void RpcConnection::AsyncRpc(
- const std::string &method_name, const ::google::protobuf::MessageLite *req,
- std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) {
- std::lock_guard<std::mutex> state_lock(engine_state_lock_);
- auto wrapped_handler =
- [resp, handler](pbio::CodedInputStream *is, const Status &status) {
- if (status.ok()) {
- ReadDelimitedPBMessage(is, resp.get());
- }
- handler(status);
- };
- auto r = std::make_shared<Request>(this, method_name, req,
- std::move(wrapped_handler));
- pending_requests_.push_back(r);
- FlushPendingRequests();
- }
- void RpcConnection::AsyncRawRpc(const std::string &method_name,
- const std::string &req,
- std::shared_ptr<std::string> resp,
- Callback &&handler) {
- std::lock_guard<std::mutex> state_lock(engine_state_lock_);
- auto wrapped_handler =
- [this, resp, handler](pbio::CodedInputStream *is, const Status &status) {
- if (status.ok()) {
- uint32_t size = 0;
- is->ReadVarint32(&size);
- auto limit = is->PushLimit(size);
- is->ReadString(resp.get(), limit);
- is->PopLimit(limit);
- }
- handler(status);
- };
- auto r = std::make_shared<Request>(this, method_name, req,
- std::move(wrapped_handler));
- pending_requests_.push_back(r);
- FlushPendingRequests();
- }
- }
|