/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "common/hdfs_configuration.h"
#include "common/logging.h"

#include <exception>

#ifndef DEFAULT_SCHEME
#define DEFAULT_SCHEME "hdfs://"
#endif

namespace hdfs {

// Constructs a configuration with no search path and no resources loaded
HdfsConfiguration::HdfsConfiguration() : Configuration() {}

// Constructs a configuration with a copy of the input data
HdfsConfiguration::HdfsConfiguration(ConfigMap &src_map) : Configuration(src_map) {}
HdfsConfiguration::HdfsConfiguration(const ConfigMap &src_map) : Configuration(src_map) {}

std::vector<std::string> HdfsConfiguration::GetDefaultFilenames() {
  auto result = Configuration::GetDefaultFilenames();
  result.push_back("hdfs-site.xml");
  return result;
}

// Sets a value iff the optional has a value
template <class T, class U>
void OptionalSet(T& target, optional<U> value) {
  if (value)
    target = *value;
}

std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_strings) {
  std::vector<std::string> res;
  std::string buf;

  for(unsigned int i=0; i<s.size(); i++) {
    char c = s[i];
    if(c != ',') {
      buf += c;
    } else {
      // Hit a delimiter; emit the accumulated token (optionally skipping empty ones)
      if(include_empty_strings || !buf.empty())
        res.push_back(buf);
      buf.clear();
    }
  }

  if(buf.size() > 0)
    res.push_back(buf);

  return res;
}

// Prepend hdfs:// to string if there isn't already a scheme
// Converts unset optional into empty string
std::string PrependHdfsScheme(optional<std::string> str) {
  if(!str)
    return "";

  if(str.value().find("://") == std::string::npos)
    return DEFAULT_SCHEME + str.value();
  return str.value();
}

// It's either use this, goto, or a lot of returns w/ status checks
struct ha_parse_error : public std::exception {
  std::string desc;
  ha_parse_error(const std::string &val) : desc(val) {};
  const char *what() const noexcept override {
    return desc.c_str();
  };
};

std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) {
  std::vector<NamenodeInfo> namenodes;
  try {
    // Find namenodes that belong to nameservice
    std::vector<std::string> namenode_ids;
    {
      std::string service_nodes = std::string("dfs.ha.namenodes.") + nameservice;
      optional<std::string> namenode_list = Get(service_nodes);
      if(namenode_list)
        namenode_ids = SplitOnComma(namenode_list.value(), false);
      else
        throw ha_parse_error("unable to find " + service_nodes);

      for(auto it=namenode_ids.begin(); it != namenode_ids.end(); it++)
        LOG_INFO(kRPC, << "Namenode: " << *it);
    }

    // should this error if we only find 1 NN?
    if(namenode_ids.empty())
      throw ha_parse_error("No namenodes found for nameservice " + nameservice);

    // Get URI for each HA namenode
    for(auto node_id=namenode_ids.begin(); node_id != namenode_ids.end(); node_id++) {
      // find URI
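      // The per-namenode RPC address lives under a key of the form
      // dfs.namenode.rpc-address.<nameservice>.<namenode id>, e.g.
      // dfs.namenode.rpc-address.mycluster.nn1 ("mycluster" and "nn1" are
      // made-up example names, not values this code assumes).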
      std::string dom_node_name = std::string("dfs.namenode.rpc-address.") + nameservice + "." + *node_id;
      optional<URI> node_uri = URI::parse_from_string(PrependHdfsScheme(Get(dom_node_name)));
      if(!node_uri) {
        throw ha_parse_error("unable to find " + dom_node_name);
      }

      URI uri = node_uri.value();
      LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString());

      NamenodeInfo node(nameservice, *node_id, uri);
      namenodes.push_back(node);
    }
  } catch (ha_parse_error e) {
    LOG_ERROR(kRPC, << "HA cluster detected but failed because : " << e.what());
    namenodes.clear(); // Don't return inconsistent view
  }

  return namenodes;
}

// Interprets the resources to build an Options object
Options HdfsConfiguration::GetOptions() {
  Options result;

  OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey));
  OptionalSet(result.rpc_connect_timeout, GetInt(kIpcClientConnectTimeoutKey));
  OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
  OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
  OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));

  OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts));
  OptionalSet(result.failover_connection_max_retries, GetInt(kDfsClientFailoverConnectionRetriesOnTimeouts));

  // Load all nameservices if it's HA configured
  optional<std::string> dfs_nameservices = Get("dfs.nameservices");
  if(dfs_nameservices) {
    std::string nameservice = dfs_nameservices.value();

    std::vector<std::string> all_services = SplitOnComma(nameservice, false);

    // Look up nodes for each nameservice so that FileSystem object can support
    // multiple nameservices by ID.
    for(const std::string &service : all_services) {
      if(service.empty())
        continue;
      LOG_DEBUG(kFileSystem, << "Parsing info for nameservice: " << service);
      std::vector<NamenodeInfo> nodes = LookupNameService(service);
      if(nodes.empty()) {
        LOG_WARN(kFileSystem, << "Nameservice \"" << service << "\" declared in config but nodes aren't");
      } else {
        result.services[service] = nodes;
      }
    }
  }

  optional<std::string> authentication_value = Get(kHadoopSecurityAuthenticationKey);

  if (authentication_value) {
    std::string fixed_case_value = fixCase(authentication_value.value());
    if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos))
      result.authentication = Options::kKerberos;
    else
      result.authentication = Options::kSimple;
  }

  return result;
}

}
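// For reference, a minimal hdfs-site.xml fragment that the HA parsing above
// understands (a sketch only; the nameservice "mycluster", namenode ids
// "nn1"/"nn2", and host names are made-up example values):
//
//   <property><name>dfs.nameservices</name><value>mycluster</value></property>
//   <property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
//   <property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>nn1.example.com:8020</value></property>
//   <property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>nn2.example.com:8020</value></property>
//
// With such a config, GetOptions() calls LookupNameService("mycluster") and
// stores the resulting NamenodeInfo list under result.services["mycluster"].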