
HDFS-9095. RPC client should fail gracefully when the connection is timed out or reset. Contributed by Haohui Mai.

Haohui Mai, 10 years ago
parent
commit 9e929a7a0d
18 changed files with 456 additions and 75 deletions
  1. +0 -10    hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
  2. +4 -0     hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
  3. +2 -1     hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
  4. +35 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/options.h
  5. +1 -1     hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
  6. +61 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h
  7. +27 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc
  8. +7 -6     hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
  9. +1 -1     hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
  10. +15 -17  hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
  11. +51 -7   hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
  12. +23 -14  hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
  13. +11 -7   hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
  14. +18 -7   hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
  15. +12 -0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
  16. +8 -4    hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc
  17. +1 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
  18. +179 -0  hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc

+ 0 - 10
hadoop-hdfs-project/hadoop-hdfs-client/pom.xml

@@ -190,16 +190,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-antrun-plugin</artifactId>
             <executions>
-              <execution>
-                <id>debug</id>
-                <phase>compile</phase>
-                <goals><goal>run</goal></goals>
-                <configuration>
-                  <target>
-                    <echo>[PROTOC] ${env.HADOOP_PROTOC_PATH}</echo>
-                  </target>
-                </configuration>
-              </execution>
               <execution>
                 <id>make</id>
                 <phase>compile</phase>

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt

@@ -50,6 +50,10 @@ include_directories(
   third_party/gmock-1.7.0
 )
 
+set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../proto)
+set(PROTO_HADOOP_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../hadoop-common-project/hadoop-common/src/main/proto)
+set(PROTO_HADOOP_TEST_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../hadoop-common-project/hadoop-common/src/test/proto)
+
 add_subdirectory(third_party/gmock-1.7.0)
 add_subdirectory(lib)
 add_subdirectory(tests)

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h

@@ -18,6 +18,7 @@
 #ifndef LIBHDFSPP_HDFS_H_
 #define LIBHDFSPP_HDFS_H_
 
+#include "libhdfspp/options.h"
 #include "libhdfspp/status.h"
 
 #include <functional>
@@ -89,7 +90,7 @@ public:
    * FileSystem object.
    **/
   static void
-  New(IoService *io_service, const std::string &server,
+  New(IoService *io_service, const Options &options, const std::string &server,
       const std::string &service,
       const std::function<void(const Status &, FileSystem *)> &handler);
   /**

+ 35 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/options.h

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_OPTIONS_H_
+#define LIBHDFSPP_OPTIONS_H_
+
+namespace hdfs {
+
+/**
+ * Options to control the behavior of the libhdfspp library.
+ **/
+struct Options {
+  /**
+   * Time out of RPC requests in milliseconds.
+   * Default: 30000
+   **/
+  int rpc_timeout;
+  Options();
+};
+}
+#endif
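
The Options struct above is threaded through the public API (see the hdfs.h change earlier in this commit). Below is a minimal caller-side sketch, not part of the commit, of how a client might lower the RPC timeout from the 30000 ms default; the ConnectWithShortTimeout helper, the namenode host, and the port are placeholder names.

#include "libhdfspp/hdfs.h"

using hdfs::FileSystem;
using hdfs::IoService;
using hdfs::Options;
using hdfs::Status;

// Hypothetical helper: connect with a 5 second RPC timeout instead of the
// 30 second default set in common/options.cc.
void ConnectWithShortTimeout(IoService *io_service) {
  Options options;
  options.rpc_timeout = 5000;  // milliseconds, per the comment in options.h

  FileSystem::New(io_service, options, "namenode.example.com", "8020",
                  [](const Status &status, FileSystem *fs) {
    if (!status.ok()) {
      return;  // Connect or handshake failed.
    }
    // fs is ready; issue Open() calls etc.
  });
}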

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt

@@ -1 +1 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc)
+add_library(common base64.cc options.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc)

+ 61 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/logging.h

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_LOGGING_H_
+#define LIB_COMMON_LOGGING_H_
+
+#include <iostream>
+
+namespace hdfs {
+
+enum LogLevel {
+  kDebug,
+  kInfo,
+  kWarning,
+  kError,
+};
+
+#define LOG_DEBUG() LogMessage(kDebug)
+#define LOG_INFO() LogMessage(kInfo)
+#define LOG_WARN() LogMessage(kWarning)
+#define LOG_ERROR() LogMessage(kError)
+
+class LogMessage {
+ public:
+  LogMessage(const LogLevel &l) {
+    static constexpr const char * kLogLevelMessage[] = {"DEBUG", "INFO", "WARN", "ERROR"};
+    ::std::cerr << "[" << kLogLevelMessage[(size_t)l] << "] ";
+  }
+
+  ~LogMessage() {
+    ::std::cerr << std::endl;
+  }
+
+  LogMessage& operator<<(const std::string& msg) {
+    ::std::cerr << msg;
+    return *this;
+  }
+  LogMessage& operator<<(int x) {
+    ::std::cerr << x;
+    return *this;
+  }
+};
+
+}
+
+#endif
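
A short sketch, not part of the commit, of how the new logging helpers are meant to be used: each LOG_* macro builds a temporary LogMessage that writes the level prefix in its constructor and the trailing newline in its destructor, so one statement produces one line on stderr. The call id value here is made up.

#include "common/logging.h"

int main() {
  using namespace hdfs;
  int call_id = 42;
  LOG_WARN() << "RPC response with unknown call id " << call_id;
  LOG_ERROR() << "network error during RPC";
  return 0;
}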

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/options.cc

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/options.h"
+
+namespace hdfs {
+
+Options::Options()
+    : rpc_timeout(30000)
+{}
+
+}

+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc

@@ -35,10 +35,10 @@ using ::asio::ip::tcp;
 FileSystem::~FileSystem() {}
 
 void FileSystem::New(
-    IoService *io_service, const std::string &server,
+    IoService *io_service, const Options &options, const std::string &server,
     const std::string &service,
     const std::function<void(const Status &, FileSystem *)> &handler) {
-  FileSystemImpl *impl = new FileSystemImpl(io_service);
+  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
   impl->Connect(server, service, [impl, handler](const Status &stat) {
     if (stat.ok()) {
       handler(stat, impl);
@@ -49,10 +49,11 @@ void FileSystem::New(
   });
 }
 
-FileSystemImpl::FileSystemImpl(IoService *io_service)
+FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options)
     : io_service_(static_cast<IoServiceImpl *>(io_service)),
-      engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
-              kNamenodeProtocol, kNamenodeProtocolVersion),
+      engine_(&io_service_->io_service(), options,
+              RpcEngine::GetRandomClientName(), kNamenodeProtocol,
+              kNamenodeProtocolVersion),
       namenode_(&engine_) {}
 
 void FileSystemImpl::Connect(const std::string &server,
@@ -64,7 +65,7 @@ void FileSystemImpl::Connect(const std::string &server,
   m->Push(Resolve(&io_service_->io_service(), server, service,
                   std::back_inserter(m->state())))
       .Push(Bind([this, m](const Continuation::Next &next) {
-        engine_.Connect(m->state(), next);
+        engine_.Connect(m->state().front(), next);
       }));
   m->Run([this, handler](const Status &status, const State &) {
     if (status.ok()) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h

@@ -28,7 +28,7 @@ namespace hdfs {
 
 class FileSystemImpl : public FileSystem {
 public:
-  FileSystemImpl(IoService *io_service);
+  FileSystemImpl(IoService *io_service, const Options &options);
   void Connect(const std::string &server, const std::string &service,
                std::function<void(const Status &)> &&handler);
   virtual void Open(const std::string &path,

+ 15 - 17
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt

@@ -1,21 +1,19 @@
-set(CLIENT_PROTO_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../proto)
-set(COMMON_PROTO_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../../../../hadoop-common-project/hadoop-common/src/main/proto)
-set(PROTOBUF_IMPORT_DIRS ${CLIENT_PROTO_DIR} ${COMMON_PROTO_DIR})
+set(PROTOBUF_IMPORT_DIRS ${PROTO_HDFS_DIR} ${PROTO_HADOOP_DIR})
 
 protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
-  ${CLIENT_PROTO_DIR}/datatransfer.proto
-  ${CLIENT_PROTO_DIR}/ClientDatanodeProtocol.proto
-  ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto
-  ${CLIENT_PROTO_DIR}/acl.proto
-  ${CLIENT_PROTO_DIR}/datatransfer.proto
-  ${CLIENT_PROTO_DIR}/encryption.proto
-  ${CLIENT_PROTO_DIR}/hdfs.proto
-  ${CLIENT_PROTO_DIR}/inotify.proto
-  ${CLIENT_PROTO_DIR}/xattr.proto
-  ${COMMON_PROTO_DIR}/IpcConnectionContext.proto
-  ${COMMON_PROTO_DIR}/ProtobufRpcEngine.proto
-  ${COMMON_PROTO_DIR}/RpcHeader.proto
-  ${COMMON_PROTO_DIR}/Security.proto
+  ${PROTO_HDFS_DIR}/datatransfer.proto
+  ${PROTO_HDFS_DIR}/ClientDatanodeProtocol.proto
+  ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto
+  ${PROTO_HDFS_DIR}/acl.proto
+  ${PROTO_HDFS_DIR}/datatransfer.proto
+  ${PROTO_HDFS_DIR}/encryption.proto
+  ${PROTO_HDFS_DIR}/hdfs.proto
+  ${PROTO_HDFS_DIR}/inotify.proto
+  ${PROTO_HDFS_DIR}/xattr.proto
+  ${PROTO_HADOOP_DIR}/IpcConnectionContext.proto
+  ${PROTO_HADOOP_DIR}/ProtobufRpcEngine.proto
+  ${PROTO_HADOOP_DIR}/RpcHeader.proto
+  ${PROTO_HADOOP_DIR}/Security.proto
 )
 
 add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc)
@@ -59,7 +57,7 @@ function(GEN_HRPC SRCS)
 endfunction()
 
 gen_hrpc(HRPC_SRCS
-  ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto
+  ${PROTO_HDFS_DIR}/ClientNamenodeProtocol.proto
 )
 
 add_library(proto ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS})

+ 51 - 7
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc

@@ -21,6 +21,7 @@
 #include "ProtobufRpcEngine.pb.h"
 #include "IpcConnectionContext.pb.h"
 
+#include "common/logging.h"
 #include "common/util.h"
 
 #include <asio/read.hpp>
@@ -57,7 +58,6 @@ ConstructPacket(std::string *res,
   os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
 
   uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf && "Cannot allocate memory");
 
   std::for_each(
       headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
@@ -146,14 +146,12 @@ void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
   RpcResponseHeaderProto h;
   ReadDelimitedPBMessage(&in, &h);
 
-  auto it = requests_on_fly_.find(h.callid());
-  if (it == requests_on_fly_.end()) {
-    // TODO: out of line RPC request
-    assert(false && "Out of line request with unknown call id");
+  auto req = RemoveFromRunningQueue(h.callid());
+  if (!req) {
+    LOG_WARN() << "RPC response with Unknown call id " << h.callid();
+    return;
   }
 
-  auto req = it->second;
-  requests_on_fly_.erase(it);
   Status stat;
   if (h.has_exceptionclassname()) {
     stat =
@@ -162,6 +160,24 @@ void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
   req->OnResponseArrived(&in, stat);
 }
 
+void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
+                                     const ::asio::error_code &ec) {
+  if (ec.value() == asio::error::operation_aborted) {
+    return;
+  }
+
+  std::lock_guard<std::mutex> state_lock(engine_state_lock_);
+  auto r = RemoveFromRunningQueue(req->call_id());
+  if (!r) {
+    // The RPC might have been finished and removed from the queue
+    return;
+  }
+
+  Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out));
+
+  r->OnResponseArrived(nullptr, stat);
+}
+
 std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
   static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
                                           RpcEngine::kRpcVersion, 0, 0};
@@ -223,4 +239,32 @@ void RpcConnection::AsyncRawRpc(const std::string &method_name,
   pending_requests_.push_back(r);
   FlushPendingRequests();
 }
+
+void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
+  Shutdown();
+  std::vector<std::shared_ptr<Request>> requests;
+  std::transform(requests_on_fly_.begin(), requests_on_fly_.end(),
+                 std::back_inserter(requests),
+                 std::bind(&RequestOnFlyMap::value_type::second, _1));
+  requests_on_fly_.clear();
+  requests.insert(requests.end(),
+                  std::make_move_iterator(pending_requests_.begin()),
+                  std::make_move_iterator(pending_requests_.end()));
+  pending_requests_.clear();
+  for (const auto &req : requests) {
+    req->OnResponseArrived(nullptr, ToStatus(ec));
+  }
+}
+
+std::shared_ptr<RpcConnection::Request>
+RpcConnection::RemoveFromRunningQueue(int call_id) {
+  auto it = requests_on_fly_.find(call_id);
+  if (it == requests_on_fly_.end()) {
+    return std::shared_ptr<Request>();
+  }
+
+  auto req = it->second;
+  requests_on_fly_.erase(it);
+  return req;
+}
 }

+ 23 - 14
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h

@@ -19,6 +19,8 @@
 #define LIB_RPC_RPC_CONNECTION_H_
 
 #include "rpc_engine.h"
+
+#include "common/logging.h"
 #include "common/util.h"
 
 #include <asio/connect.hpp>
@@ -30,7 +32,7 @@ namespace hdfs {
 template <class NextLayer> class RpcConnectionImpl : public RpcConnection {
 public:
   RpcConnectionImpl(RpcEngine *engine);
-  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
                        Callback &&handler) override;
   virtual void Handshake(Callback &&handler) override;
   virtual void Shutdown() override;
@@ -39,23 +41,22 @@ public:
   virtual void OnRecvCompleted(const ::asio::error_code &ec,
                                size_t transferred) override;
 
+  NextLayer &next_layer() { return next_layer_; }
 private:
+  const Options options_;
   NextLayer next_layer_;
 };
 
 template <class NextLayer>
 RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
-    : RpcConnection(engine)
-    , next_layer_(engine->io_service())
-{}
+    : RpcConnection(engine), options_(engine->options()),
+      next_layer_(engine->io_service()) {}
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
-    const std::vector<::asio::ip::tcp::endpoint> &server, Callback &&handler) {
-  ::asio::async_connect(
-      next_layer_, server.begin(), server.end(),
-      [handler](const ::asio::error_code &ec,
-                std::vector<::asio::ip::tcp::endpoint>::const_iterator) {
+    const ::asio::ip::tcp::endpoint &server, Callback &&handler) {
+  next_layer_.async_connect(server,
+      [handler](const ::asio::error_code &ec) {
         handler(ToStatus(ec));
       });
 }
@@ -79,9 +80,10 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
 
   request_over_the_wire_.reset();
   if (ec) {
-    // TODO: Current RPC has failed -- we should abandon the
+    // Current RPC has failed -- abandon the
     // connection and do proper clean up
-    assert(false && "Unimplemented");
+    ClearAndDisconnect(ec);
+    return;
   }
 
   if (!pending_requests_.size()) {
@@ -93,7 +95,10 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
   requests_on_fly_[req->call_id()] = req;
   request_over_the_wire_ = req;
 
-  // TODO: set the timeout for the RPC request
+  req->timer().expires_from_now(
+      std::chrono::milliseconds(options_.rpc_timeout));
+  req->timer().async_wait(std::bind(
+      &RpcConnectionImpl<NextLayer>::HandleRpcTimeout, this, req, _1));
 
   asio::async_write(
       next_layer_, asio::buffer(req->payload()),
@@ -115,7 +120,9 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
     // The event loop has been shut down. Ignore the error.
     return;
   default:
-    assert(false && "Unimplemented");
+    LOG_WARN() << "Network error during RPC: " << ec.message();
+    ClearAndDisconnect(ec);
+    return;
   }
 
   if (resp_state_ == kReadLength) {
@@ -131,7 +138,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
     resp_length_ = ntohl(resp_length_);
     resp_data_.resize(resp_length_);
     asio::async_read(next_layer_, ::asio::buffer(resp_data_),
-                     std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted, this, _1, _2));
+                     std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted,
+                               this, _1, _2));
 
   } else if (resp_state_ == kParseResponse) {
     resp_state_ = kReadLength;
@@ -142,6 +150,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
 }
 
 template <class NextLayer> void RpcConnectionImpl<NextLayer>::Shutdown() {
+  next_layer_.cancel();
   next_layer_.close();
 }
 }
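
The per-request timeout above is armed right before the payload is written (req->timer().expires_from_now(...) followed by async_wait on HandleRpcTimeout), and HandleRpcTimeout treats operation_aborted as "the response won the race". The following is a self-contained sketch of that race, not code from the commit: it uses a plain asio::steady_timer and a bool instead of the request queue and engine lock, purely to illustrate the pattern.

#include <asio.hpp>

#include <chrono>
#include <iostream>

int main() {
  asio::io_service io;
  bool completed = false;

  // Per-request timeout, armed when the request would go over the wire.
  asio::steady_timer timeout(io);
  timeout.expires_from_now(std::chrono::milliseconds(30));
  timeout.async_wait([&](const asio::error_code &ec) {
    if (ec == asio::error::operation_aborted || completed) {
      return;  // The response arrived first; nothing to fail.
    }
    completed = true;
    std::cout << "request failed: timed out" << std::endl;
  });

  // Simulated response arriving after 10 ms: it claims the request and
  // cancels the timer, so the timeout handler sees operation_aborted.
  asio::steady_timer response(io);
  response.expires_from_now(std::chrono::milliseconds(10));
  response.async_wait([&](const asio::error_code &) {
    if (completed) {
      return;  // Already timed out.
    }
    completed = true;
    timeout.cancel();
    std::cout << "response arrived in time" << std::endl;
  });

  io.run();
}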

+ 11 - 7
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -26,18 +26,18 @@
 
 namespace hdfs {
 
-RpcEngine::RpcEngine(::asio::io_service *io_service,
+RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
                      const std::string &client_name, const char *protocol_name,
                      int protocol_version)
-    : io_service_(io_service), client_name_(client_name),
+    : io_service_(io_service), options_(options), client_name_(client_name),
       protocol_name_(protocol_name), protocol_version_(protocol_version),
-      call_id_(0)
-    , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this))
-{}
+      call_id_(0) {
+}
 
-void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers,
+void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
                         const std::function<void(const Status &)> &handler) {
-  conn_->Connect(servers, [this, handler](const Status &stat) {
+  conn_.reset(new RpcConnectionImpl<::asio::ip::tcp::socket>(this));
+  conn_->Connect(server, [this, handler](const Status &stat) {
     if (!stat.ok()) {
       handler(stat);
     } else {
@@ -52,6 +52,10 @@ void RpcEngine::Shutdown() {
   io_service_->post([this]() { conn_->Shutdown(); });
 }
 
+void RpcEngine::TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn) {
+  conn_.reset(conn->release());
+}
+
 void RpcEngine::AsyncRpc(
     const std::string &method_name, const ::google::protobuf::MessageLite *req,
     const std::shared_ptr<::google::protobuf::MessageLite> &resp,

+ 18 - 7
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h

@@ -18,6 +18,7 @@
 #ifndef LIB_RPC_RPC_ENGINE_H_
 #define LIB_RPC_RPC_ENGINE_H_
 
+#include "libhdfspp/options.h"
 #include "libhdfspp/status.h"
 
 #include <google/protobuf/message_lite.h>
@@ -39,7 +40,7 @@ public:
   typedef std::function<void(const Status &)> Callback;
   virtual ~RpcConnection();
   RpcConnection(RpcEngine *engine);
-  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
                        Callback &&handler) = 0;
   virtual void Handshake(Callback &&handler) = 0;
   virtual void Shutdown() = 0;
@@ -54,6 +55,7 @@ public:
                    std::shared_ptr<std::string> resp, Callback &&handler);
 
 protected:
+  class Request;
   RpcEngine *const engine_;
   virtual void OnSendCompleted(const ::asio::error_code &ec,
                                size_t transferred) = 0;
@@ -66,7 +68,11 @@ protected:
   SerializeRpcRequest(const std::string &method_name,
                       const ::google::protobuf::MessageLite *req);
   void HandleRpcResponse(const std::vector<char> &data);
+  void HandleRpcTimeout(std::shared_ptr<Request> req,
+                        const ::asio::error_code &ec);
   void FlushPendingRequests();
+  void ClearAndDisconnect(const ::asio::error_code &ec);
+  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
 
   enum ResponseState {
     kReadLength,
@@ -89,7 +95,8 @@ protected:
     ::asio::deadline_timer &timer() { return timer_; }
     const std::string &payload() const { return payload_; }
     void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
-                           const Status &status); 
+                           const Status &status);
+
   private:
     const int call_id_;
     ::asio::deadline_timer timer_;
@@ -102,7 +109,8 @@ protected:
   // Requests to be sent over the wire
   std::vector<std::shared_ptr<Request>> pending_requests_;
   // Requests that are waiting for responses
-  std::unordered_map<int, std::shared_ptr<Request>> requests_on_fly_;
+  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
+  RequestOnFlyMap requests_on_fly_;
   // Lock for mutable parts of this class that need to be thread safe
   std::mutex engine_state_lock_;
 };
@@ -117,8 +125,9 @@ public:
     kCallIdPing = -4
   };
 
-  RpcEngine(::asio::io_service *io_service, const std::string &client_name,
-            const char *protocol_name, int protocol_version);
+  RpcEngine(::asio::io_service *io_service, const Options &options,
+            const std::string &client_name, const char *protocol_name,
+            int protocol_version);
 
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
@@ -134,10 +143,11 @@ public:
    **/
   Status RawRpc(const std::string &method_name, const std::string &req,
                 std::shared_ptr<std::string> resp);
-  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+  void Connect(const ::asio::ip::tcp::endpoint &server,
                const std::function<void(const Status &)> &handler);
   void Start();
   void Shutdown();
+  void TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn);
 
   int NextCallId() { return ++call_id_; }
 
@@ -145,11 +155,12 @@ public:
   const std::string &protocol_name() const { return protocol_name_; }
   int protocol_version() const { return protocol_version_; }
   ::asio::io_service &io_service() { return *io_service_; }
-
+  const Options &options() { return options_; }
   static std::string GetRandomClientName();
 
 private:
   ::asio::io_service *io_service_;
+  Options options_;
   const std::string client_name_;
   const std::string protocol_name_;
   const int protocol_version_;

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -18,6 +18,13 @@
 
 add_library(test_common OBJECT mock_connection.cc)
 
+set(PROTOBUF_IMPORT_DIRS ${PROTO_HADOOP_TEST_DIR})
+
+protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS
+  ${PROTO_HADOOP_TEST_DIR}/test.proto
+  ${PROTO_HADOOP_TEST_DIR}/test_rpc_service.proto
+)
+
 add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
 target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(remote_block_reader remote_block_reader_test)
@@ -29,3 +36,8 @@ add_test(sasl_digest_md5 sasl_digest_md5_test)
 add_executable(inputstream_test inputstream_test.cc)
 target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_test(inputstream inputstream_test)
+
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>)
+target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+add_test(rpc_engine rpc_engine_test)

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc

@@ -76,7 +76,8 @@ TEST(InputStreamTest, TestReadSingleTrunk) {
       0,
   };
   IoServiceImpl io_service;
-  FileSystemImpl fs(&io_service);
+  Options options;
+  FileSystemImpl fs(&io_service, options);
   InputStreamImpl is(&fs, &blocks);
   Status stat;
   size_t read = 0;
@@ -109,7 +110,8 @@ TEST(InputStreamTest, TestReadMultipleTrunk) {
       0,
   };
   IoServiceImpl io_service;
-  FileSystemImpl fs(&io_service);
+  Options options;
+  FileSystemImpl fs(&io_service, options);
   InputStreamImpl is(&fs, &blocks);
   Status stat;
   size_t read = 0;
@@ -144,7 +146,8 @@ TEST(InputStreamTest, TestReadError) {
       0,
   };
   IoServiceImpl io_service;
-  FileSystemImpl fs(&io_service);
+  Options options;
+  FileSystemImpl fs(&io_service, options);
   InputStreamImpl is(&fs, &blocks);
   Status stat;
   size_t read = 0;
@@ -190,7 +193,8 @@ TEST(InputStreamTest, TestExcludeDataNode) {
       0,
   };
   IoServiceImpl io_service;
-  FileSystemImpl fs(&io_service);
+  Options options;
+  FileSystemImpl fs(&io_service, options);
   InputStreamImpl is(&fs, &blocks);
   Status stat;
   size_t read = 0;

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h

@@ -38,6 +38,7 @@ public:
       ProducerResult r = Produce();
       if (r.first) {
         io_service_->post(std::bind(handler, r.first, 0));
+        return;
       }
       asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
       asio::buffer_copy(data, asio::buffer(r.second));

+ 179 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc

@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+#include "test.pb.h"
+#include "RpcHeader.pb.h"
+#include "rpc/rpc_connection.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+#include <gmock/gmock.h>
+
+using ::hadoop::common::RpcResponseHeaderProto;
+using ::hadoop::common::EmptyRequestProto;
+using ::hadoop::common::EmptyResponseProto;
+using ::hadoop::common::EchoRequestProto;
+using ::hadoop::common::EchoResponseProto;
+
+using ::asio::error_code;
+
+using ::testing::Return;
+
+using ::std::make_pair;
+using ::std::string;
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+namespace hdfs {
+
+class MockRPCConnection : public MockConnectionBase {
+public:
+  MockRPCConnection(::asio::io_service &io_service)
+      : MockConnectionBase(&io_service) {}
+  MOCK_METHOD0(Produce, ProducerResult());
+  template <class Endpoint, class Callback>
+  void async_connect(const Endpoint &, Callback &&handler) {
+    handler(::asio::error_code());
+  }
+  void cancel() {}
+  void close() {}
+};
+
+static inline std::pair<error_code, string>
+RpcResponse(const RpcResponseHeaderProto &h, const std::string &data,
+            const ::asio::error_code &ec = error_code()) {
+  uint32_t payload_length =
+      pbio::CodedOutputStream::VarintSize32(h.ByteSize()) +
+      pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() +
+      data.size();
+
+  std::string res;
+  res.resize(sizeof(uint32_t) + payload_length);
+  uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str()));
+
+  buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray(
+      htonl(payload_length), buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf);
+  buf = h.SerializeWithCachedSizesToArray(buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf);
+  buf = pbio::CodedOutputStream::WriteStringToArray(data, buf);
+
+  return std::make_pair(ec, std::move(res));
+}
+}
+
+using namespace hdfs;
+
+TEST(RpcEngineTest, TestRoundTrip) {
+  ::asio::io_service io_service;
+  Options options;
+  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcConnectionImpl<MockRPCConnection> *conn =
+      new RpcConnectionImpl<MockRPCConnection>(&engine);
+  EchoResponseProto server_resp;
+  server_resp.set_message("foo");
+
+  RpcResponseHeaderProto h;
+  h.set_callid(1);
+  h.set_status(RpcResponseHeaderProto::SUCCESS);
+  EXPECT_CALL(conn->next_layer(), Produce())
+      .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
+
+  std::unique_ptr<RpcConnection> conn_ptr(conn);
+  engine.TEST_SetRpcConnection(&conn_ptr);
+
+  EchoRequestProto req;
+  req.set_message("foo");
+  std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
+  engine.AsyncRpc("test", &req, resp, [resp, &io_service](const Status &stat) {
+    ASSERT_TRUE(stat.ok());
+    ASSERT_EQ("foo", resp->message());
+    io_service.stop();
+  });
+  conn->Start();
+  io_service.run();
+}
+
+TEST(RpcEngineTest, TestConnectionReset) {
+  ::asio::io_service io_service;
+  Options options;
+  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcConnectionImpl<MockRPCConnection> *conn =
+      new RpcConnectionImpl<MockRPCConnection>(&engine);
+
+  RpcResponseHeaderProto h;
+  h.set_callid(1);
+  h.set_status(RpcResponseHeaderProto::SUCCESS);
+  EXPECT_CALL(conn->next_layer(), Produce())
+      .WillOnce(Return(RpcResponse(
+          h, "", make_error_code(::asio::error::connection_reset))));
+
+  std::unique_ptr<RpcConnection> conn_ptr(conn);
+  engine.TEST_SetRpcConnection(&conn_ptr);
+
+  EchoRequestProto req;
+  req.set_message("foo");
+  std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
+
+  engine.AsyncRpc("test", &req, resp, [&io_service](const Status &stat) {
+    ASSERT_FALSE(stat.ok());
+  });
+
+  engine.AsyncRpc("test", &req, resp, [&io_service](const Status &stat) {
+    io_service.stop();
+    ASSERT_FALSE(stat.ok());
+  });
+  conn->Start();
+  io_service.run();
+}
+
+TEST(RpcEngineTest, TestTimeout) {
+  ::asio::io_service io_service;
+  Options options;
+  options.rpc_timeout = 1;
+  RpcEngine engine(&io_service, options, "foo", "protocol", 1);
+  RpcConnectionImpl<MockRPCConnection> *conn =
+      new RpcConnectionImpl<MockRPCConnection>(&engine);
+
+  EXPECT_CALL(conn->next_layer(), Produce()).Times(0);
+
+  std::unique_ptr<RpcConnection> conn_ptr(conn);
+  engine.TEST_SetRpcConnection(&conn_ptr);
+
+  EchoRequestProto req;
+  req.set_message("foo");
+  std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
+  engine.AsyncRpc("test", &req, resp, [resp, &io_service](const Status &stat) {
+    io_service.stop();
+    ASSERT_FALSE(stat.ok());
+  });
+
+  ::asio::deadline_timer timer(io_service);
+  timer.expires_from_now(std::chrono::milliseconds(options.rpc_timeout * 2));
+  timer.async_wait(std::bind(&RpcConnection::Start, conn));
+  io_service.run();
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}