rpc_engine.cc 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "rpc_engine.h"
  19. #include "rpc_connection.h"
  20. #include "common/util.h"
  21. #include "optional.hpp"
  22. #include <future>
  23. namespace hdfs {
  24. template <class T>
  25. using optional = std::experimental::optional<T>;
  26. RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
  27. const std::string &client_name, const char *protocol_name,
  28. int protocol_version)
  29. : io_service_(io_service),
  30. options_(options),
  31. client_name_(client_name),
  32. protocol_name_(protocol_name),
  33. protocol_version_(protocol_version),
  34. retry_policy_(std::move(MakeRetryPolicy(options))),
  35. call_id_(0),
  36. retry_timer(*io_service) {}
  37. void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
  38. RpcCallback &handler) {
  39. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  40. last_endpoint_ = server;
  41. conn_ = NewConnection();
  42. conn_->Connect(server, handler);
  43. }
  44. void RpcEngine::Shutdown() {
  45. io_service_->post([this]() {
  46. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  47. conn_->Disconnect();
  48. conn_.reset();
  49. });
  50. }
  51. std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) {
  52. if (options.max_rpc_retries > 0) {
  53. return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms, options.max_rpc_retries));
  54. } else {
  55. return nullptr;
  56. }
  57. }
  58. void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
  59. conn_ = conn;
  60. }
  61. void RpcEngine::AsyncRpc(
  62. const std::string &method_name, const ::google::protobuf::MessageLite *req,
  63. const std::shared_ptr<::google::protobuf::MessageLite> &resp,
  64. const std::function<void(const Status &)> &handler) {
  65. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  66. if (!conn_) {
  67. conn_ = NewConnection();
  68. conn_->ConnectAndFlush(last_endpoint_);
  69. }
  70. conn_->AsyncRpc(method_name, req, resp, handler);
  71. }
  72. Status RpcEngine::Rpc(
  73. const std::string &method_name, const ::google::protobuf::MessageLite *req,
  74. const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
  75. auto stat = std::make_shared<std::promise<Status>>();
  76. std::future<Status> future(stat->get_future());
  77. AsyncRpc(method_name, req, resp,
  78. [stat](const Status &status) { stat->set_value(status); });
  79. return future.get();
  80. }
  81. std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
  82. {
  83. return std::make_shared<RpcConnectionImpl<::asio::ip::tcp::socket>>(this);
  84. }
  85. Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
  86. std::shared_ptr<std::string> resp) {
  87. std::shared_ptr<RpcConnection> conn;
  88. {
  89. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  90. if (!conn_) {
  91. conn_ = NewConnection();
  92. conn_->ConnectAndFlush(last_endpoint_);
  93. }
  94. conn = conn_;
  95. }
  96. auto stat = std::make_shared<std::promise<Status>>();
  97. std::future<Status> future(stat->get_future());
  98. conn->AsyncRawRpc(method_name, req, resp,
  99. [stat](const Status &status) { stat->set_value(status); });
  100. return future.get();
  101. }
  102. void RpcEngine::AsyncRpcCommsError(
  103. const Status &status,
  104. std::vector<std::shared_ptr<Request>> pendingRequests) {
  105. io_service().post([this, status, pendingRequests]() {
  106. RpcCommsError(status, pendingRequests);
  107. });
  108. }
  109. void RpcEngine::RpcCommsError(
  110. const Status &status,
  111. std::vector<std::shared_ptr<Request>> pendingRequests) {
  112. (void)status;
  113. std::lock_guard<std::mutex> state_lock(engine_state_lock_);
  114. auto head_action = optional<RetryAction>();
  115. // Filter out anything with too many retries already
  116. for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
  117. auto req = *it;
  118. RetryAction retry = RetryAction::fail(""); // Default to fail
  119. if (retry_policy()) {
  120. retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), 0, true);
  121. }
  122. if (retry.action == RetryAction::FAIL) {
  123. // If we've exceeded the maximum retry, take the latest error and pass it
  124. // on. There might be a good argument for caching the first error
  125. // rather than the last one, that gets messy
  126. io_service().post([req, status]() {
  127. req->OnResponseArrived(nullptr, status); // Never call back while holding a lock
  128. });
  129. it = pendingRequests.erase(it);
  130. } else {
  131. if (!head_action) {
  132. head_action = retry;
  133. }
  134. ++it;
  135. }
  136. }
  137. // Close the connection and retry and requests that might have been sent to
  138. // the NN
  139. if (!pendingRequests.empty() &&
  140. head_action && head_action->action != RetryAction::FAIL) {
  141. conn_ = NewConnection();
  142. conn_->PreEnqueueRequests(pendingRequests);
  143. if (head_action->delayMillis > 0) {
  144. retry_timer.expires_from_now(
  145. std::chrono::milliseconds(options_.rpc_retry_delay_ms));
  146. retry_timer.async_wait([this](asio::error_code ec) {
  147. if (!ec) conn_->ConnectAndFlush(last_endpoint_);
  148. });
  149. } else {
  150. conn_->ConnectAndFlush(last_endpoint_);
  151. }
  152. } else {
  153. // Connection will try again if someone calls AsyncRpc
  154. conn_.reset();
  155. }
  156. }
  157. }