hdfs_configuration.cc 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "common/hdfs_configuration.h"
  19. #include "common/logging.h"
  20. #include <exception>
  21. #ifndef DEFAULT_SCHEME
  22. #define DEFAULT_SCHEME "hdfs://"
  23. #endif
  24. namespace hdfs {
  25. // Constructs a configuration with no search path and no resources loaded
  26. HdfsConfiguration::HdfsConfiguration() : Configuration() {}
  27. // Constructs a configuration with a copy of the input data
  28. HdfsConfiguration::HdfsConfiguration(ConfigMap &src_map) : Configuration(src_map) {}
  29. HdfsConfiguration::HdfsConfiguration(const ConfigMap &src_map) : Configuration(src_map) {}
  30. std::vector<std::string> HdfsConfiguration::GetDefaultFilenames() {
  31. auto result = Configuration::GetDefaultFilenames();
  32. result.push_back("hdfs-site.xml");
  33. return result;
  34. }
  // Assigns *value to target iff the optional holds a value; otherwise target
  // keeps its current contents. GetOptions() uses this to override an Options
  // field only when the corresponding configuration key is present.
  //
  // The optional is taken by const reference so that neither it nor its
  // payload (e.g. a URI) is copied just to be inspected.
  template <class T, class U>
  void OptionalSet(T &target, const optional<U> &value) {
    if (value)
      target = *value;
  }
  // Splits s on ',' and returns the fields in order. Fields are not trimmed.
  //
  // include_empty_strings controls empty fields (produced by a leading,
  // trailing, or doubled comma):
  //   false: every empty field is dropped, e.g. ",a,,b," -> {"a", "b"}
  //   true:  every empty field is kept,    e.g. ",a,,b," -> {"", "a", "", "b", ""}
  // An empty input yields no fields in either mode.
  std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_strings) {
    std::vector<std::string> res;
    std::string buf;
    for (const char c : s) {
      if (c != ',') {
        buf += c;
        continue;
      }
      // A comma terminates the current field.
      if (include_empty_strings || !buf.empty())
        res.push_back(buf);
      buf.clear();
    }
    // The field after the last comma. If s ends with ',' this field is empty;
    // in include_empty_strings mode it must be kept just like a leading or
    // interior empty field, not silently dropped.
    if (!buf.empty() || (include_empty_strings && !s.empty()))
      res.push_back(buf);
    return res;
  }
  // Returns a copy of str with every ' ' character removed. Only the plain
  // space is stripped; tabs, newlines and other whitespace are kept as-is.
  std::string RemoveSpaces(const std::string &str) {
    std::string without_spaces;
    for (const char ch : str) {
      if (ch == ' ')
        continue;
      without_spaces += ch;
    }
    return without_spaces;
  }
  71. // Prepend hdfs:// to string if there isn't already a scheme
  72. // Converts unset optional into empty string
  73. std::string PrependHdfsScheme(optional<std::string> str) {
  74. if(!str)
  75. return "";
  76. if(str.value().find("://") == std::string::npos)
  77. return DEFAULT_SCHEME + str.value();
  78. return str.value();
  79. }
  // Internal exception that lets LookupNameService() abort parsing of an HA
  // nameservice from any point. Its catch site logs what() and returns an
  // empty result. It was chosen over goto or a status check after every step.
  struct ha_parse_error : public std::exception {
    std::string desc;  // Human-readable reason; returned by what().

    // explicit: a plain string must not silently convert into an exception.
    explicit ha_parse_error(const std::string &val) : desc(val) {}

    const char *what() const noexcept override {
      return desc.c_str();
    }
  };
  88. std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) {
  89. LOG_TRACE(kRPC, << "HDFSConfiguration@" << this << "::LookupNameService( nameservice=" << nameservice<< " ) called");
  90. std::vector<NamenodeInfo> namenodes;
  91. try {
  92. // Find namenodes that belong to nameservice
  93. std::vector<std::string> namenode_ids;
  94. {
  95. std::string service_nodes = std::string("dfs.ha.namenodes.") + nameservice;
  96. optional<std::string> namenode_list = Get(service_nodes);
  97. if(namenode_list)
  98. namenode_ids = SplitOnComma(namenode_list.value(), false);
  99. else
  100. throw ha_parse_error("unable to find " + service_nodes);
  101. for(unsigned int i=0; i<namenode_ids.size(); i++) {
  102. namenode_ids[i] = RemoveSpaces(namenode_ids[i]);
  103. LOG_INFO(kRPC, << "Namenode: " << namenode_ids[i]);
  104. }
  105. }
  106. // should this error if we only find 1 NN?
  107. if(namenode_ids.empty())
  108. throw ha_parse_error("No namenodes found for nameservice " + nameservice);
  109. // Get URI for each HA namenode
  110. for(auto node_id=namenode_ids.begin(); node_id != namenode_ids.end(); node_id++) {
  111. // find URI
  112. std::string dom_node_name = std::string("dfs.namenode.rpc-address.") + nameservice + "." + *node_id;
  113. URI uri;
  114. try {
  115. uri = URI::parse_from_string(PrependHdfsScheme(Get(dom_node_name)));
  116. } catch (const uri_parse_error) {
  117. throw ha_parse_error("unable to find " + dom_node_name);
  118. }
  119. if(uri.str() == "") {
  120. LOG_WARN(kRPC, << "Attempted to read info for nameservice " << nameservice << " node " << dom_node_name << " but didn't find anything.")
  121. } else {
  122. LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString());
  123. }
  124. NamenodeInfo node(nameservice, *node_id, uri);
  125. namenodes.push_back(node);
  126. }
  127. } catch (ha_parse_error e) {
  128. LOG_ERROR(kRPC, << "HA cluster detected but failed because : " << e.what());
  129. namenodes.clear(); // Don't return inconsistent view
  130. }
  131. return namenodes;
  132. }
  133. // Interprets the resources to build an Options object
  134. Options HdfsConfiguration::GetOptions() {
  135. Options result;
  136. OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey));
  137. OptionalSet(result.rpc_connect_timeout, GetInt(kIpcClientConnectTimeoutKey));
  138. OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
  139. OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
  140. OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
  141. OptionalSet(result.block_size, GetInt(kDfsBlockSizeKey));
  142. OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts));
  143. OptionalSet(result.failover_connection_max_retries, GetInt(kDfsClientFailoverConnectionRetriesOnTimeouts));
  144. // Load all nameservices if it's HA configured
  145. optional<std::string> dfs_nameservices = Get("dfs.nameservices");
  146. if(dfs_nameservices) {
  147. std::string nameservice = dfs_nameservices.value();
  148. std::vector<std::string> all_services = SplitOnComma(nameservice, false);
  149. // Look up nodes for each nameservice so that FileSystem object can support
  150. // multiple nameservices by ID.
  151. for(const std::string &service : all_services) {
  152. if(service.empty())
  153. continue;
  154. LOG_DEBUG(kFileSystem, << "Parsing info for nameservice: " << service);
  155. std::vector<NamenodeInfo> nodes = LookupNameService(service);
  156. if(nodes.empty()) {
  157. LOG_WARN(kFileSystem, << "Nameservice \"" << service << "\" declared in config but nodes aren't");
  158. } else {
  159. result.services[service] = nodes;
  160. }
  161. }
  162. }
  163. optional<std::string> authentication_value = Get(kHadoopSecurityAuthenticationKey);
  164. if (authentication_value ) {
  165. std::string fixed_case_value = fixCase(authentication_value.value());
  166. if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos))
  167. result.authentication = Options::kKerberos;
  168. else
  169. result.authentication = Options::kSimple;
  170. }
  171. return result;
  172. }
  173. }