Forráskód Böngészése

HDFS-11106: libhdfs++: Some refactoring to better organize files (part 2). Contributed by James Clampffer.

James 8 éve
szülő
commit
d75104eacf

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt

@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-list(APPEND rpc_object_items rpc_connection.cc rpc_engine.cc sasl_protocol.cc sasl_engine.cc)
+list(APPEND rpc_object_items rpc_connection_impl.cc rpc_engine.cc namenode_tracker.cc request.cc sasl_protocol.cc sasl_engine.cc)
 if (CMAKE_USING_CYRUS_SASL)
   list(APPEND rpc_object_items cyrus_sasl_engine.cc)
 endif (CMAKE_USING_CYRUS_SASL)

+ 134 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "namenode_tracker.h"
+
+#include "common/logging.h"
+#include "common/libhdfs_events_impl.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
+  std::stringstream ss;
+  for(unsigned int i=0; i<pts.size(); i++)
+    if(i == pts.size() - 1)
+      ss << pts[i];
+    else
+      ss << pts[i] << ", ";
+  return ss.str();
+}
+
+HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                                     ::asio::io_service *ioservice,
+                                     std::shared_ptr<LibhdfsEvents> event_handlers)
+                  : enabled_(false), resolved_(false),
+                    ioservice_(ioservice), event_handlers_(event_handlers)
+{
+  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
+  for(unsigned int i=0;i<servers.size();i++)
+    LOG_TRACE(kRPC, << servers[i].str());
+
+  if(servers.size() >= 2) {
+    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
+    if(servers.size() > 2) {
+      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't be used.");
+    }
+
+    active_info_ = servers[0];
+    standby_info_ = servers[1];
+    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
+    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
+
+    enabled_ = true;
+    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
+      resolved_ = true;
+    }
+  }
+}
+
+HANamenodeTracker::~HANamenodeTracker() {}
+
+//  Pass in endpoint from current connection, this will do a reverse lookup
+//  and return the info for the standby node. It will also swap its state internally.
+ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
+  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
+  mutex_guard swap_lock(swap_lock_);
+
+  ResolvedNamenodeInfo failover_node;
+
+  // Connected to standby, switch standby to active
+  if(IsCurrentActive_locked(current_endpoint)) {
+    std::swap(active_info_, standby_info_);
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else if(IsCurrentStandby_locked(current_endpoint)) {
+    // Connected to standby
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else {
+    // Invalid state, throw for testing
+    std::string ep1 = format_endpoints(active_info_.endpoints);
+    std::string ep2 = format_endpoints(standby_info_.endpoints);
+
+    std::stringstream msg;
+    msg << "Looked for " << current_endpoint << " in\n";
+    msg << ep1 << " and\n";
+    msg << ep2 << std::endl;
+
+    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
+    throw std::runtime_error(msg.str());
+  }
+
+  if(failover_node.endpoints.empty()) {
+    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again");
+    if(!ResolveInPlace(ioservice_, failover_node)) {
+      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
+                      << "failed.  Please make sure your configuration is up to date.");
+    }
+  }
+  return failover_node;
+}
+
+bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
+    if(ep.address() == active_info_.endpoints[i].address()) {
+      if(ep.port() != active_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
+    if(ep.address() == standby_info_.endpoints[i].address()) {
+      if(ep.port() != standby_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+} // end namespace hdfs

+ 81 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_RPC_NAMENODE_TRACKER_H
+#define LIB_RPC_NAMENODE_TRACKER_H
+
+#include "common/libhdfs_events_impl.h"
+#include "common/namenode_info.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+namespace hdfs {
+
+/*
+ *  Tracker gives the RpcEngine a quick way to use an endpoint that just
+ *  failed in order to lookup a set of endpoints for a failover node.
+ *
+ *  Note: For now this only deals with 2 NameNodes, but that's the default
+ *  anyway.
+ */
+class HANamenodeTracker {
+ public:
+  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                    ::asio::io_service *ioservice,
+                    std::shared_ptr<LibhdfsEvents> event_handlers_);
+
+  virtual ~HANamenodeTracker();
+
+  bool is_enabled() const { return enabled_; }
+  bool is_resolved() const { return resolved_; }
+
+  // Get node opposite of the current one if possible (swaps active/standby)
+  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
+  // get info without changing state
+  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
+
+  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
+  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
+
+ private:
+  // If HA should be enabled, according to our options and runtime info like # nodes provided
+  bool enabled_;
+  // If we were able to resolve at least 1 HA namenode
+  bool resolved_;
+
+  // Keep service in case a second round of DNS lookup is required
+  ::asio::io_service *ioservice_;
+
+  // Event handlers, for now this is the simplest place to catch all failover events
+  // and push info out to client application.  Possibly move into RPCEngine.
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+  // Only support 1 active and 1 standby for now.
+  ResolvedNamenodeInfo active_info_;
+  ResolvedNamenodeInfo standby_info_;
+
+  // Aquire when switching from active-standby
+  std::mutex swap_lock_;
+};
+
+} // end namespace hdfs
+#endif // end include guard

+ 190 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc

@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "request.h"
+#include "rpc_engine.h"
+#include "sasl_protocol.h"
+
+#include "RpcHeader.pb.h"
+#include "ProtobufRpcEngine.pb.h"
+#include "IpcConnectionContext.pb.h"
+
+#include <sstream>
+
+namespace hdfs {
+
+namespace pb = ::google::protobuf;
+namespace pbio = ::google::protobuf::io;
+
+using namespace ::hadoop::common;
+using namespace ::std::placeholders;
+
+static const int kNoRetry = -1;
+
+// Protobuf helper functions.
+static void AddHeadersToPacket(std::string *res,
+                               std::initializer_list<const pb::MessageLite *> headers,
+                               const std::string *payload) {
+  int len = 0;
+  std::for_each(
+      headers.begin(), headers.end(),
+      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
+
+  if (payload) {
+    len += payload->size();
+  }
+
+  int net_len = htonl(len);
+  res->reserve(res->size() + sizeof(net_len) + len);
+
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
+
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+
+  std::for_each(
+      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
+        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
+        buf = v->SerializeWithCachedSizesToArray(buf);
+      });
+
+  if (payload) {
+    buf = os.WriteStringToArray(*payload, buf);
+  }
+}
+
+static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
+  int len = DelimitedPBMessageSize(header);
+  res->reserve(len);
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
+  buf = header->SerializeWithCachedSizesToArray(buf);
+}
+
+static void ConstructPayload(std::string *res, const std::string *request) {
+  int len =
+      pbio::CodedOutputStream::VarintSize32(request->size()) + request->size();
+  res->reserve(len);
+  pbio::StringOutputStream ss(res);
+  pbio::CodedOutputStream os(&ss);
+  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
+  assert(buf);
+  buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
+  buf = os.WriteStringToArray(*request, buf);
+}
+
+static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
+                             const std::string &method_name, int retry_count,
+                             RpcRequestHeaderProto *rpc_header,
+                             RequestHeaderProto *req_header) {
+  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
+  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
+  rpc_header->set_callid(call_id);
+  if (retry_count != kNoRetry)
+    rpc_header->set_retrycount(retry_count);
+  rpc_header->set_clientid(engine->client_id());
+
+  req_header->set_methodname(method_name);
+  req_header->set_declaringclassprotocolname(engine->protocol_name());
+  req_header->set_clientprotocolversion(engine->protocol_version());
+}
+
+// Request implementation
+
+Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
+                 const std::string &request, Handler &&handler)
+    : engine_(engine),
+      method_name_(method_name),
+      call_id_(call_id),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+  ConstructPayload(&payload_, &request);
+}
+
+
+Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
+                 const pb::MessageLite *request, Handler &&handler)
+    : engine_(engine),
+      method_name_(method_name),
+      call_id_(call_id),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+  ConstructPayload(&payload_, request);
+}
+
+Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
+    : engine_(engine),
+      call_id_(-1),
+      timer_(engine->io_service()),
+      handler_(std::move(handler)),
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
+}
+
+void Request::GetPacket(std::string *res) const {
+  LOG_TRACE(kRPC, << "Request::GetPacket called");
+
+  if (payload_.empty())
+    return;
+
+  RpcRequestHeaderProto rpc_header;
+  RequestHeaderProto req_header;
+  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
+                   &req_header);
+
+  // SASL messages don't have a request header
+  if (method_name_ != SASL_METHOD_NAME)
+    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
+  else
+    AddHeadersToPacket(res, {&rpc_header}, &payload_);
+}
+
+void Request::OnResponseArrived(pbio::CodedInputStream *is,
+                                const Status &status) {
+  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
+  handler_(is, status);
+}
+
+std::string Request::GetDebugString() const {
+  // Basic description of this object, aimed at debugging
+  std::stringstream ss;
+  ss << "\nRequest Object:\n";
+  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
+  ss << "\tCall id        = " << call_id_ << "\n";
+  ss << "\tRetry Count    = " << retry_count_ << "\n";
+  ss << "\tFailover count = " << failover_count_ << "\n";
+  return ss.str();
+}
+
+int Request::IncrementFailoverCount() {
+  // reset retry count when failing over
+  retry_count_ = 0;
+  return failover_count_++;
+}
+
+} // end namespace hdfs

+ 87 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.h

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_REQUEST_H
+#define LIB_RPC_RPC_REQUEST_H
+
+#include "hdfspp/status.h"
+#include "common/util.h"
+#include "common/new_delete.h"
+
+#include <string>
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <asio/deadline_timer.hpp>
+
+
+namespace hdfs {
+
+class LockFreeRpcEngine;
+class SaslProtocol;
+
+/*
+ * Internal bookkeeping for an outstanding request from the consumer.
+ *
+ * Threading model: not thread-safe; should only be accessed from a single
+ *   thread at a time
+ */
+class Request {
+ public:
+  MEMCHECKED_CLASS(Request)
+  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
+                             const Status &status)> Handler;
+
+  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
+          const std::string &request, Handler &&callback);
+  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
+          const ::google::protobuf::MessageLite *request, Handler &&callback);
+
+  // Null request (with no actual message) used to track the state of an
+  //    initial Connect call
+  Request(LockFreeRpcEngine *engine, Handler &&handler);
+
+  int call_id() const { return call_id_; }
+  std::string  method_name() const { return method_name_; }
+  ::asio::deadline_timer &timer() { return timer_; }
+  int IncrementRetryCount() { return retry_count_++; }
+  int IncrementFailoverCount();
+  void GetPacket(std::string *res) const;
+  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
+                         const Status &status);
+
+  int get_failover_count() {return failover_count_;}
+
+  std::string GetDebugString() const;
+
+ private:
+  LockFreeRpcEngine *const engine_;
+  const std::string method_name_;
+  const int call_id_;
+
+  ::asio::deadline_timer timer_;
+  std::string payload_;
+  const Handler handler_;
+
+  int retry_count_;
+  int failover_count_;
+};
+
+} // end namespace hdfs
+#endif // end include guard

+ 144 - 408
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h

@@ -15,430 +15,166 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIB_RPC_RPC_CONNECTION_H_
-#define LIB_RPC_RPC_CONNECTION_H_
+#ifndef LIB_RPC_RPC_CONNECTION_H
+#define LIB_RPC_RPC_CONNECTION_H
 
-#include "rpc_engine.h"
+/*
+ * Encapsulates a persistent connection to the NameNode, and the sending of
+ * RPC requests and evaluating their responses.
+ *
+ * Can have multiple RPC requests in-flight simultaneously, but they are
+ * evaluated in-order on the server side in a blocking manner.
+ *
+ * Threading model: public interface is thread-safe
+ * All handlers passed in to method calls will be called from an asio thread,
+ *   and will not be holding any internal RpcConnection locks.
+ */
 
+#include "request.h"
 #include "common/auth_info.h"
-#include "common/logging.h"
-#include "common/util.h"
 #include "common/libhdfs_events_impl.h"
-#include "sasl_protocol.h"
-
-#include <asio/connect.hpp>
-#include <asio/read.hpp>
-#include <asio/write.hpp>
+#include "common/new_delete.h"
+#include "hdfspp/status.h"
 
-#include <system_error>
+#include <functional>
+#include <memory>
+#include <vector>
+#include <deque>
+#include <unordered_map>
 
 namespace hdfs {
 
-template <class Socket>
-class RpcConnectionImpl : public RpcConnection {
-public:
-  MEMCHECKED_CLASS(RpcConnectionImpl);
+typedef const std::function<void(const Status &)> RpcCallback;
 
-  RpcConnectionImpl(RpcEngine *engine);
-  virtual ~RpcConnectionImpl() override;
+class LockFreeRpcEngine;
+class SaslProtocol;
 
+class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
+ public:
+  MEMCHECKED_CLASS(RpcConnection)
+  RpcConnection(LockFreeRpcEngine *engine);
+  virtual ~RpcConnection();
+
+  // Note that a single server can have multiple endpoints - especially both
+  //   an ipv4 and ipv6 endpoint
   virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        const AuthInfo & auth_info,
-                       RpcCallback &handler);
-  virtual void ConnectAndFlush(
-      const std::vector<::asio::ip::tcp::endpoint> &server) override;
-  virtual void SendHandshake(RpcCallback &handler) override;
-  virtual void SendContext(RpcCallback &handler) override;
-  virtual void Disconnect() override;
-  virtual void OnSendCompleted(const ::asio::error_code &ec,
-                               size_t transferred) override;
-  virtual void OnRecvCompleted(const ::asio::error_code &ec,
-                               size_t transferred) override;
-  virtual void FlushPendingRequests() override;
+                       RpcCallback &handler) = 0;
+  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
+  virtual void Disconnect() = 0;
+
+  void StartReading();
+  void AsyncRpc(const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const RpcCallback &handler);
 
+  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
 
-  Socket &TEST_get_mutable_socket() { return socket_; }
+  // Enqueue requests before the connection is connected.  Will be flushed
+  //   on connect
+  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
 
-  void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
+  // Put requests at the front of the current request queue
+  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);
 
- private:
-  const Options options_;
-  ::asio::ip::tcp::endpoint current_endpoint_;
-  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
-  Socket socket_;
-  ::asio::deadline_timer connect_timer_;
+  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
+  void SetClusterName(std::string cluster_name);
 
-  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
+  LockFreeRpcEngine *engine() { return engine_; }
+  ::asio::io_service &io_service();
+
+ protected:
+  struct Response {
+    enum ResponseState {
+      kReadLength,
+      kReadContent,
+      kParseResponse,
+    } state_;
+    unsigned length_;
+    std::vector<char> data_;
+
+    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
+    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;
+
+    Response() : state_(kReadLength), length_(0) {}
+  };
+
+
+  // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected
+  virtual void SendHandshake(RpcCallback &handler) = 0;
+  void HandshakeComplete(const Status &s);
+  void AuthComplete(const Status &s, const AuthInfo & new_auth_info);
+  void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info);
+  virtual void SendContext(RpcCallback &handler) = 0;
+  void ContextComplete(const Status &s);
+
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) = 0;
+  virtual void FlushPendingRequests()=0;      // Synchronously write the next request
+
+  void AsyncRpc_locked(
+                const std::string &method_name,
+                const ::google::protobuf::MessageLite *req,
+                std::shared_ptr<::google::protobuf::MessageLite> resp,
+                const RpcCallback &handler);
+  void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests);
+  void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time
+
+
+
+  std::shared_ptr<std::string> PrepareHandshakePacket();
+  std::shared_ptr<std::string> PrepareContextPacket();
+  static std::string SerializeRpcRequest(const std::string &method_name,
+                                         const ::google::protobuf::MessageLite *req);
+
+  Status HandleRpcResponse(std::shared_ptr<Response> response);
+  void HandleRpcTimeout(std::shared_ptr<Request> req,
+                        const ::asio::error_code &ec);
+  void CommsError(const Status &status);
+
+  void ClearAndDisconnect(const ::asio::error_code &ec);
+  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
+
+  LockFreeRpcEngine *const engine_;
+  std::shared_ptr<Response> current_response_state_;
+  AuthInfo auth_info_;
+
+  // Connection can have deferred connection, especially when we're pausing
+  //   during retry
+  enum ConnectedState {
+      kNotYetConnected,
+      kConnecting,
+      kHandshaking,
+      kAuthenticating,
+      kConnected,
+      kDisconnected
+  };
+  static std::string ToString(ConnectedState connected);
+  ConnectedState connected_;
+
+  // State machine for performing a SASL handshake
+  std::shared_ptr<SaslProtocol> sasl_protocol_;
+  // The request being sent over the wire; will also be in requests_on_fly_
+  std::shared_ptr<Request> request_over_the_wire_;
+  // Requests to be sent over the wire
+  std::deque<std::shared_ptr<Request>> pending_requests_;
+  // Requests to be sent over the wire during authentication; not retried if
+  //   there is a connection error
+  std::deque<std::shared_ptr<Request>> auth_requests_;
+  // Requests that are waiting for responses
+  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
+  RequestOnFlyMap requests_on_fly_;
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+  std::string cluster_name_;
+
+  // Lock for mutable parts of this class that need to be thread safe
+  std::mutex connection_state_lock_;
+
+  friend class SaslProtocol;
 };
 
-template <class Socket>
-RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
-    : RpcConnection(engine),
-      options_(engine->options()),
-      socket_(engine->io_service()),
-      connect_timer_(engine->io_service())
-{
-      LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
-}
-
-template <class Socket>
-RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
-  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
-
-  if (pending_requests_.size() > 0)
-    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
-  if (requests_on_fly_.size() > 0)
-    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::Connect(
-    const std::vector<::asio::ip::tcp::endpoint> &server,
-    const AuthInfo & auth_info,
-    RpcCallback &handler) {
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");
-
-  this->auth_info_ = auth_info;
-
-  auto connectionSuccessfulReq = std::make_shared<Request>(
-      engine_, [handler](::google::protobuf::io::CodedInputStream *is,
-                         const Status &status) {
-        (void)is;
-        handler(status);
-      });
-  pending_requests_.push_back(connectionSuccessfulReq);
-  this->ConnectAndFlush(server);  // need "this" so compiler can infer type of CAF
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::ConnectAndFlush(
-    const std::vector<::asio::ip::tcp::endpoint> &server) {
-
-  LOG_INFO(kRPC, << "ConnectAndFlush called");
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  if (server.empty()) {
-    Status s = Status::InvalidArgument("No endpoints provided");
-    CommsError(s);
-    return;
-  }
-
-  if (connected_ == kConnected) {
-    FlushPendingRequests();
-    return;
-  }
-  if (connected_ != kNotYetConnected) {
-    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_));
-    return;
-  }
-  connected_ = kConnecting;
-
-  // Take the first endpoint, but remember the alternatives for later
-  additional_endpoints_ = server;
-  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
-  additional_endpoints_.erase(additional_endpoints_.begin());
-  current_endpoint_ = first_endpoint;
-
-  auto shared_this = shared_from_this();
-  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
-    ConnectComplete(ec, first_endpoint);
-  });
-
-  // Prompt the timer to timeout
-  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
-  connect_timer_.expires_from_now(
-        std::chrono::milliseconds(options_.rpc_connect_timeout));
-  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
-      if (ec)
-        ConnectComplete(ec, first_endpoint);
-      else
-        ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
-  });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
-  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  connect_timer_.cancel();
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
-
-  // Could be an old async connect returning a result after we've moved on
-  if (remote != current_endpoint_) {
-      LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
-      return;
-  }
-  if (connected_ != kConnecting) {
-      LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
-      return;
-  }
-
-  Status status = ToStatus(ec);
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response() == event_response::kTest_Error) {
-      status = event_resp.status();
-    }
-#endif
-  }
-
-  if (status.ok()) {
-    StartReading();
-    SendHandshake([shared_this, this](const Status & s) {
-      HandshakeComplete(s);
-    });
-  } else {
-    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
-    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
-    if(!err.empty()) {
-      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
-    }
-
-    if (!additional_endpoints_.empty()) {
-      // If we have additional endpoints, keep trying until we either run out or
-      //    hit one
-      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
-      additional_endpoints_.erase(additional_endpoints_.begin());
-      current_endpoint_ = next_endpoint;
-
-      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
-        ConnectComplete(ec, next_endpoint);
-      });
-      connect_timer_.expires_from_now(
-            std::chrono::milliseconds(options_.rpc_connect_timeout));
-      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
-          if (ec)
-            ConnectComplete(ec, next_endpoint);
-          else
-            ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
-        });
-    } else {
-      CommsError(status);
-    }
-  }
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
-  connected_ = kHandshaking;
-
-  auto shared_this = shared_from_this();
-  auto handshake_packet = PrepareHandshakePacket();
-  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
-                      [handshake_packet, handler, shared_this, this](
-                          const ::asio::error_code &ec, size_t) {
-                        Status status = ToStatus(ec);
-                        handler(status);
-                      });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
-
-  auto shared_this = shared_from_this();
-  auto context_packet = PrepareContextPacket();
-  ::asio::async_write(socket_, asio::buffer(*context_packet),
-                      [context_packet, handler, shared_this, this](
-                          const ::asio::error_code &ec, size_t) {
-                        Status status = ToStatus(ec);
-                        handler(status);
-                      });
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
-                                                   size_t) {
-  using std::placeholders::_1;
-  using std::placeholders::_2;
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");
-
-  request_over_the_wire_.reset();
-  if (ec) {
-    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
-    CommsError(ToStatus(ec));
-    return;
-  }
-
-  FlushPendingRequests();
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::FlushPendingRequests() {
-  using namespace ::std::placeholders;
-
-  // Lock should be held
-  assert(lock_held(connection_state_lock_));
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");
-
-  // Don't send if we don't need to
-  if (request_over_the_wire_) {
-    return;
-  }
-
-  std::shared_ptr<Request> req;
-  switch (connected_) {
-  case kNotYetConnected:
-    return;
-  case kConnecting:
-    return;
-  case kHandshaking:
-    return;
-  case kAuthenticating:
-    if (auth_requests_.empty()) {
-      return;
-    }
-    req = auth_requests_.front();
-    auth_requests_.erase(auth_requests_.begin());
-    break;
-  case kConnected:
-    if (pending_requests_.empty()) {
-      return;
-    }
-    req = pending_requests_.front();
-    pending_requests_.erase(pending_requests_.begin());
-    break;
-  case kDisconnected:
-    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection");
-    return;
-  default:
-    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_));
-    return;
-  }
-
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
-  auto weak_req = std::weak_ptr<Request>(req);
-
-  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
-  req->GetPacket(payload.get());
-  if (!payload->empty()) {
-    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
-    requests_on_fly_[req->call_id()] = req;
-    request_over_the_wire_ = req;
-
-    req->timer().expires_from_now(
-        std::chrono::milliseconds(options_.rpc_timeout));
-    req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) {
-        auto timeout_this = weak_this.lock();
-        auto timeout_req = weak_req.lock();
-        if (timeout_this && timeout_req)
-          this->HandleRpcTimeout(timeout_req, ec);
-    });
-
-    asio::async_write(socket_, asio::buffer(*payload),
-                      [shared_this, this, payload](const ::asio::error_code &ec,
-                                                   size_t size) {
-                        OnSendCompleted(ec, size);
-                      });
-  } else {  // Nothing to send for this request, inform the handler immediately
-    io_service().post(
-        // Never hold locks when calling a callback
-        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
-    );
-
-    // Reschedule to flush the next one
-    AsyncFlushPendingRequests();
-  }
-}
-
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
-                                                   size_t) {
-  using std::placeholders::_1;
-  using std::placeholders::_2;
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-
-  ::asio::error_code my_ec(original_ec);
-
-  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
-
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-
-  if(event_handlers_) {
-    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
-#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
-    if (event_resp.response() == event_response::kTest_Error) {
-      my_ec = std::make_error_code(std::errc::network_down);
-    }
-#endif
-  }
-
-  switch (my_ec.value()) {
-    case 0:
-      // No errors
-      break;
-    case asio::error::operation_aborted:
-      // The event loop has been shut down. Ignore the error.
-      return;
-    default:
-      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
-      CommsError(ToStatus(my_ec));
-      return;
-  }
-
-  if (!current_response_state_) { /* start a new one */
-    current_response_state_ = std::make_shared<Response>();
-  }
-
-  if (current_response_state_->state_ == Response::kReadLength) {
-    current_response_state_->state_ = Response::kReadContent;
-    auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
-                              sizeof(current_response_state_->length_));
-    asio::async_read(
-        socket_, buf,
-        [shared_this, this](const ::asio::error_code &ec, size_t size) {
-          OnRecvCompleted(ec, size);
-        });
-  } else if (current_response_state_->state_ == Response::kReadContent) {
-    current_response_state_->state_ = Response::kParseResponse;
-    current_response_state_->length_ = ntohl(current_response_state_->length_);
-    current_response_state_->data_.resize(current_response_state_->length_);
-    asio::async_read(
-        socket_, ::asio::buffer(current_response_state_->data_),
-        [shared_this, this](const ::asio::error_code &ec, size_t size) {
-          OnRecvCompleted(ec, size);
-        });
-  } else if (current_response_state_->state_ == Response::kParseResponse) {
-    // Check return status from the RPC response.  We may have received a msg
-    // indicating a server side error.
-
-    Status stat = HandleRpcResponse(current_response_state_);
-
-    if(stat.get_server_exception_type() == Status::kStandbyException) {
-      // May need to bail out, connect to new NN, and restart loop
-      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect");
-    }
-
-    current_response_state_ = nullptr;
-    StartReading();
-  }
-}
-
-template <class Socket>
-void RpcConnectionImpl<Socket>::Disconnect() {
-  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
-
-  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
-
-  request_over_the_wire_.reset();
-  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
-    // Don't print out errors, we were expecting a disconnect here
-    SafeDisconnect(get_asio_socket_ptr(&socket_));
-  }
-  connected_ = kDisconnected;
-}
-}
-
-#endif
+} // end namespace hdfs
+#endif // end include Guard

+ 8 - 125
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc → hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc

@@ -16,17 +16,13 @@
  * limitations under the License.
  */
 #include "rpc_engine.h"
+#include "rpc_connection_impl.h"
 #include "sasl_protocol.h"
 
 #include "RpcHeader.pb.h"
 #include "ProtobufRpcEngine.pb.h"
 #include "IpcConnectionContext.pb.h"
 
-#include "common/logging.h"
-#include "common/util.h"
-
-#include <asio/read.hpp>
-
 namespace hdfs {
 
 namespace pb = ::google::protobuf;
@@ -70,121 +66,8 @@ static void AddHeadersToPacket(
   }
 }
 
-static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
-  int len = DelimitedPBMessageSize(header);
-  res->reserve(len);
-  pbio::StringOutputStream ss(res);
-  pbio::CodedOutputStream os(&ss);
-  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf);
-  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
-  buf = header->SerializeWithCachedSizesToArray(buf);
-}
-
-static void ConstructPayload(std::string *res, const std::string *request) {
-  int len =
-      pbio::CodedOutputStream::VarintSize32(request->size()) + request->size();
-  res->reserve(len);
-  pbio::StringOutputStream ss(res);
-  pbio::CodedOutputStream os(&ss);
-  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
-  assert(buf);
-  buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
-  buf = os.WriteStringToArray(*request, buf);
-}
-
-static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
-                             const std::string &method_name, int retry_count,
-                             RpcRequestHeaderProto *rpc_header,
-                             RequestHeaderProto *req_header) {
-  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
-  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
-  rpc_header->set_callid(call_id);
-  if (retry_count != kNoRetry)
-    rpc_header->set_retrycount(retry_count);
-  rpc_header->set_clientid(engine->client_id());
-
-  req_header->set_methodname(method_name);
-  req_header->set_declaringclassprotocolname(engine->protocol_name());
-  req_header->set_clientprotocolversion(engine->protocol_version());
-}
-
 RpcConnection::~RpcConnection() {}
 
-Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
-                 const std::string &request, Handler &&handler)
-    : engine_(engine),
-      method_name_(method_name),
-      call_id_(call_id),
-      timer_(engine->io_service()),
-      handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
-      failover_count_(0) {
-  ConstructPayload(&payload_, &request);
-}
-
-Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
-                 const pb::MessageLite *request, Handler &&handler)
-    : engine_(engine),
-      method_name_(method_name),
-      call_id_(call_id),
-      timer_(engine->io_service()),
-      handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
-      failover_count_(0) {
-  ConstructPayload(&payload_, request);
-}
-
-Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
-    : engine_(engine),
-      call_id_(-1),
-      timer_(engine->io_service()),
-      handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
-      failover_count_(0) {
-}
-
-void Request::GetPacket(std::string *res) const {
-  LOG_TRACE(kRPC, << "Request::GetPacket called");
-
-  if (payload_.empty())
-    return;
-
-  RpcRequestHeaderProto rpc_header;
-  RequestHeaderProto req_header;
-  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
-                   &req_header);
-
-  // SASL messages don't have a request header
-  if (method_name_ != SASL_METHOD_NAME)
-    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
-  else
-    AddHeadersToPacket(res, {&rpc_header}, &payload_);
-}
-
-void Request::OnResponseArrived(pbio::CodedInputStream *is,
-                                const Status &status) {
-  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
-  handler_(is, status);
-}
-
-std::string Request::GetDebugString() const {
-  // Basic description of this object, aimed at debugging
-  std::stringstream ss;
-  ss << "\nRequest Object:\n";
-  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
-  ss << "\tCall id        = " << call_id_ << "\n";
-  ss << "\tRetry Count    = " << retry_count_ << "\n";
-  ss << "\tFailover count = " << failover_count_ << "\n";
-  return ss.str();
-}
-
-int Request::IncrementFailoverCount() {
-  // reset retry count when failing over
-  retry_count_ = 0;
-  return failover_count_++;
-}
-
 RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
     : engine_(engine),
       connected_(kNotYetConnected) {}
@@ -551,13 +434,13 @@ std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
 std::string RpcConnection::ToString(ConnectedState connected) {
   switch(connected) {
     case kNotYetConnected: return "NotYetConnected";
-    case kConnecting: return "Connecting";
-    case kHandshaking: return "Handshaking";
-    case kAuthenticating: return "Authenticating";
-    case kConnected: return "Connected";
-    case kDisconnected: return "Disconnected";
-    default: return "Invalid ConnectedState";
+    case kConnecting:      return "Connecting";
+    case kHandshaking:     return "Handshaking";
+    case kAuthenticating:  return "Authenticating";
+    case kConnected:       return "Connected";
+    case kDisconnected:    return "Disconnected";
+    default:               return "Invalid ConnectedState";
   }
 }
 
-}
+}// end namespace hdfs

+ 445 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h

@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_RPC_RPC_CONNECTION_IMPL_H_
+#define LIB_RPC_RPC_CONNECTION_IMPL_H_
+
+#include "rpc_connection.h"
+#include "rpc_engine.h"
+#include "request.h"
+
+#include "common/auth_info.h"
+#include "common/logging.h"
+#include "common/util.h"
+#include "common/libhdfs_events_impl.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+
+#include <system_error>
+
+namespace hdfs {
+
+template <class Socket>
+class RpcConnectionImpl : public RpcConnection {
+public:
+  MEMCHECKED_CLASS(RpcConnectionImpl);
+
+  RpcConnectionImpl(RpcEngine *engine);
+  virtual ~RpcConnectionImpl() override;
+
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+                       const AuthInfo & auth_info,
+                       RpcCallback &handler);
+  virtual void ConnectAndFlush(
+      const std::vector<::asio::ip::tcp::endpoint> &server) override;
+  virtual void SendHandshake(RpcCallback &handler) override;
+  virtual void SendContext(RpcCallback &handler) override;
+  virtual void Disconnect() override;
+  virtual void OnSendCompleted(const ::asio::error_code &ec,
+                               size_t transferred) override;
+  virtual void OnRecvCompleted(const ::asio::error_code &ec,
+                               size_t transferred) override;
+  virtual void FlushPendingRequests() override;
+
+
+  Socket &TEST_get_mutable_socket() { return socket_; }
+
+  void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
+
+ private:
+  const Options options_;
+  ::asio::ip::tcp::endpoint current_endpoint_;
+  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
+  Socket socket_;
+  ::asio::deadline_timer connect_timer_;
+
+  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
+};
+
+template <class Socket>
+RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
+    : RpcConnection(engine),
+      options_(engine->options()),
+      socket_(engine->io_service()),
+      connect_timer_(engine->io_service())
+{
+      LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
+}
+
+template <class Socket>
+RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
+  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
+
+  if (pending_requests_.size() > 0)
+    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
+  if (requests_on_fly_.size() > 0)
+    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::Connect(
+    const std::vector<::asio::ip::tcp::endpoint> &server,
+    const AuthInfo & auth_info,
+    RpcCallback &handler) {
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");
+
+  this->auth_info_ = auth_info;
+
+  auto connectionSuccessfulReq = std::make_shared<Request>(
+      engine_, [handler](::google::protobuf::io::CodedInputStream *is,
+                         const Status &status) {
+        (void)is;
+        handler(status);
+      });
+  pending_requests_.push_back(connectionSuccessfulReq);
+  this->ConnectAndFlush(server);  // need "this" so compiler can infer type of CAF
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectAndFlush(
+    const std::vector<::asio::ip::tcp::endpoint> &server) {
+
+  LOG_INFO(kRPC, << "ConnectAndFlush called");
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  if (server.empty()) {
+    Status s = Status::InvalidArgument("No endpoints provided");
+    CommsError(s);
+    return;
+  }
+
+  if (connected_ == kConnected) {
+    FlushPendingRequests();
+    return;
+  }
+  if (connected_ != kNotYetConnected) {
+    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_));
+    return;
+  }
+  connected_ = kConnecting;
+
+  // Take the first endpoint, but remember the alternatives for later
+  additional_endpoints_ = server;
+  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
+  additional_endpoints_.erase(additional_endpoints_.begin());
+  current_endpoint_ = first_endpoint;
+
+  auto shared_this = shared_from_this();
+  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+    ConnectComplete(ec, first_endpoint);
+  });
+
+  // Prompt the timer to timeout
+  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
+  connect_timer_.expires_from_now(
+        std::chrono::milliseconds(options_.rpc_connect_timeout));
+  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
+      if (ec)
+        ConnectComplete(ec, first_endpoint);
+      else
+        ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
+  });
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
+  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  connect_timer_.cancel();
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
+
+  // Could be an old async connect returning a result after we've moved on
+  if (remote != current_endpoint_) {
+      LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
+      return;
+  }
+  if (connected_ != kConnecting) {
+      LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
+      return;
+  }
+
+  Status status = ToStatus(ec);
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+  }
+
+  if (status.ok()) {
+    StartReading();
+    SendHandshake([shared_this, this](const Status & s) {
+      HandshakeComplete(s);
+    });
+  } else {
+    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
+    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
+    if(!err.empty()) {
+      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
+    }
+
+    if (!additional_endpoints_.empty()) {
+      // If we have additional endpoints, keep trying until we either run out or
+      //    hit one
+      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
+      additional_endpoints_.erase(additional_endpoints_.begin());
+      current_endpoint_ = next_endpoint;
+
+      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+        ConnectComplete(ec, next_endpoint);
+      });
+      connect_timer_.expires_from_now(
+            std::chrono::milliseconds(options_.rpc_connect_timeout));
+      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
+          if (ec)
+            ConnectComplete(ec, next_endpoint);
+          else
+            ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
+        });
+    } else {
+      CommsError(status);
+    }
+  }
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
+  connected_ = kHandshaking;
+
+  auto shared_this = shared_from_this();
+  auto handshake_packet = PrepareHandshakePacket();
+  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
+                      [handshake_packet, handler, shared_this, this](
+                          const ::asio::error_code &ec, size_t) {
+                        Status status = ToStatus(ec);
+                        handler(status);
+                      });
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
+
+  auto shared_this = shared_from_this();
+  auto context_packet = PrepareContextPacket();
+  ::asio::async_write(socket_, asio::buffer(*context_packet),
+                      [context_packet, handler, shared_this, this](
+                          const ::asio::error_code &ec, size_t) {
+                        Status status = ToStatus(ec);
+                        handler(status);
+                      });
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
+                                                   size_t) {
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");
+
+  request_over_the_wire_.reset();
+  if (ec) {
+    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
+    CommsError(ToStatus(ec));
+    return;
+  }
+
+  FlushPendingRequests();
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::FlushPendingRequests() {
+  using namespace ::std::placeholders;
+
+  // Lock should be held
+  assert(lock_held(connection_state_lock_));
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");
+
+  // Don't send if we don't need to
+  if (request_over_the_wire_) {
+    return;
+  }
+
+  std::shared_ptr<Request> req;
+  switch (connected_) {
+  case kNotYetConnected:
+    return;
+  case kConnecting:
+    return;
+  case kHandshaking:
+    return;
+  case kAuthenticating:
+    if (auth_requests_.empty()) {
+      return;
+    }
+    req = auth_requests_.front();
+    auth_requests_.erase(auth_requests_.begin());
+    break;
+  case kConnected:
+    if (pending_requests_.empty()) {
+      return;
+    }
+    req = pending_requests_.front();
+    pending_requests_.erase(pending_requests_.begin());
+    break;
+  case kDisconnected:
+    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection");
+    return;
+  default:
+    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_));
+    return;
+  }
+
+  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
+  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
+  auto weak_req = std::weak_ptr<Request>(req);
+
+  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
+  req->GetPacket(payload.get());
+  if (!payload->empty()) {
+    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
+    requests_on_fly_[req->call_id()] = req;
+    request_over_the_wire_ = req;
+
+    req->timer().expires_from_now(
+        std::chrono::milliseconds(options_.rpc_timeout));
+    req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) {
+        auto timeout_this = weak_this.lock();
+        auto timeout_req = weak_req.lock();
+        if (timeout_this && timeout_req)
+          this->HandleRpcTimeout(timeout_req, ec);
+    });
+
+    asio::async_write(socket_, asio::buffer(*payload),
+                      [shared_this, this, payload](const ::asio::error_code &ec,
+                                                   size_t size) {
+                        OnSendCompleted(ec, size);
+                      });
+  } else {  // Nothing to send for this request, inform the handler immediately
+    io_service().post(
+        // Never hold locks when calling a callback
+        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
+    );
+
+    // Reschedule to flush the next one
+    AsyncFlushPendingRequests();
+  }
+}
+
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
+                                                   size_t) {
+  using std::placeholders::_1;
+  using std::placeholders::_2;
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  ::asio::error_code my_ec(original_ec);
+
+  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
+
+  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
+
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response() == event_response::kTest_Error) {
+      my_ec = std::make_error_code(std::errc::network_down);
+    }
+#endif
+  }
+
+  switch (my_ec.value()) {
+    case 0:
+      // No errors
+      break;
+    case asio::error::operation_aborted:
+      // The event loop has been shut down. Ignore the error.
+      return;
+    default:
+      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
+      CommsError(ToStatus(my_ec));
+      return;
+  }
+
+  if (!current_response_state_) { /* start a new one */
+    current_response_state_ = std::make_shared<Response>();
+  }
+
+  if (current_response_state_->state_ == Response::kReadLength) {
+    current_response_state_->state_ = Response::kReadContent;
+    auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
+                              sizeof(current_response_state_->length_));
+    asio::async_read(
+        socket_, buf,
+        [shared_this, this](const ::asio::error_code &ec, size_t size) {
+          OnRecvCompleted(ec, size);
+        });
+  } else if (current_response_state_->state_ == Response::kReadContent) {
+    current_response_state_->state_ = Response::kParseResponse;
+    current_response_state_->length_ = ntohl(current_response_state_->length_);
+    current_response_state_->data_.resize(current_response_state_->length_);
+    asio::async_read(
+        socket_, ::asio::buffer(current_response_state_->data_),
+        [shared_this, this](const ::asio::error_code &ec, size_t size) {
+          OnRecvCompleted(ec, size);
+        });
+  } else if (current_response_state_->state_ == Response::kParseResponse) {
+    // Check return status from the RPC response.  We may have received a msg
+    // indicating a server side error.
+
+    Status stat = HandleRpcResponse(current_response_state_);
+
+    if(stat.get_server_exception_type() == Status::kStandbyException) {
+      // May need to bail out, connect to new NN, and restart loop
+      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect");
+    }
+
+    current_response_state_ = nullptr;
+    StartReading();
+  }
+}
+
+template <class Socket>
+void RpcConnectionImpl<Socket>::Disconnect() {
+  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+
+  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
+
+  request_over_the_wire_.reset();
+  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
+    // Don't print out errors, we were expecting a disconnect here
+    SafeDisconnect(get_asio_socket_ptr(&socket_));
+  }
+  connected_ = kDisconnected;
+}
+}
+
+#endif

+ 1 - 124
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -16,13 +16,12 @@
  * limitations under the License.
  */
 #include "rpc_engine.h"
-#include "rpc_connection.h"
+#include "rpc_connection_impl.h"
 #include "common/util.h"
 #include "common/logging.h"
 #include "common/namenode_info.h"
 #include "optional.hpp"
 
-#include <future>
 #include <algorithm>
 
 namespace hdfs {
@@ -30,114 +29,6 @@ namespace hdfs {
 template <class T>
 using optional = std::experimental::optional<T>;
 
-HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
-                                     ::asio::io_service *ioservice,
-                                     std::shared_ptr<LibhdfsEvents> event_handlers)
-                  : enabled_(false), resolved_(false),
-                    ioservice_(ioservice), event_handlers_(event_handlers)
-{
-  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
-  for(unsigned int i=0;i<servers.size();i++)
-    LOG_TRACE(kRPC, << servers[i].str());
-
-  if(servers.size() >= 2) {
-    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
-    if(servers.size() > 2) {
-      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't be used.");
-    }
-
-    active_info_ = servers[0];
-    standby_info_ = servers[1];
-    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
-    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
-
-    enabled_ = true;
-    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
-      resolved_ = true;
-    }
-  }
-}
-
-
-HANamenodeTracker::~HANamenodeTracker() { }
-
-
-static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
-  std::stringstream ss;
-  for(unsigned int i=0; i<pts.size(); i++)
-    if(i == pts.size() - 1)
-      ss << pts[i];
-    else
-      ss << pts[i] << ", ";
-  return ss.str();
-}
-
-//  Pass in endpoint from current connection, this will do a reverse lookup
-//  and return the info for the standby node. It will also swap its state internally.
-ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
-  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
-  mutex_guard swap_lock(swap_lock_);
-
-  ResolvedNamenodeInfo failover_node;
-
-  // Connected to standby, switch standby to active
-  if(IsCurrentActive_locked(current_endpoint)) {
-    std::swap(active_info_, standby_info_);
-    if(event_handlers_)
-      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
-                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
-    failover_node = active_info_;
-  } else if(IsCurrentStandby_locked(current_endpoint)) {
-    // Connected to standby
-    if(event_handlers_)
-      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
-                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
-    failover_node = active_info_;
-  } else {
-    // Invalid state, throw for testing
-    std::string ep1 = format_endpoints(active_info_.endpoints);
-    std::string ep2 = format_endpoints(standby_info_.endpoints);
-
-    std::stringstream msg;
-    msg << "Looked for " << current_endpoint << " in\n";
-    msg << ep1 << " and\n";
-    msg << ep2 << std::endl;
-
-    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
-    throw std::runtime_error(msg.str());
-  }
-
-  if(failover_node.endpoints.empty()) {
-    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again");
-    if(!ResolveInPlace(ioservice_, failover_node)) {
-      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
-                      << "failed.  Please make sure your configuration is up to date.");
-    }
-  }
-  return failover_node;
-}
-
-bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
-  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
-    if(ep.address() == active_info_.endpoints[i].address()) {
-      if(ep.port() != active_info_.endpoints[i].port())
-        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway..");
-      return true;
-    }
-  }
-  return false;
-}
-
-bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
-  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
-    if(ep.address() == standby_info_.endpoints[i].address()) {
-      if(ep.port() != standby_info_.endpoints[i].port())
-        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway..");
-      return true;
-    }
-  }
-  return false;
-}
 
 RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
                      const std::string &client_name, const std::string &user_name,
@@ -276,19 +167,6 @@ void RpcEngine::AsyncRpc(
   conn_->AsyncRpc(method_name, req, resp, handler);
 }
 
-Status RpcEngine::Rpc(
-    const std::string &method_name, const ::google::protobuf::MessageLite *req,
-    const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
-
-  LOG_TRACE(kRPC, << "RpcEngine::Rpc called");
-
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  AsyncRpc(method_name, req, resp,
-           [stat](const Status &status) { stat->set_value(status); });
-  return future.get();
-}
-
 std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
 {
   LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called");
@@ -304,7 +182,6 @@ std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
   return result;
 }
 
-
 void RpcEngine::AsyncRpcCommsError(
     const Status &status,
     std::shared_ptr<RpcConnection> failedConnection,

+ 4 - 246
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h

@@ -25,25 +25,19 @@
 #include "common/retry_policy.h"
 #include "common/libhdfs_events_impl.h"
 #include "common/util.h"
-#include "common/continuation/asio.h"
-#include "common/logging.h"
 #include "common/new_delete.h"
 #include "common/namenode_info.h"
+#include "namenode_tracker.h"
 
 #include <google/protobuf/message_lite.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
 
 #include <asio/ip/tcp.hpp>
 #include <asio/deadline_timer.hpp>
 
 #include <atomic>
 #include <memory>
-#include <unordered_map>
 #include <vector>
-#include <deque>
 #include <mutex>
-#include <future>
 
 namespace hdfs {
 
@@ -64,193 +58,8 @@ typedef const std::function<void(const Status &)> RpcCallback;
 class LockFreeRpcEngine;
 class RpcConnection;
 class SaslProtocol;
-
-/*
- * Internal bookkeeping for an outstanding request from the consumer.
- *
- * Threading model: not thread-safe; should only be accessed from a single
- *   thread at a time
- */
-class Request {
- public:
-  MEMCHECKED_CLASS(Request)
-  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
-                             const Status &status)> Handler;
-
-  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
-          const std::string &request, Handler &&callback);
-  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
-          const ::google::protobuf::MessageLite *request, Handler &&callback);
-
-  // Null request (with no actual message) used to track the state of an
-  //    initial Connect call
-  Request(LockFreeRpcEngine *engine, Handler &&handler);
-
-  int call_id() const { return call_id_; }
-  std::string  method_name() const { return method_name_; }
-  ::asio::deadline_timer &timer() { return timer_; }
-  int IncrementRetryCount() { return retry_count_++; }
-  int IncrementFailoverCount();
-  void GetPacket(std::string *res) const;
-  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
-                         const Status &status);
-
-  int get_failover_count() {return failover_count_;}
-
-  std::string GetDebugString() const;
-
- private:
-  LockFreeRpcEngine *const engine_;
-  const std::string method_name_;
-  const int call_id_;
-
-  ::asio::deadline_timer timer_;
-  std::string payload_;
-  const Handler handler_;
-
-  int retry_count_;
-  int failover_count_;
-};
-
-/*
- * Encapsulates a persistent connection to the NameNode, and the sending of
- * RPC requests and evaluating their responses.
- *
- * Can have multiple RPC requests in-flight simultaneously, but they are
- * evaluated in-order on the server side in a blocking manner.
- *
- * Threading model: public interface is thread-safe
- * All handlers passed in to method calls will be called from an asio thread,
- *   and will not be holding any internal RpcConnection locks.
- */
-class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
- public:
-  MEMCHECKED_CLASS(RpcConnection)
-  RpcConnection(LockFreeRpcEngine *engine);
-  virtual ~RpcConnection();
-
-  // Note that a single server can have multiple endpoints - especially both
-  //   an ipv4 and ipv6 endpoint
-  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
-                       const AuthInfo & auth_info,
-                       RpcCallback &handler) = 0;
-  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
-  virtual void Disconnect() = 0;
-
-  void StartReading();
-  void AsyncRpc(const std::string &method_name,
-                const ::google::protobuf::MessageLite *req,
-                std::shared_ptr<::google::protobuf::MessageLite> resp,
-                const RpcCallback &handler);
-
-  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
-
-  // Enqueue requests before the connection is connected.  Will be flushed
-  //   on connect
-  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
-
-  // Put requests at the front of the current request queue
-  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);
-
-  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
-  void SetClusterName(std::string cluster_name);
-
-  LockFreeRpcEngine *engine() { return engine_; }
-  ::asio::io_service &io_service();
-
- protected:
-  struct Response {
-    enum ResponseState {
-      kReadLength,
-      kReadContent,
-      kParseResponse,
-    } state_;
-    unsigned length_;
-    std::vector<char> data_;
-
-    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
-    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;
-
-    Response() : state_(kReadLength), length_(0) {}
-  };
-
-
-  // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected
-  virtual void SendHandshake(RpcCallback &handler) = 0;
-  void HandshakeComplete(const Status &s);
-  void AuthComplete(const Status &s, const AuthInfo & new_auth_info);
-  void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info);
-  virtual void SendContext(RpcCallback &handler) = 0;
-  void ContextComplete(const Status &s);
-
-
-  virtual void OnSendCompleted(const ::asio::error_code &ec,
-                               size_t transferred) = 0;
-  virtual void OnRecvCompleted(const ::asio::error_code &ec,
-                               size_t transferred) = 0;
-  virtual void FlushPendingRequests()=0;      // Synchronously write the next request
-
-  void AsyncRpc_locked(
-                const std::string &method_name,
-                const ::google::protobuf::MessageLite *req,
-                std::shared_ptr<::google::protobuf::MessageLite> resp,
-                const RpcCallback &handler);
-  void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests);
-  void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time
-
-
-
-  std::shared_ptr<std::string> PrepareHandshakePacket();
-  std::shared_ptr<std::string> PrepareContextPacket();
-  static std::string SerializeRpcRequest(
-      const std::string &method_name,
-      const ::google::protobuf::MessageLite *req);
-  Status HandleRpcResponse(std::shared_ptr<Response> response);
-  void HandleRpcTimeout(std::shared_ptr<Request> req,
-                        const ::asio::error_code &ec);
-  void CommsError(const Status &status);
-
-  void ClearAndDisconnect(const ::asio::error_code &ec);
-  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);
-
-  LockFreeRpcEngine *const engine_;
-  std::shared_ptr<Response> current_response_state_;
-  AuthInfo auth_info_;
-
-  // Connection can have deferred connection, especially when we're pausing
-  //   during retry
-  enum ConnectedState {
-      kNotYetConnected,
-      kConnecting,
-      kHandshaking,
-      kAuthenticating,
-      kConnected,
-      kDisconnected
-  };
-  static std::string ToString(ConnectedState connected);
-  ConnectedState connected_;
-
-  // State machine for performing a SASL handshake
-  std::shared_ptr<SaslProtocol> sasl_protocol_;
-  // The request being sent over the wire; will also be in requests_on_fly_
-  std::shared_ptr<Request> request_over_the_wire_;
-  // Requests to be sent over the wire
-  std::deque<std::shared_ptr<Request>> pending_requests_;
-  // Requests to be sent over the wire during authentication; not retried if
-  //   there is a connection error
-  std::deque<std::shared_ptr<Request>> auth_requests_;
-  // Requests that are waiting for responses
-  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
-  RequestOnFlyMap requests_on_fly_;
-  std::shared_ptr<LibhdfsEvents> event_handlers_;
-  std::string cluster_name_;
-
-  // Lock for mutable parts of this class that need to be thread safe
-  std::mutex connection_state_lock_;
-
-  friend class SaslProtocol;
-};
-
+class RpcConnection;
+class Request;
 
 /*
  * These methods of the RpcEngine will never acquire locks, and are safe for
@@ -259,6 +68,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
 class LockFreeRpcEngine {
 public:
   MEMCHECKED_CLASS(LockFreeRpcEngine)
+
   /* Enqueues a CommsError without acquiring a lock*/
   virtual void AsyncRpcCommsError(const Status &status,
                       std::shared_ptr<RpcConnection> failedConnection,
@@ -278,54 +88,6 @@ public:
 };
 
 
-/*
- *  Tracker gives the RpcEngine a quick way to use an endpoint that just
- *  failed in order to lookup a set of endpoints for a failover node.
- *
- *  Note: For now this only deals with 2 NameNodes, but that's the default
- *  anyway.
- */
-class HANamenodeTracker {
- public:
-  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
-                    ::asio::io_service *ioservice,
-                    std::shared_ptr<LibhdfsEvents> event_handlers_);
-
-  virtual ~HANamenodeTracker();
-
-  bool is_enabled() const { return enabled_; }
-  bool is_resolved() const { return resolved_; }
-
-  // Get node opposite of the current one if possible (swaps active/standby)
-  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
-  // get info without changing state
-  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
-
-  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
-  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
-
- private:
-  // If HA should be enabled, according to our options and runtime info like # nodes provided
-  bool enabled_;
-  // If we were able to resolve at least 1 HA namenode
-  bool resolved_;
-
-  // Keep service in case a second round of DNS lookup is required
-  ::asio::io_service *ioservice_;
-
-  // Event handlers, for now this is the simplest place to catch all failover events
-  // and push info out to client application.  Possibly move into RPCEngine.
-  std::shared_ptr<LibhdfsEvents> event_handlers_;
-
-  // Only support 1 active and 1 standby for now.
-  ResolvedNamenodeInfo active_info_;
-  ResolvedNamenodeInfo standby_info_;
-
-  // Aquire when switching from active-standby
-  std::mutex swap_lock_;
-};
-
-
 /*
  * An engine for reliable communication with a NameNode.  Handles connection,
  * retry, and (someday) failover of the requested messages.
@@ -360,10 +122,6 @@ class RpcEngine : public LockFreeRpcEngine {
                 const std::shared_ptr<::google::protobuf::MessageLite> &resp,
                 const std::function<void(const Status &)> &handler);
 
-  Status Rpc(const std::string &method_name,
-             const ::google::protobuf::MessageLite *req,
-             const std::shared_ptr<::google::protobuf::MessageLite> &resp);
-
   void Shutdown();
 
   /* Enqueues a CommsError without acquiring a lock*/

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc

@@ -17,6 +17,7 @@
  */
 
 #include "rpc_engine.h"
+#include "rpc_connection.h"
 #include "common/logging.h"
 
 #include "sasl_engine.h"

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc

@@ -19,7 +19,7 @@
 #include "mock_connection.h"
 #include "test.pb.h"
 #include "RpcHeader.pb.h"
-#include "rpc/rpc_connection.h"
+#include "rpc/rpc_connection_impl.h"
 #include "common/namenode_info.h"
 
 #include <google/protobuf/io/coded_stream.h>