sasl_protocol.cc 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "rpc_engine.h"
  19. #include "rpc_connection.h"
  20. #include "common/logging.h"
  21. #include "common/optional_wrapper.h"
  22. #include "x-platform/syscall.h"
  23. #include "sasl_engine.h"
  24. #include "sasl_protocol.h"
  25. #if defined USE_SASL
  26. #if defined USE_CYRUS_SASL
  27. #include "cyrus_sasl_engine.h" // CySaslEngine()
  28. #elif defined USE_GSASL
  29. #include "gsasl_engine.h" // GSaslEngine()
  30. #else
  31. #error USE_SASL defined but no engine (USE_GSASL) defined
  32. #endif
  33. #endif
  34. namespace hdfs {
  35. using namespace hadoop::common;
  36. using namespace google::protobuf;
  /*****
   * Threading model: all entry points need to acquire sasl_state_lock_ before
   * accessing members of the class.
   *
   * Lifecycle model: asio may have outstanding callbacks into this class for
   * arbitrary amounts of time, so any references to the class must be
   * shared_ptrs. The SaslProtocol keeps a weak_ptr to the owning RpcConnection,
   * which might go away, so the weak_ptr should be locked only long enough to
   * make callbacks into the RpcConnection.
   */
  47. SaslProtocol::SaslProtocol(const std::string & cluster_name,
  48. const AuthInfo & auth_info,
  49. std::shared_ptr<RpcConnection> connection) :
  50. state_(kUnstarted),
  51. cluster_name_(cluster_name),
  52. auth_info_(auth_info),
  53. connection_(connection)
  54. {
  55. }
  56. SaslProtocol::~SaslProtocol()
  57. {
  58. assert(state_ != kNegotiate);
  59. }
  60. void SaslProtocol::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) {
  61. std::lock_guard<std::mutex> state_lock(sasl_state_lock_);
  62. event_handlers_ = event_handlers;
  63. } // SetEventHandlers() method
  64. void SaslProtocol::Authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback)
  65. {
  66. std::lock_guard<std::mutex> state_lock(sasl_state_lock_);
  67. callback_ = callback;
  68. state_ = kNegotiate;
  69. event_handlers_->call("SASL Start", cluster_name_.c_str(), 0);
  70. std::shared_ptr<RpcSaslProto> req_msg = std::make_shared<RpcSaslProto>();
  71. req_msg->set_state(RpcSaslProto_SaslState_NEGOTIATE);
  72. // We cheat here since this is always called while holding the RpcConnection's lock
  73. std::shared_ptr<RpcConnection> connection = connection_.lock();
  74. if (!connection) {
  75. AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), AuthInfo());
  76. return;
  77. }
  78. std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>();
  79. auto self(shared_from_this());
  80. connection->AsyncRpc_locked(SASL_METHOD_NAME, req_msg.get(), resp_msg,
  81. [self, req_msg, resp_msg, connection] (const Status & status) {
  82. assert(connection);
  83. self->OnServerResponse(status, resp_msg.get());
  84. });
  85. } // authenticate() method
  86. AuthInfo::AuthMethod ParseMethod(const std::string & method) {
  87. if (XPlatform::Syscall::StringCompareIgnoreCase(method, "SIMPLE")) {
  88. return AuthInfo::kSimple;
  89. }
  90. if (XPlatform::Syscall::StringCompareIgnoreCase(method, "KERBEROS")) {
  91. return AuthInfo::kKerberos;
  92. }
  93. if (XPlatform::Syscall::StringCompareIgnoreCase(method, "TOKEN")) {
  94. return AuthInfo::kToken;
  95. }
  96. return AuthInfo::kUnknownAuth;
  97. } // ParseMethod()
  98. // build_init_msg():
  99. // Helper function for Start(), to keep
  100. // these ProtoBuf-RPC calls out of the sasl_engine code.
  101. std::pair<Status, RpcSaslProto>
  102. SaslProtocol::BuildInitMessage(std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg)
  103. {
  104. // The init message needs one of the RpcSaslProto_SaslAuth structures that
  105. // was sent in the negotiate message as our chosen mechanism
  106. // Map the chosen_mech name back to the RpcSaslProto_SaslAuth from the negotiate
  107. // message that it corresponds to
  108. SaslMethod & chosenMech = sasl_engine_->chosen_mech_;
  109. auto auths = negotiate_msg->auths();
  110. auto pb_auth_it = std::find_if(auths.begin(), auths.end(),
  111. [chosenMech](const RpcSaslProto_SaslAuth & data)
  112. {
  113. return data.mechanism() == chosenMech.mechanism;
  114. });
  115. if (pb_auth_it == auths.end())
  116. return std::make_pair(Status::Error("Couldn't find mechanism in negotiate msg"), RpcSaslProto());
  117. auto & pb_auth = *pb_auth_it;
  118. // Prepare INITIATE message
  119. RpcSaslProto initiate = RpcSaslProto();
  120. initiate.set_state(RpcSaslProto_SaslState_INITIATE);
  121. // initiate message will contain:
  122. // token_ (binary data), and
  123. // auths_[ ], an array of objects just like pb_auth.
  124. // In our case, we want the auths array
  125. // to hold just the single element from pb_auth:
  126. RpcSaslProto_SaslAuth * respAuth = initiate.add_auths();
  127. respAuth->CopyFrom(pb_auth);
  128. // Mostly, an INITIATE message contains a "Token".
  129. // For GSSAPI, the token is a Kerberos AP_REQ, aka
  130. // "Authenticated application request," comprising
  131. // the client's application ticket & and an encrypted
  132. // message that Kerberos calls an "authenticator".
  133. if (token.empty()) {
  134. const char * errmsg = "SaslProtocol::build_init_msg(): No token available.";
  135. LOG_ERROR(kRPC, << errmsg);
  136. return std::make_pair(Status::Error(errmsg), RpcSaslProto());
  137. }
  138. // add challenge token to the INITIATE message:
  139. initiate.set_token(token);
  140. // the initiate message is ready to send:
  141. return std::make_pair(Status::OK(), initiate);
  142. } // build_init_msg()
  143. // Converts the RpcSaslProto.auths ararray from RpcSaslProto_SaslAuth PB
  144. // structures to SaslMethod structures
  145. static bool
  146. extract_auths(std::vector<SaslMethod> & resp_auths,
  147. const hadoop::common::RpcSaslProto * response) {
  148. bool simple_avail = false;
  149. auto pb_auths = response->auths();
  150. // For our GSSAPI case, an element of pb_auths contains:
  151. // method_ = "KERBEROS"
  152. // mechanism_ = "GSSAPI"
  153. // protocol_ = "nn" /* "name node", AKA "hdfs"
  154. // serverid_ = "foobar1.acmecorp.com"
  155. // challenge_ = ""
  156. // _cached_size_ = 0
  157. // _has_bits_ = 15
  158. for (int i = 0; i < pb_auths.size(); ++i) {
  159. auto pb_auth = pb_auths.Get(i);
  160. AuthInfo::AuthMethod method = ParseMethod(pb_auth.method());
  161. switch(method) {
  162. case AuthInfo::kToken:
  163. case AuthInfo::kKerberos: {
  164. SaslMethod new_method;
  165. new_method.mechanism = pb_auth.mechanism();
  166. new_method.protocol = pb_auth.protocol();
  167. new_method.serverid = pb_auth.serverid();
  168. new_method.challenge = pb_auth.has_challenge() ?
  169. pb_auth.challenge() : "";
  170. resp_auths.push_back(new_method);
  171. }
  172. break;
  173. case AuthInfo::kSimple:
  174. simple_avail = true;
  175. break;
  176. case AuthInfo::kUnknownAuth:
  177. LOG_WARN(kRPC, << "Unknown auth method " << pb_auth.method() << "; ignoring");
  178. break;
  179. default:
  180. LOG_WARN(kRPC, << "Invalid auth type: " << method << "; ignoring");
  181. break;
  182. }
  183. } // for
  184. return simple_avail;
  185. } // extract_auths()
  186. void SaslProtocol::ResetEngine() {
  187. #if defined USE_SASL
  188. #if defined USE_CYRUS_SASL
  189. sasl_engine_.reset(new CySaslEngine());
  190. #elif defined USE_GSASL
  191. sasl_engine_.reset(new GSaslEngine());
  192. #else
  193. #error USE_SASL defined but no engine (USE_GSASL) defined
  194. #endif
  195. #endif
  196. return;
  197. } // Reset_Engine() method
  198. void SaslProtocol::Negotiate(const hadoop::common::RpcSaslProto * response)
  199. {
  200. this->ResetEngine(); // get a new SaslEngine
  201. if (auth_info_.getToken()) {
  202. sasl_engine_->SetPasswordInfo(auth_info_.getToken().value().identifier,
  203. auth_info_.getToken().value().password);
  204. }
  205. sasl_engine_->SetKerberosInfo(auth_info_.getUser()); // TODO: map to principal?
  206. // Copy the response's auths list to an array of SaslMethod objects.
  207. // SaslEngine shouldn't need to know about the protobuf classes.
  208. std::vector<SaslMethod> resp_auths;
  209. bool simple_available = extract_auths(resp_auths, response);
  210. bool mech_chosen = sasl_engine_->ChooseMech(resp_auths);
  211. if (mech_chosen) {
  212. // Prepare an INITIATE message,
  213. // later on we'll send it to the hdfs server:
  214. auto start_result = sasl_engine_->Start();
  215. Status status = start_result.first;
  216. if (! status.ok()) {
  217. // start() failed, simple isn't avail,
  218. // so give up & stop authentication:
  219. AuthComplete(status, auth_info_);
  220. return;
  221. }
  222. // token.second is a binary buffer, containing
  223. // client credentials that will prove the
  224. // client's identity to the application server.
  225. // Put the token into an INITIATE message:
  226. auto init = BuildInitMessage(start_result.second, response);
  227. // If all is OK, send the INITIATE msg to the hdfs server;
  228. // Otherwise, if possible, fail over to simple authentication:
  229. status = init.first;
  230. if (status.ok()) {
  231. SendSaslMessage(init.second);
  232. return;
  233. }
  234. if (!simple_available) {
  235. // build_init_msg() failed, simple isn't avail,
  236. // so give up & stop authentication:
  237. AuthComplete(status, auth_info_);
  238. return;
  239. }
  240. // If simple IS available, fall through to below,
  241. // but without build_init_msg()'s failure-status.
  242. }
  243. // There were no resp_auths, or the SaslEngine couldn't make one work
  244. if (simple_available) {
  245. // Simple was the only one we could use. That's OK.
  246. AuthComplete(Status::OK(), auth_info_);
  247. return;
  248. } else {
  249. // We didn't understand any of the resp_auths;
  250. // Give back some information
  251. std::stringstream ss;
  252. ss << "Client cannot authenticate via: ";
  253. auto pb_auths = response->auths();
  254. for (int i = 0; i < pb_auths.size(); ++i) {
  255. const RpcSaslProto_SaslAuth & pb_auth = pb_auths.Get(i);
  256. ss << pb_auth.mechanism() << ", ";
  257. }
  258. AuthComplete(Status::Error(ss.str().c_str()), auth_info_);
  259. return;
  260. }
  261. } // Negotiate() method
  262. void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge)
  263. {
  264. if (!sasl_engine_) {
  265. AuthComplete(Status::Error("Received challenge before negotiate"), auth_info_);
  266. return;
  267. }
  268. RpcSaslProto response;
  269. response.CopyFrom(*challenge);
  270. response.set_state(RpcSaslProto_SaslState_RESPONSE);
  271. std::string challenge_token = challenge->has_token() ? challenge->token() : "";
  272. auto sasl_response = sasl_engine_->Step(challenge_token);
  273. if (sasl_response.first.ok()) {
  274. response.set_token(sasl_response.second);
  275. std::shared_ptr<RpcSaslProto> return_msg = std::make_shared<RpcSaslProto>();
  276. SendSaslMessage(response);
  277. } else {
  278. AuthComplete(sasl_response.first, auth_info_);
  279. return;
  280. }
  281. } // Challenge() method
  282. bool SaslProtocol::SendSaslMessage(RpcSaslProto & message)
  283. {
  284. assert(lock_held(sasl_state_lock_)); // Must be holding lock before calling
  285. // RpcConnection might have been freed when we weren't looking. Lock it
  286. // to make sure it's there long enough for us
  287. std::shared_ptr<RpcConnection> connection = connection_.lock();
  288. if (!connection) {
  289. LOG_DEBUG(kRPC, << "Tried sending a SASL Message but the RPC connection was gone");
  290. AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), AuthInfo());
  291. return false;
  292. }
  293. std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>();
  294. auto self(shared_from_this());
  295. connection->AsyncRpc(SASL_METHOD_NAME, &message, resp_msg,
  296. [self, resp_msg, connection] (const Status & status) {
  297. assert(connection);
  298. self->OnServerResponse(status, resp_msg.get());
  299. });
  300. return true;
  301. } // SendSaslMessage() method
  302. // AuthComplete(): stop the auth effort, successful ot not:
  303. bool SaslProtocol::AuthComplete(const Status & status, const AuthInfo & auth_info)
  304. {
  305. assert(lock_held(sasl_state_lock_)); // Must be holding lock before calling
  306. state_ = kComplete;
  307. event_handlers_->call("SASL End", cluster_name_.c_str(), 0);
  308. // RpcConnection might have been freed when we weren't looking. Lock it
  309. // to make sure it's there long enough for us
  310. std::shared_ptr<RpcConnection> connection = connection_.lock();
  311. if (!connection) {
  312. LOG_DEBUG(kRPC, << "Tried sending an AuthComplete but the RPC connection was gone: " << status.ToString());
  313. return false;
  314. }
  315. LOG_TRACE(kRPC, << "Received SASL response" << status.ToString());
  316. connection->AuthComplete(status, auth_info);
  317. return true;
  318. } // AuthComplete() method
  319. void SaslProtocol::OnServerResponse(const Status & status, const hadoop::common::RpcSaslProto * response)
  320. {
  321. std::lock_guard<std::mutex> state_lock(sasl_state_lock_);
  322. LOG_TRACE(kRPC, << "Received SASL response: " << status.ToString());
  323. if (status.ok()) {
  324. switch(response->state()) {
  325. case RpcSaslProto_SaslState_NEGOTIATE:
  326. Negotiate(response);
  327. break;
  328. case RpcSaslProto_SaslState_CHALLENGE:
  329. Challenge(response);
  330. break;
  331. case RpcSaslProto_SaslState_SUCCESS:
  332. if (sasl_engine_) {
  333. sasl_engine_->Finish();
  334. }
  335. AuthComplete(Status::OK(), auth_info_);
  336. break;
  337. case RpcSaslProto_SaslState_INITIATE: // Server side only
  338. case RpcSaslProto_SaslState_RESPONSE: // Server side only
  339. case RpcSaslProto_SaslState_WRAP:
  340. LOG_ERROR(kRPC, << "Invalid client-side SASL state: " << response->state());
  341. AuthComplete(Status::Error("Invalid client-side state"), auth_info_);
  342. break;
  343. default:
  344. LOG_ERROR(kRPC, << "Unknown client-side SASL state: " << response->state());
  345. AuthComplete(Status::Error("Unknown client-side state"), auth_info_);
  346. break;
  347. }
  348. } else {
  349. AuthComplete(status, auth_info_);
  350. }
  351. } // OnServerResponse() method
  352. } // namespace hdfs