Procházet zdrojové kódy

HDFS-10450: libhdfs++: Add Cyrus SASL support. Contributed by Don Davis.

Bob Hansen před 8 roky
rodič
revize
549a5dbce9

+ 0 - 1
dev-support/docker/Dockerfile

@@ -201,4 +201,3 @@ ENV FORREST_HOME=/opt/apache-forrest
 ADD hadoop_env_checks.sh /root/hadoop_env_checks.sh
 RUN chmod 755 /root/hadoop_env_checks.sh
 RUN echo '~/hadoop_env_checks.sh' >> /root/.bashrc
-

+ 40 - 27
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt

@@ -23,13 +23,15 @@ cmake_minimum_required(VERSION 2.8)
 enable_testing()
 include (CTest)
 SET(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
-SET(CMAKE_DISABLE_FIND_PACKAGE_CyrusSASL TRUE) # Until development is done.
+
+# If there's a better way to inform FindCyrusSASL.cmake, let's make this cleaner:
+SET(CMAKE_PREFIX_PATH "${CMAKE_PREFIX_PATH};${CYRUS_SASL_DIR};${GSASL_DIR}")
 
 find_package(Doxygen)
 find_package(OpenSSL REQUIRED)
 find_package(Protobuf REQUIRED)
-find_package(GSasl)
 find_package(CyrusSASL)
+find_package(GSasl)
 find_package(Threads)
 
 find_program(MEMORYCHECK_COMMAND valgrind HINTS ${VALGRIND_DIR} )
@@ -41,36 +43,47 @@ if (REQUIRE_VALGRIND AND MEMORYCHECK_COMMAND MATCHES "MEMORYCHECK_COMMAND-NOTFOU
                       "The path can be included via a -DVALGRIND_DIR=... flag passed to CMake.")
 endif (REQUIRE_VALGRIND AND MEMORYCHECK_COMMAND MATCHES "MEMORYCHECK_COMMAND-NOTFOUND" )
 
+# Find the SASL library to use.  If you don't want to require a sasl library,
+#    define -DNO_SASL=1 in your cmake call
 # Prefer Cyrus SASL, but use GSASL if it is found
 # Note that the packages can be disabled by setting CMAKE_DISABLE_FIND_PACKAGE_GSasl or
 #    CMAKE_DISABLE_FIND_PACKAGE_CyrusSASL, respectively (case sensitive)
 set (SASL_LIBRARIES)
 set (SASL_INCLUDE_DIR)
-if (CYRUS_SASL_FOUND)
-    message(STATUS "Using Cyrus SASL; link with ${CYRUS_SASL_LIBRARIES}")
-    set (SASL_INCLUDE_DIR ${CYRUS_SASL_INCLUDE_DIR})
-    set (SASL_LIBRARIES ${CYRUS_SASL_SHARED_LIB})
-    add_definitions(-DUSE_SASL -DUSE_CYRUS_SASL)
-else (CYRUS_SASL_FOUND)
-    if (REQUIRE_CYRUS_SASL)
-      message(FATAL_ERROR "Cyrus SASL was required but not found.  "
-                            "The path can be included via a -DCYRUS_SASL_DIR=... flag passed to CMake.")
-    endif (REQUIRE_CYRUS_SASL)
-
-    # If we didn't pick Cyrus, use GSASL instead
-    if (GSASL_FOUND)
-      message(STATUS "Using GSASL; link with ${GSASL_LIBRARIES}")
-      set (SASL_INCLUDE_DIR ${GSASL_INCLUDE_DIR})
-      set (SASL_LIBRARIES ${GSASL_LIBRARIES})
-      add_definitions(-DUSE_SASL -DUSE_GSASL)
-    else (GSASL_FOUND)
-      if (REQUIRE_GSASL)
-        message(FATAL_ERROR "GSASL was required but not found.  "
-                            "The path can be included via a -DGSASL_DIR=... flag passed to CMake.")
-      endif (REQUIRE_GSASL)
-      message(STATUS "Not using SASL")
-    endif (GSASL_FOUND)
-endif (CYRUS_SASL_FOUND)
+if (NOT NO_SASL)
+    if (CYRUS_SASL_FOUND)
+        message(STATUS "Using Cyrus SASL; link with ${CYRUS_SASL_SHARED_LIB}")
+        set (SASL_INCLUDE_DIR ${CYRUS_SASL_INCLUDE_DIR})
+        set (SASL_LIBRARIES ${CYRUS_SASL_SHARED_LIB})
+        set (CMAKE_USING_CYRUS_SASL 1)
+        add_definitions(-DUSE_SASL -DUSE_CYRUS_SASL)
+    else (CYRUS_SASL_FOUND)
+        if (REQUIRE_CYRUS_SASL)
+          message(FATAL_ERROR "Cyrus SASL was required but not found.  "
+                                "The path can be included via a -DCYRUS_SASL_DIR=... flag passed to CMake.")
+        endif (REQUIRE_CYRUS_SASL)
+
+        # If we didn't pick Cyrus, use GSASL instead
+        if (GSASL_FOUND)
+          message(STATUS "Using GSASL; link with ${GSASL_LIBRARIES}")
+          set (SASL_INCLUDE_DIR ${GSASL_INCLUDE_DIR})
+          set (SASL_LIBRARIES ${GSASL_LIBRARIES})
+          set (CMAKE_USING_GSASL 1)
+          add_definitions(-DUSE_SASL -DUSE_GSASL)
+        else (GSASL_FOUND)
+          if (REQUIRE_GSASL)
+            message(FATAL_ERROR "GSASL was required but not found.  "
+                                "The path can be included via a -DGSASL_DIR=... flag passed to CMake.")
+          endif (REQUIRE_GSASL)
+
+          # No SASL was found, but NO_SASL was not defined
+          message(FATAL_ERROR "Cound not find a SASL library (GSASL (gsasl) or Cyrus SASL (libsasl2).  "
+                            "Install/configure one of them or define NO_SASL=1 in your cmake call")
+        endif (GSASL_FOUND)
+    endif (CYRUS_SASL_FOUND)
+else (NOT NO_SASL)
+    message(STATUS "Compiling with NO SASL SUPPORT")
+endif (NOT NO_SASL)
 
 add_definitions(-DASIO_STANDALONE -DASIO_CPP11_DATE_TIME)
 

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt

@@ -16,7 +16,16 @@
 # limitations under the License.
 #
 
-include_directories(${OPENSSL_INCLUDE_DIRS})
-add_library(rpc_obj OBJECT rpc_connection.cc rpc_engine.cc sasl_protocol.cc sasl_engine.cc)
+list(APPEND rpc_object_items rpc_connection.cc rpc_engine.cc sasl_protocol.cc sasl_engine.cc)
+if (CMAKE_USING_CYRUS_SASL)
+  list(APPEND rpc_object_items cyrus_sasl_engine.cc)
+endif (CMAKE_USING_CYRUS_SASL)
+if (CMAKE_USING_GSASL)
+ list(APPEND rpc_object_items gsasl_engine.cc)
+endif (CMAKE_USING_GSASL)
+
+add_library(rpc_obj OBJECT ${rpc_object_items})
+
+
 add_dependencies(rpc_obj proto)
 add_library(rpc $<TARGET_OBJECTS:rpc_obj>)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc

@@ -214,7 +214,7 @@ void RpcConnection::HandshakeComplete(const Status &s) {
 #ifdef USE_SASL
         sasl_protocol_ = std::make_shared<SaslProtocol>(cluster_name_, auth_info_, shared_from_this());
         sasl_protocol_->SetEventHandlers(event_handlers_);
-        sasl_protocol_->authenticate([shared_this, this](
+        sasl_protocol_->Authenticate([shared_this, this](
                           const Status & status, const AuthInfo & new_auth_info) {
                         AuthComplete(status, new_auth_info); } );
 #else

+ 27 - 132
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.cc

@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-#include "sasl_engine.h"
-
-#include "common/logging.h"
-
 #include <sstream>
+#include <string.h> // memcpy()
 
+#include "sasl_engine.h"
+#include "common/logging.h"
 
 namespace hdfs {
 
 /*****************************************************************************
- *                     BASE CLASS
+ *                    SASL ENGINE BASE CLASS
  */
 
-SaslEngine::State SaslEngine::getState()
+SaslEngine::State SaslEngine::GetState()
 {
     return state_;
 }
@@ -37,149 +36,45 @@ SaslEngine::State SaslEngine::getState()
 SaslEngine::~SaslEngine() {
 }
 
-Status SaslEngine::setKerberosInfo(const std::string &principal)
+Status SaslEngine::SetKerberosInfo(const std::string &principal)
 {
   principal_ = principal;
   return Status::OK();
 }
 
-Status SaslEngine::setPasswordInfo(const std::string &id,
-                       const std::string &password)
+Status SaslEngine::SetPasswordInfo(const std::string &id,
+                                   const std::string &password)
 {
   id_ = id;
   password_ = password;
   return Status::OK();
 }
 
+bool SaslEngine::ChooseMech(const std::vector<SaslMethod> &resp_auths) {
+  Status status = Status::OK();
 
-#ifdef USE_GSASL
-/*****************************************************************************
- *                     GSASL
- */
-
-#include <gsasl.h>
+  if (resp_auths.empty()) return NULL;
 
+  for (SaslMethod auth: resp_auths) {
+     if ( auth.mechanism != "GSSAPI") continue; // Hack: only GSSAPI for now
 
-/*****************************************************************************
- *                     UTILITY FUNCTIONS
- */
+     // do a proper deep copy of the vector element
+     // that we like, because the original vector will go away:
+     chosen_mech_.mechanism = auth.mechanism;
+     chosen_mech_.protocol  = auth.protocol;
+     chosen_mech_.serverid  = auth.serverid;
+     chosen_mech_.challenge = auth.challenge;
 
-Status gsasl_rc_to_status(int rc)
-{
-  if (rc == GSASL_OK) {
-    return Status::OK();
-  } else {
-    std::ostringstream ss;
-    ss << "Cannot initialize client (" << rc << "): " << gsasl_strerror(rc);
-    return Status::Error(ss.str().c_str());
+     return auth.mechanism.c_str();
   }
-}
 
+  state_ = kErrorState;
+  status = Status::Error("SaslEngine::chooseMech(): No good protocol.");
 
-GSaslEngine::~GSaslEngine()
-{
-  if (session_ != nullptr) {
-      gsasl_finish(session_);
-  }
+  // Clear out the chosen mech
+  chosen_mech_ = SaslMethod();
 
-  if (ctx_ != nullptr) {
-      gsasl_done(ctx_);
-  }
-}
+  return NULL;
+} // ChooseMech()
 
-std::pair<Status, SaslMethod> GSaslEngine::start(const std::vector<SaslMethod> &protocols)
-{
-  int rc = gsasl_init(&ctx_);
-  if (rc != GSASL_OK) {
-    state_ = kError;
-    return std::make_pair(gsasl_rc_to_status(rc), SaslMethod());
-  }
-
-  // Hack to only do GSSAPI at the moment
-  for (auto protocol: protocols) {
-    if (protocol.mechanism == "GSSAPI") {
-      Status init = init_kerberos(protocol);
-      if (init.ok()) {
-        state_ = kWaitingForData;
-        return std::make_pair(init, protocol);
-      } else {
-        state_ = kError;
-        return std::make_pair(init, SaslMethod());
-      }
-    }
-  }
-
-  state_ = kError;
-  return std::make_pair(Status::Error("No good protocol"), SaslMethod());
-}
-
-Status GSaslEngine::init_kerberos(const SaslMethod & mechanism) {
-  /* Create new authentication session. */
-  int rc = gsasl_client_start(ctx_, mechanism.mechanism.c_str(), &session_);
-  if (rc != GSASL_OK) {
-    return gsasl_rc_to_status(rc);
-  }
-
-  if (!principal_) {
-    return Status::Error("Attempted kerberos authentication with no principal");
-  }
-
-  gsasl_property_set(session_, GSASL_SERVICE, mechanism.protocol.c_str());
-  gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
-  gsasl_property_set(session_, GSASL_HOSTNAME, mechanism.serverid.c_str());
-  return Status::OK();
-  }
-
-  std::pair<Status, std::string> GSaslEngine::step(const std::string data)
-  {
-    if (state_ != kWaitingForData)
-      LOG_WARN(kRPC, << "GSaslEngine::step when state is " << state_);
-
-    char * output = NULL;
-    size_t outputSize;
-    int rc = gsasl_step(session_, data.c_str(), data.size(), &output,
-                        &outputSize);
-
-    if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
-      std::string retval(output, output ? outputSize : 0);
-      if (output) {
-        free(output);
-      }
-
-      if (rc == GSASL_OK) {
-        state_ = kSuccess;
-      }
-
-      return std::make_pair(Status::OK(), retval);
-    }
-    else {
-      if (output) {
-        free(output);
-      }
-      state_ = kFailure;
-      return std::make_pair(gsasl_rc_to_status(rc), "");
-    }
-  }
-
-Status GSaslEngine::finish()
-{
-  if (state_ != kSuccess && state_ != kFailure && state_ != kError )
-    LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_);
-
-  if (session_ != nullptr) {
-      gsasl_finish(session_);
-      session_ = NULL;
-  }
-
-  if (ctx_ != nullptr) {
-      gsasl_done(ctx_);
-      ctx_ = nullptr;
-  }
-
-  return Status::OK();
-}
-#endif // USE_GSASL
-
-
-
-}
+} // namespace hdfs

+ 29 - 50
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_engine.h

@@ -22,14 +22,12 @@
 #include "hdfspp/status.h"
 #include "optional.hpp"
 
-#ifdef USE_GSASL
-#include "gsasl.h"
-#endif
-
 #include <vector>
 
 namespace hdfs {
 
+class SaslProtocol;
+
 template <class T>
 using optional = std::experimental::optional<T>;
 
@@ -38,7 +36,7 @@ public:
   std::string protocol;
   std::string mechanism;
   std::string serverid;
-  void *      data;
+  std::string challenge;
 };
 
 class SaslEngine {
@@ -48,7 +46,7 @@ public:
         kWaitingForData,
         kSuccess,
         kFailure,
-        kError,
+        kErrorState,
     };
 
     // State transitions:
@@ -56,70 +54,51 @@ public:
     // kUnstarted --start--> kWaitingForData --step-+--> kSuccess --finish--v
     //                                               \-> kFailure -/
 
-    SaslEngine() : state_ (kUnstarted) {}
+    // State transitions:
+    //                    \--------------------------/
+    // kUnstarted --start--> kWaitingForData --step-+--> kSuccess --finish--v
+    //                                               \-> kFailure -/
+
+    SaslEngine(): state_ (kUnstarted) {}
     virtual ~SaslEngine();
 
     // Must be called when state is kUnstarted
-    Status setKerberosInfo(const std::string &principal);
+    Status SetKerberosInfo(const std::string &principal);
     // Must be called when state is kUnstarted
-    Status setPasswordInfo(const std::string &id,
+    Status SetPasswordInfo(const std::string &id,
                            const std::string &password);
 
+    // Choose a mechanism from the available ones.  Will set the
+    //    chosen_mech_ member and return true if we found one we
+    //    can process
+    bool ChooseMech(const std::vector<SaslMethod> &avail_auths);
+
     // Returns the current state
-    State getState();
+    State GetState();
 
     // Must be called when state is kUnstarted
-    virtual std::pair<Status,SaslMethod>  start(
-              const std::vector<SaslMethod> &protocols) = 0;
+    virtual std::pair<Status,std::string>  Start() = 0;
 
     // Must be called when state is kWaitingForData
     // Returns kOK and any data that should be sent to the server
-    virtual std::pair<Status,std::string> step(const std::string data) = 0;
+    virtual std::pair<Status,std::string> Step(const std::string data) = 0;
 
-    // Must only be called when state is kSuccess, kFailure, or kError
-    virtual Status finish() = 0;
+    // Must only be called when state is kSuccess, kFailure, or kErrorState
+    virtual Status Finish() = 0;
+
+    // main repository of generic Sasl config data:
+    SaslMethod chosen_mech_;
 protected:
   State state_;
+  SaslProtocol * sasl_protocol_;
 
   optional<std::string> principal_;
+  optional<std::string> realm_;
   optional<std::string> id_;
   optional<std::string> password_;
 
-};
+}; // class SaslEngine
 
-#ifdef USE_GSASL
-class GSaslEngine : public SaslEngine
-{
-public:
-  GSaslEngine() : SaslEngine(), ctx_(nullptr), session_(nullptr) {}
-  virtual ~GSaslEngine();
-
-  virtual std::pair<Status,SaslMethod>  start(
-            const std::vector<SaslMethod> &protocols);
-  virtual std::pair<Status,std::string> step(const std::string data);
-  virtual Status finish();
-private:
-  Gsasl * ctx_;
-  Gsasl_session * session_;
-
-  Status init_kerberos(const SaslMethod & mechanism);
-};
-#endif
-
-#ifdef USE_CYRUS_SASL
-class CyrusSaslEngine : public SaslEngine
-{
-public:
-  GSaslEngine() : SaslEngine(), ctx_(nullptr), session_(nullptr) {}
-  virtual ~GSaslEngine();
-
-  virtual std::pair<Status,SaslMethod>  start(
-            const std::vector<SaslMethod> &protocols);
-  virtual std::pair<Status,std::string> step(const std::string data);
-  virtual Status finish();
-private:
-};
-#endif
+} // namespace hdfs
 
-}
 #endif /* LIB_RPC_SASLENGINE_H */

+ 176 - 90
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc

@@ -16,11 +16,22 @@
  * limitations under the License.
  */
 
-#include "sasl_protocol.h"
-#include "sasl_engine.h"
 #include "rpc_engine.h"
 #include "common/logging.h"
 
+#include "sasl_engine.h"
+#include "sasl_protocol.h"
+
+#if defined USE_SASL
+  #if defined USE_CYRUS_SASL
+    #include "cyrus_sasl_engine.h"  // CySaslEngine()
+  #elif defined USE_GSASL
+    #include      "gsasl_engine.h"  //  GSaslEngine()
+  #else
+    #error USE_SASL defined but no engine (USE_GSASL) defined
+  #endif
+#endif
+
 #include <optional.hpp>
 
 namespace hdfs {
@@ -60,17 +71,13 @@ SaslProtocol::~SaslProtocol()
 void SaslProtocol::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) {
   std::lock_guard<std::mutex> state_lock(sasl_state_lock_);
   event_handlers_ = event_handlers;
-}
+  event_handlers_->call("SASL Start", cluster_name_.c_str(), 0);
+} // SetEventHandlers() method
 
-void SaslProtocol::authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback)
+void SaslProtocol::Authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback)
 {
   std::lock_guard<std::mutex> state_lock(sasl_state_lock_);
 
-  LOG_TRACE(kRPC, << "Authenticating as " << auth_info_.getUser());
-
-  assert(state_ == kUnstarted);
-  event_handlers_->call("SASL Start", cluster_name_.c_str(), 0);
-
   callback_ = callback;
   state_ = kNegotiate;
 
@@ -86,8 +93,9 @@ void SaslProtocol::authenticate(std::function<void(const Status & status, const
   std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>();
   auto self(shared_from_this());
   connection->AsyncRpc_locked(SASL_METHOD_NAME, req_msg.get(), resp_msg,
-                       [self, req_msg, resp_msg] (const Status & status) { self->OnServerResponse(status, resp_msg.get()); } );
-}
+                       [self, req_msg, resp_msg] (const Status & status) {
+            self->OnServerResponse(status, resp_msg.get()); } );
+} // Authenticate() method
 
 AuthInfo::AuthMethod ParseMethod(const std::string & method)
   {
@@ -103,113 +111,195 @@ AuthInfo::AuthMethod ParseMethod(const std::string & method)
     else {
       return AuthInfo::kUnknownAuth;
     }
-}
+} // ParseMethod()
 
-void SaslProtocol::Negotiate(const hadoop::common::RpcSaslProto * response)
+// BuildInitMessage():
+// Helper function for Start(), to keep
+// these ProtoBuf-RPC calls out of the sasl_engine code.
+
+std::pair<Status, RpcSaslProto>
+SaslProtocol::BuildInitMessage(std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg)
 {
-  std::vector<SaslMethod> protocols;
+  // The init message needs one of the RpcSaslProto_SaslAuth structures that
+  //    was sent in the negotiate message as our chosen mechanism
+  // Map the chosen_mech name back to the RpcSaslProto_SaslAuth from the negotiate
+  //    message that it corresponds to
+  SaslMethod & chosenMech = sasl_engine_->chosen_mech_;
+  auto auths = negotiate_msg->auths();
+  auto pb_auth_it = std::find_if(auths.begin(), auths.end(),
+                                 [chosenMech](const RpcSaslProto_SaslAuth & data)
+                                 {
+                                   return data.mechanism() == chosenMech.mechanism;
+                                 });
+  if (pb_auth_it == auths.end())
+    return std::make_pair(Status::Error("Couldn't find mechanism in negotiate msg"), RpcSaslProto());
+  auto & pb_auth = *pb_auth_it;
+
+  // Prepare INITIATE message
+  RpcSaslProto initiate = RpcSaslProto();
+
+  initiate.set_state(RpcSaslProto_SaslState_INITIATE);
+  // initiate message will contain:
+  //   token_ (binary data), and
+  //   auths_[ ], an array of objects just like pb_auth.
+  // In our case, we want the auths array
+  // to hold just the single element from pb_auth:
+
+  RpcSaslProto_SaslAuth * respAuth = initiate.add_auths();
+  respAuth->CopyFrom(pb_auth);
+
+  // Mostly, an INITIATE message contains a "Token".
+  // For GSSAPI, the token is a Kerberos AP_REQ, aka
+  // "Authenticated application request," comprising
+  // the client's application ticket & and an encrypted
+  // message that Kerberos calls an "authenticator".
+
+  if (token.empty()) {
+    const char * errmsg = "SaslProtocol::build_init_msg():  No token available.";
+    LOG_ERROR(kRPC, << errmsg);
+    return std::make_pair(Status::Error(errmsg), RpcSaslProto());
+  }
 
-  bool simple_available = false;
+  // add challenge token to the INITIATE message:
+  initiate.set_token(token);
 
-#if defined USE_SASL
-  #if defined USE_CYRUS_SASL
-    sasl_engine_.reset(new CyrusSaslEngine());
-  #elif defined USE_GSASL
-    sasl_engine_.reset(new GSaslEngine());
-  #else
-    #error USE_SASL defined but no engine (USE_GSASL) defined
-  #endif
-#endif
-  if (auth_info_.getToken()) {
-    sasl_engine_->setPasswordInfo(auth_info_.getToken().value().identifier,
-                                  auth_info_.getToken().value().password);
-  }
-  sasl_engine_->setKerberosInfo(auth_info_.getUser()); // HDFS-10451 will look up principal by username
+  // the initiate message is ready to send:
+  return std::make_pair(Status::OK(), initiate);
+} // BuildInitMessage()
+
+// Converts the RpcSaslProto.auths array from RpcSaslProto_SaslAuth PB
+//    structures to SaslMethod structures
+static bool
+extract_auths(std::vector<SaslMethod>   & resp_auths,
+     const hadoop::common::RpcSaslProto * response) {
 
+  bool simple_avail = false;
+  auto pb_auths = response->auths();
 
-  auto auths = response->auths();
-  for (int i = 0; i < auths.size(); ++i) {
-      auto auth = auths.Get(i);
-      AuthInfo::AuthMethod method = ParseMethod(auth.method());
+  // For our GSSAPI case, an element of pb_auths contains:
+  //    method_      = "KERBEROS"
+  //    mechanism_   = "GSSAPI"
+  //    protocol_    = "nn"      /* "name node", AKA "hdfs"
+  //    serverid_    = "foobar1.acmecorp.com"
+  //    challenge_   = ""
+  //   _cached_size_ = 0
+  //   _has_bits_    = 15
+
+  for (int i = 0; i < pb_auths.size(); ++i) {
+      auto  pb_auth = pb_auths.Get(i);
+      AuthInfo::AuthMethod method = ParseMethod(pb_auth.method());
 
       switch(method) {
       case AuthInfo::kToken:
       case AuthInfo::kKerberos: {
           SaslMethod new_method;
-          new_method.mechanism = auth.mechanism();
-          new_method.protocol = auth.protocol();
-          new_method.serverid = auth.serverid();
-          new_method.data = const_cast<RpcSaslProto_SaslAuth *>(&response->auths().Get(i));
-          protocols.push_back(new_method);
+          new_method.mechanism = pb_auth.mechanism();
+          new_method.protocol  = pb_auth.protocol();
+          new_method.serverid  = pb_auth.serverid();
+          new_method.challenge = pb_auth.has_challenge() ?
+                                 pb_auth.challenge()     : "";
+          resp_auths.push_back(new_method);
         }
         break;
       case AuthInfo::kSimple:
-        simple_available = true;
+        simple_avail = true;
         break;
       case AuthInfo::kUnknownAuth:
-        LOG_WARN(kRPC, << "Unknown auth method " << auth.method() << "; ignoring");
+        LOG_WARN(kRPC, << "Unknown auth method " << pb_auth.method() << "; ignoring");
         break;
       default:
         LOG_WARN(kRPC, << "Invalid auth type:  " << method << "; ignoring");
         break;
       }
-  }
-
-  if (!protocols.empty()) {
-    auto init = sasl_engine_->start(protocols);
-    if (init.first.ok()) {
-      auto chosen_auth = reinterpret_cast<RpcSaslProto_SaslAuth *>(init.second.data);
-
-      // Prepare initiate message
-      RpcSaslProto initiate;
-      initiate.set_state(RpcSaslProto_SaslState_INITIATE);
-      RpcSaslProto_SaslAuth * respAuth = initiate.add_auths();
-      respAuth->CopyFrom(*chosen_auth);
-
-      LOG_TRACE(kRPC, << "Using auth: " << chosen_auth->protocol() << "/" <<
-              chosen_auth->mechanism() << "/" << chosen_auth->serverid());
+  } // for
+  return simple_avail;
+} // extract_auths()
 
-      std::string challenge = chosen_auth->has_challenge() ? chosen_auth->challenge() : "";
-      auto sasl_challenge = sasl_engine_->step(challenge);
+void SaslProtocol::ResetEngine() {
+#if defined USE_SASL
+  #if defined USE_CYRUS_SASL
+    sasl_engine_.reset(new CySaslEngine());
+  #elif defined USE_GSASL
+    sasl_engine_.reset(new GSaslEngine());
+  #else
+    #error USE_SASL defined but no engine (USE_GSASL) defined
+  #endif
+#endif
+    return;
+} // ResetEngine() method
 
-      if (sasl_challenge.first.ok()) {
-        if (!sasl_challenge.second.empty()) {
-          initiate.set_token(sasl_challenge.second);
-        }
+void SaslProtocol::Negotiate(const hadoop::common::RpcSaslProto * response)
+{
+  this->ResetEngine(); // get a new SaslEngine
 
-        std::shared_ptr<RpcSaslProto> return_msg = std::make_shared<RpcSaslProto>();
-        SendSaslMessage(initiate);
-        return;
-      } else {
-        AuthComplete(sasl_challenge.first, auth_info_);
-        return;
-      }
-    } else if (!simple_available) {
-      // If simple IS available, fall through to below
-      AuthComplete(init.first, auth_info_);
+  if (auth_info_.getToken()) {
+    sasl_engine_->SetPasswordInfo(auth_info_.getToken().value().identifier,
+                                  auth_info_.getToken().value().password);
+  }
+  sasl_engine_->SetKerberosInfo(auth_info_.getUser()); // TODO: map to principal?
+
+  // Copy the response's auths list to an array of SaslMethod objects.
+  // SaslEngine shouldn't need to know about the protobuf classes.
+  std::vector<SaslMethod> resp_auths;
+  bool simple_available = extract_auths(resp_auths,   response);
+  bool mech_chosen = sasl_engine_->ChooseMech(resp_auths);
+
+  if (mech_chosen) {
+
+    // Prepare an INITIATE message,
+    // later on we'll send it to the hdfs server:
+    auto     start_result  = sasl_engine_->Start();
+    Status   status        = start_result.first;
+    if (! status.ok()) {
+      // Start() failed (simple_available is not consulted here),
+      // so give up & stop authentication:
+      AuthComplete(status, auth_info_);
       return;
     }
+    // start_result.second is a binary buffer, containing
+    // client credentials that will prove the
+    // client's identity to the application server.
+    // Put the token into an INITIATE message:
+    auto init = BuildInitMessage(start_result.second, response);
+
+    // If all is OK, send the INITIATE msg to the hdfs server;
+    // Otherwise, if possible, fail over to simple authentication:
+    status = init.first;
+    if (status.ok()) {
+      SendSaslMessage(init.second);
+      return;
+    }
+    if (!simple_available) {
+      // build_init_msg() failed, simple isn't avail,
+      // so give up & stop authentication:
+      AuthComplete(status, auth_info_);
+      return;
+    }
+    // If simple IS available, fall through to below,
+    // but without build_init_msg()'s failure-status.
   }
 
-  // There were no protocols, or the SaslEngine couldn't make one work
+  // There were no resp_auths, or the SaslEngine couldn't make one work
   if (simple_available) {
     // Simple was the only one we could use.  That's OK.
     AuthComplete(Status::OK(), auth_info_);
     return;
   } else {
-    // We didn't understand any of the protocols; give back some information
+    // We didn't understand any of the resp_auths;
+    // Give back some information
     std::stringstream ss;
     ss << "Client cannot authenticate via: ";
 
-    for (int i = 0; i < auths.size(); ++i) {
-      auto auth = auths.Get(i);
-      ss << auth.mechanism() << ", ";
+    auto pb_auths = response->auths();
+    for (int i = 0; i < pb_auths.size(); ++i) {
+      const RpcSaslProto_SaslAuth & pb_auth = pb_auths.Get(i);
+      ss << pb_auth.mechanism() << ", ";
     }
 
     AuthComplete(Status::Error(ss.str().c_str()), auth_info_);
     return;
   }
-}
+} // Negotiate() method
 
 void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge)
 {
@@ -223,7 +313,7 @@ void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge)
   response.set_state(RpcSaslProto_SaslState_RESPONSE);
 
   std::string challenge_token = challenge->has_token() ? challenge->token() : "";
-  auto sasl_response = sasl_engine_->step(challenge_token);
+  auto sasl_response = sasl_engine_->Step(challenge_token);
 
   if (sasl_response.first.ok()) {
     response.set_token(sasl_response.second);
@@ -234,7 +324,7 @@ void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge)
     AuthComplete(sasl_response.first, auth_info_);
     return;
   }
-}
+} // Challenge() method
 
 bool SaslProtocol::SendSaslMessage(RpcSaslProto & message)
 {
@@ -256,8 +346,9 @@ bool SaslProtocol::SendSaslMessage(RpcSaslProto & message)
                        } );
 
   return true;
-}
+} // SendSaslMessage() method
 
+// AuthComplete():  stop the auth effort, successful or not:
 bool SaslProtocol::AuthComplete(const Status & status, const AuthInfo & auth_info)
 {
   assert(lock_held(sasl_state_lock_));  // Must be holding lock before calling
@@ -270,15 +361,11 @@ bool SaslProtocol::AuthComplete(const Status & status, const AuthInfo & auth_inf
     return false;
   }
 
-  if (!status.ok()) {
-    auth_info_.setMethod(AuthInfo::kAuthFailed);
-  }
-
-  LOG_TRACE(kRPC, << "AuthComplete: " << status.ToString());
+  LOG_TRACE(kRPC, << "Received SASL response" << status.ToString());
   connection->AuthComplete(status, auth_info);
 
   return true;
-}
+} // AuthComplete() method
 
 void SaslProtocol::OnServerResponse(const Status & status, const hadoop::common::RpcSaslProto * response)
 {
@@ -295,7 +382,7 @@ void SaslProtocol::OnServerResponse(const Status & status, const hadoop::common:
       break;
     case RpcSaslProto_SaslState_SUCCESS:
       if (sasl_engine_) {
-        sasl_engine_->finish();
+        sasl_engine_->Finish();
       }
       AuthComplete(Status::OK(), auth_info_);
       break;
@@ -314,7 +401,6 @@ void SaslProtocol::OnServerResponse(const Status & status, const hadoop::common:
   } else {
     AuthComplete(status, auth_info_);
   }
-}
+} // OnServerResponse() method
 
-
-}
+} // namespace hdfs

+ 13 - 9
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.h

@@ -19,22 +19,23 @@
 #ifndef LIB_RPC_SASLPROTOCOL_H
 #define LIB_RPC_SASLPROTOCOL_H
 
-#include "hdfspp/status.h"
-#include "common/auth_info.h"
-#include "common/libhdfs_events_impl.h"
-
-#include <RpcHeader.pb.h>
-
 #include <memory>
 #include <mutex>
 #include <functional>
 
+#include <RpcHeader.pb.h>
+
+#include "hdfspp/status.h"
+#include "common/auth_info.h"
+#include "common/libhdfs_events_impl.h"
+
 namespace hdfs {
 
 static constexpr const char * SASL_METHOD_NAME = "sasl message";
 
 class RpcConnection;
 class SaslEngine;
+class SaslMethod;
 
 class SaslProtocol : public std::enable_shared_from_this<SaslProtocol>
 {
@@ -48,8 +49,9 @@ public:
 
   // Start the async authentication process.  Must be called while holding the
   //   connection lock, but all callbacks will occur outside of the connection lock
-  void authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback);
+  void Authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback);
   void OnServerResponse(const Status & status, const hadoop::common::RpcSaslProto * response);
+  std::pair<Status, hadoop::common::RpcSaslProto> BuildInitMessage( std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg);
 private:
   enum State {
     kUnstarted,
@@ -72,10 +74,12 @@ private:
   bool SendSaslMessage(hadoop::common::RpcSaslProto & message);
   bool AuthComplete(const Status & status, const AuthInfo & auth_info);
 
+  void ResetEngine();
   void Negotiate(const hadoop::common::RpcSaslProto * response);
   void Challenge(const hadoop::common::RpcSaslProto * response);
-};
 
-}
+}; // class SaslProtocol
+
+} // namespace hdfs
 
 #endif /* LIB_RPC_SASLPROTOCOL_H */