ソースを参照

HDFS-11294: libhdfs++: Segfault in HA failover if DNS lookup for both Namenodes fails. Contributed by James Clampffer.

James 7 年 前
コミット
bea3e99ede

+ 10 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h

@@ -37,17 +37,23 @@ static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect";
 static constexpr const char * FS_NN_READ_EVENT = "NN::read";
 static constexpr const char * FS_NN_WRITE_EVENT = "NN::write";
 
+static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
+static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
+static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
+
+
 // NN failover event due to issues with the current NN; might be standby, might be dead.
 // Invokes the fs_event_callback using the nameservice name in the cluster string.
 // The uint64_t value argument holds an address that can be reinterpreted as a const char *
 // and provides the full URI of the node the failover will attempt to connect to next.
 static constexpr const char * FS_NN_FAILOVER_EVENT = "NN::failover";
 
-static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
-static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
-static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
-
+// Invoked when RpcConnection tries to use an empty set of endpoints to figure out
+// which NN in a HA cluster to connect to.
+static constexpr const char * FS_NN_EMPTY_ENDPOINTS_EVENT = "NN::bad_failover::no_endpoints";
 
+// Invoked prior to determining if failed NN rpc calls should be retried or discarded.
+static constexpr const char * FS_NN_PRE_RPC_RETRY_EVENT = "NN::rpc::get_retry_action";
 
 class event_response {
 public:

+ 86 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc

@@ -41,6 +41,12 @@ using namespace std::placeholders;
 
 static constexpr tPort kDefaultPort = 8020;
 
+/** Annotate what parts of the code below are implementatons of API functions
+ *  and if they are normal vs. extended API.
+ */
+#define LIBHDFS_C_API
+#define LIBHDFSPP_EXT_API
+
 /* Separate the handles used by the C api from the C++ API*/
 struct hdfs_internal {
   hdfs_internal(FileSystem *p) : filesystem_(p), working_directory_("/") {}
@@ -79,6 +85,7 @@ struct hdfsFile_internal {
 thread_local std::string errstr;
 
 /* Fetch last error that happened in this thread */
+LIBHDFSPP_EXT_API
 int hdfsGetLastError(char *buf, int len) {
   //No error message
   if(errstr.empty()){
@@ -255,6 +262,7 @@ optional<std::string> getAbsolutePath(hdfsFS fs, const char* path) {
  * C API implementations
  **/
 
+LIBHDFS_C_API
 int hdfsFileIsOpenForRead(hdfsFile file) {
   /* files can only be open for reads at the moment, do a quick check */
   if (!CheckHandle(file)){
@@ -263,6 +271,7 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
   return 1; // Update implementation when we get file writing
 }
 
+LIBHDFS_C_API
 int hdfsFileIsOpenForWrite(hdfsFile file) {
   /* files can only be open for reads at the moment, so return false */
   CheckHandle(file);
@@ -332,6 +341,7 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st
   }
 }
 
+LIBHDFSPP_EXT_API
 hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
   // Same idea as the first half of doHdfsConnect, but return the wrapped FS before
   // connecting.
@@ -367,6 +377,7 @@ hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
   return nullptr;
 }
 
+LIBHDFSPP_EXT_API
 int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
   if(!CheckSystem(fs)) {
     return ENODEV;
@@ -420,24 +431,29 @@ int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
   return 0;
 }
 
+LIBHDFS_C_API
 hdfsFS hdfsConnect(const char *nn, tPort port) {
   return hdfsConnectAsUser(nn, port, "");
 }
 
+LIBHDFS_C_API
 hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
   return doHdfsConnect(std::string(nn), port, std::string(user), Options());
 }
 
+LIBHDFS_C_API
 hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) {
   //libhdfspp always returns a new instance
   return doHdfsConnect(std::string(nn), port, std::string(user), Options());
 }
 
+LIBHDFS_C_API
 hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
   //libhdfspp always returns a new instance
   return hdfsConnectAsUser(nn, port, "");
 }
 
+LIBHDFSPP_EXT_API
 int hdfsCancelPendingConnection(hdfsFS fs) {
   // todo: stick an enum in hdfs_internal to check the connect state
   if(!CheckSystem(fs)) {
@@ -458,6 +474,7 @@ int hdfsCancelPendingConnection(hdfsFS fs) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsDisconnect(hdfsFS fs) {
   try
   {
@@ -476,6 +493,7 @@ int hdfsDisconnect(hdfsFS fs) {
   }
 }
 
+LIBHDFS_C_API
 hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
                       short replication, tSize blocksize) {
   try
@@ -512,6 +530,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
   }
 }
 
+LIBHDFS_C_API
 int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
   try
   {
@@ -528,6 +547,7 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
   }
 }
 
+LIBHDFS_C_API
 char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
   try
   {
@@ -556,6 +576,7 @@ char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
   try
   {
@@ -582,6 +603,7 @@ int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsAvailable(hdfsFS fs, hdfsFile file) {
   //Since we do not have read ahead implemented, return 0 if fs and file are good;
   errno = 0;
@@ -591,6 +613,7 @@ int hdfsAvailable(hdfsFS fs, hdfsFile file) {
   return 0;
 }
 
+LIBHDFS_C_API
 tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
   try {
     errno = 0;
@@ -604,6 +627,7 @@ tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
   }
 }
 
+LIBHDFS_C_API
 tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
   try {
     errno = 0;
@@ -633,6 +657,7 @@ tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
     try {
       errno = 0;
@@ -659,6 +684,7 @@ int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
     }
 }
 
+LIBHDFS_C_API
 int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
     try {
       errno = 0;
@@ -682,6 +708,7 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
     }
 }
 
+LIBHDFS_C_API
 tOffset hdfsGetCapacity(hdfsFS fs) {
   try {
     errno = 0;
@@ -705,6 +732,7 @@ tOffset hdfsGetCapacity(hdfsFS fs) {
   }
 }
 
+LIBHDFS_C_API
 tOffset hdfsGetUsed(hdfsFS fs) {
   try {
     errno = 0;
@@ -777,6 +805,7 @@ void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
   file_info->mLastAccess = stat_info.access_time;
 }
 
+LIBHDFS_C_API
 int hdfsExists(hdfsFS fs, const char *path) {
   try {
     errno = 0;
@@ -800,6 +829,7 @@ int hdfsExists(hdfsFS fs, const char *path) {
   }
 }
 
+LIBHDFS_C_API
 hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
   try {
     errno = 0;
@@ -828,6 +858,7 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
   }
 }
 
+LIBHDFS_C_API
 hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
   try {
       errno = 0;
@@ -868,6 +899,7 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
     }
 }
 
+LIBHDFS_C_API
 void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
 {
     errno = 0;
@@ -880,6 +912,7 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
     delete[] hdfsFileInfo;
 }
 
+LIBHDFS_C_API
 int hdfsCreateDirectory(hdfsFS fs, const char* path) {
   try {
     errno = 0;
@@ -904,6 +937,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
   try {
       errno = 0;
@@ -927,6 +961,7 @@ int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
     }
 }
 
+LIBHDFS_C_API
 int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
   try {
     errno = 0;
@@ -951,6 +986,7 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsChmod(hdfsFS fs, const char* path, short mode){
   try {
       errno = 0;
@@ -977,6 +1013,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){
     }
 }
 
+LIBHDFS_C_API
 int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){
   try {
       errno = 0;
@@ -1003,6 +1040,7 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
     }
 }
 
+LIBHDFSPP_EXT_API
 hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){
   try {
       errno = 0;
@@ -1041,6 +1079,7 @@ hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t
     }
 }
 
+LIBHDFSPP_EXT_API
 int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
   try {
     errno = 0;
@@ -1068,6 +1107,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
   }
 }
 
+LIBHDFSPP_EXT_API
 int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
   try {
     errno = 0;
@@ -1125,6 +1165,7 @@ int hdfsRenameSnapshot(hdfsFS fs, const char* path, const char* old_name, const
 
 }
 
+LIBHDFSPP_EXT_API
 int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
   try {
     errno = 0;
@@ -1148,6 +1189,7 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
   }
 }
 
+LIBHDFSPP_EXT_API
 int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
   try {
     errno = 0;
@@ -1171,6 +1213,7 @@ int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
   }
 }
 
+LIBHDFS_C_API
 tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
                 tSize length) {
   try
@@ -1193,6 +1236,7 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
   }
 }
 
+LIBHDFS_C_API
 tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
   try
   {
@@ -1215,12 +1259,14 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
   }
 }
 
+LIBHDFS_C_API
 int hdfsUnbufferFile(hdfsFile file) {
   //Currently we are not doing any buffering
   CheckHandle(file);
   return -1;
 }
 
+LIBHDFS_C_API
 int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
   try
     {
@@ -1239,6 +1285,7 @@ int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats)
     }
 }
 
+LIBHDFS_C_API
 int hdfsFileClearReadStatistics(hdfsFile file) {
   try
     {
@@ -1255,16 +1302,19 @@ int hdfsFileClearReadStatistics(hdfsFile file) {
     }
 }
 
+LIBHDFS_C_API
 int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
     return stats->totalBytesRead - stats->totalLocalBytesRead;
 }
 
+LIBHDFS_C_API
 void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
     errno = 0;
     delete stats;
 }
 
 /* 0 on success, -1 on error*/
+LIBHDFS_C_API
 int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
   try
   {
@@ -1287,6 +1337,7 @@ int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
   }
 }
 
+LIBHDFS_C_API
 tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
   try
   {
@@ -1326,7 +1377,7 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) {
   }
 }
 
-
+LIBHDFSPP_EXT_API
 int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
 {
   try
@@ -1402,6 +1453,7 @@ int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations
   }
 }
 
+LIBHDFSPP_EXT_API
 int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
   errno = 0;
   if (blockLocations == nullptr)
@@ -1422,6 +1474,7 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
   return 0;
 }
 
+LIBHDFS_C_API
 char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
   try
     {
@@ -1462,6 +1515,7 @@ char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
     }
 }
 
+LIBHDFS_C_API
 void hdfsFreeHosts(char ***blockHosts) {
   errno = 0;
   if (blockHosts == nullptr)
@@ -1526,6 +1580,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler,
   return event_response::make_ok();
 }
 
+LIBHDFSPP_EXT_API
 int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
 {
   fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
@@ -1533,7 +1588,7 @@ int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
   return 0;
 }
 
-
+LIBHDFSPP_EXT_API
 int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
 {
   file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
@@ -1572,6 +1627,7 @@ hdfsBuilder::hdfsBuilder(const char * directory) :
   config = LoadDefault(loader);
 }
 
+LIBHDFS_C_API
 struct hdfsBuilder *hdfsNewBuilder(void)
 {
   try
@@ -1587,18 +1643,21 @@ struct hdfsBuilder *hdfsNewBuilder(void)
   }
 }
 
+LIBHDFS_C_API
 void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
 {
   errno = 0;
   bld->overrideHost = std::string(nn);
 }
 
+LIBHDFS_C_API
 void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
 {
   errno = 0;
   bld->overridePort = port;
 }
 
+LIBHDFS_C_API
 void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
 {
   errno = 0;
@@ -1607,12 +1666,14 @@ void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
   }
 }
 
+LIBHDFS_C_API
 void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
   //libhdfspp always returns a new instance, so nothing to do
   (void)bld;
   errno = 0;
 }
 
+LIBHDFS_C_API
 void hdfsFreeBuilder(struct hdfsBuilder *bld)
 {
   try
@@ -1626,6 +1687,7 @@ void hdfsFreeBuilder(struct hdfsBuilder *bld)
   }
 }
 
+LIBHDFS_C_API
 int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
                           const char *val)
 {
@@ -1650,16 +1712,22 @@ int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
   }
 }
 
+LIBHDFS_C_API
 void hdfsConfStrFree(char *val)
 {
   errno = 0;
   free(val);
 }
 
+LIBHDFS_C_API
 hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
-  return doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
+  hdfsFS fs = doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
+  // Always free the builder
+  hdfsFreeBuilder(bld);
+  return fs;
 }
 
+LIBHDFS_C_API
 int hdfsConfGetStr(const char *key, char **val)
 {
   try
@@ -1674,6 +1742,7 @@ int hdfsConfGetStr(const char *key, char **val)
   }
 }
 
+LIBHDFS_C_API
 int hdfsConfGetInt(const char *key, int32_t *val)
 {
   try
@@ -1706,6 +1775,7 @@ struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
   }
 }
 
+LIBHDFSPP_EXT_API
 int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
                           char **val)
 {
@@ -1739,6 +1809,7 @@ bool isValidInt(int64_t value)
           value <= std::numeric_limits<int>::max());
 }
 
+LIBHDFSPP_EXT_API
 int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
 {
   try
@@ -1765,6 +1836,7 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val
   }
 }
 
+LIBHDFSPP_EXT_API
 int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val)
 {
   try
@@ -1859,15 +1931,17 @@ void CForwardingLogger::FreeLogData(LogData *data) {
   free(data);
 }
 
-
+LIBHDFSPP_EXT_API
 LogData *hdfsCopyLogData(LogData *data) {
   return CForwardingLogger::CopyLogData(data);
 }
 
+LIBHDFSPP_EXT_API
 void hdfsFreeLogData(LogData *data) {
   CForwardingLogger::FreeLogData(data);
 }
 
+LIBHDFSPP_EXT_API
 void hdfsSetLogFunction(void (*callback)(LogData*)) {
   CForwardingLogger *logger = new CForwardingLogger();
   logger->SetCallback(callback);
@@ -1900,6 +1974,7 @@ static bool IsComponentValid(int component) {
   return true;
 }
 
+LIBHDFSPP_EXT_API
 int hdfsEnableLoggingForComponent(int component) {
   errno = 0;
   if(!IsComponentValid(component))
@@ -1908,6 +1983,7 @@ int hdfsEnableLoggingForComponent(int component) {
   return 0;
 }
 
+LIBHDFSPP_EXT_API
 int hdfsDisableLoggingForComponent(int component) {
   errno = 0;
   if(!IsComponentValid(component))
@@ -1916,6 +1992,7 @@ int hdfsDisableLoggingForComponent(int component) {
   return 0;
 }
 
+LIBHDFSPP_EXT_API
 int hdfsSetLoggingLevel(int level) {
   errno = 0;
   if(!IsLevelValid(level))
@@ -1923,3 +2000,8 @@ int hdfsSetLoggingLevel(int level) {
   LogManager::SetLogLevel(static_cast<LogLevel>(level));
   return 0;
 }
+
+#undef LIBHDFS_C_API
+#undef LIBHDFSPP_EXT_API
+
+

+ 37 - 29
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc

@@ -52,8 +52,10 @@ HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &se
 
     active_info_ = servers[0];
     standby_info_ = servers[1];
-    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
-    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
+    LOG_INFO(kRPC, << "HA enabled.  Using the following namenodes from the configuration."
+                   << "\nNote: Active namenode cannot be determined until a connection has been made.")
+    LOG_INFO(kRPC, << "First namenode url  = " << active_info_.uri.str());
+    LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str());
 
     enabled_ = true;
     if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
@@ -64,51 +66,57 @@ HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &se
 
 HANamenodeTracker::~HANamenodeTracker() {}
 
-//  Pass in endpoint from current connection, this will do a reverse lookup
-//  and return the info for the standby node. It will also swap its state internally.
-ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
-  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
+bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints,
+                                             ResolvedNamenodeInfo& out)
+{
   mutex_guard swap_lock(swap_lock_);
 
-  ResolvedNamenodeInfo failover_node;
+  // Cannot look up without a key.
+  if(current_endpoints.size() == 0) {
+    event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(),
+                          0 /*Not much to say about context without endpoints*/);
+    LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint.");
+    return false;
+  }
+
+  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoints[0]);
 
-  // Connected to standby, switch standby to active
-  if(IsCurrentActive_locked(current_endpoint)) {
+  if(IsCurrentActive_locked(current_endpoints[0])) {
     std::swap(active_info_, standby_info_);
     if(event_handlers_)
       event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                             reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
-    failover_node = active_info_;
-  } else if(IsCurrentStandby_locked(current_endpoint)) {
+    out = active_info_;
+  } else if(IsCurrentStandby_locked(current_endpoints[0])) {
     // Connected to standby
     if(event_handlers_)
       event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                             reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
-    failover_node = active_info_;
+    out = active_info_;
   } else {
-    // Invalid state, throw for testing
-    std::string ep1 = format_endpoints(active_info_.endpoints);
-    std::string ep2 = format_endpoints(standby_info_.endpoints);
-
-    std::stringstream msg;
-    msg << "Looked for " << current_endpoint << " in\n";
-    msg << ep1 << " and\n";
-    msg << ep2 << std::endl;
-
-    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
-    throw std::runtime_error(msg.str());
+    // Invalid state (or a NIC was added that didn't show up during DNS)
+    std::stringstream errorMsg; // asio specializes endpoing operator<< for stringstream
+    errorMsg << "Unable to find RPC connection in config. Looked for " << current_endpoints[0] << " in\n"
+             << format_endpoints(active_info_.endpoints) << " and\n"
+             << format_endpoints(standby_info_.endpoints) << std::endl;
+    LOG_ERROR(kRPC, << errorMsg.str());
+    return false;
   }
 
-  if(failover_node.endpoints.empty()) {
-    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again");
-    if(!ResolveInPlace(ioservice_, failover_node)) {
-      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
-                      << "failed.  Please make sure your configuration is up to date.");
+  // Extra DNS on swapped node to try and get EPs if it didn't already have them
+  if(out.endpoints.empty()) {
+    LOG_WARN(kRPC, << "No endpoints for node " << out.uri.str() << " attempting to resolve again");
+    if(!ResolveInPlace(ioservice_, out)) {
+      // Stuck retrying against the same NN that was able to be resolved in this case
+      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << out.uri.str()
+                      << " failed.  Please make sure your configuration is up to date.");
     }
   }
-  return failover_node;
+
+  return true;
 }
 
+
 bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
   for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
     if(ep.address() == active_info_.endpoints[i].address()) {

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h

@@ -48,15 +48,18 @@ class HANamenodeTracker {
   bool is_enabled() const { return enabled_; }
   bool is_resolved() const { return resolved_; }
 
-  // Get node opposite of the current one if possible (swaps active/standby)
+  // Pass in vector of endpoints held by RpcConnection, use endpoints to infer node
+  // currently being used.  Swap internal state and set out to other node.
   // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
   // get info without changing state
-  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
+  bool GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints,
+                            ResolvedNamenodeInfo& out);
 
+ private:
+  // See if endpoint ep is part of the list of endpoints for the active or standby NN
   bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
   bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
 
- private:
   // If HA should be enabled, according to our options and runtime info like # nodes provided
   bool enabled_;
   // If we were able to resolve at least 1 HA namenode

+ 28 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -213,6 +213,11 @@ void RpcEngine::RpcCommsError(
   optional<RetryAction> head_action = optional<RetryAction>();
 
   // Filter out anything with too many retries already
+  if(event_handlers_) {
+    event_handlers_->call(FS_NN_PRE_RPC_RETRY_EVENT, "RpcCommsError",
+                          reinterpret_cast<int64_t>(this));
+  }
+
   for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
     auto req = *it;
 
@@ -261,15 +266,34 @@ void RpcEngine::RpcCommsError(
       // If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active)
       if(head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) {
 
-        for(unsigned int i=0; i<pendingRequests.size();i++)
+        for(unsigned int i=0; i<pendingRequests.size();i++) {
           pendingRequests[i]->IncrementFailoverCount();
+        }
 
-        ResolvedNamenodeInfo new_active_nn_info =
-            ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_[0]/*reverse lookup*/);
+        ResolvedNamenodeInfo new_active_nn_info;
+        bool failoverInfoFound = ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_, new_active_nn_info);
+        if(!failoverInfoFound) {
+          // This shouldn't be a common case, the set of endpoints was empty, likely due to DNS issues.
+          // Another possibility is a network device has been added or removed due to a VM starting or stopping.
+
+          LOG_ERROR(kRPC, << "Failed to find endpoints for the alternate namenode."
+                          << "Make sure Namenode hostnames can be found with a DNS lookup.");
+          // Kill all pending RPC requests since there's nowhere for this to go
+          Status badEndpointStatus = Status::Error("No endpoints found for namenode");
+
+          for(unsigned int i=0; i<pendingRequests.size(); i++) {
+            std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i];
+            io_service().post([sharedCurrentRequest, badEndpointStatus]() {
+              sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus);  // Never call back while holding a lock
+            });
+          }
 
-        LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str());
+          // Clear request vector. This isn't a recoverable error.
+          pendingRequests.clear();
+        }
 
         if(ha_persisted_info_->is_resolved()) {
+          LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str());
           last_endpoints_ = new_active_nn_info.endpoints;
         } else {
           LOG_WARN(kRPC, << "It looks HA is turned on, but unable to fail over. has info="

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -120,6 +120,12 @@ add_executable(user_lock_test user_lock_test.cc)
 target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(user_lock user_lock_test)
 
+add_executable(hdfs_config_connect_bugs_test hdfs_config_connect_bugs.cc)
+target_link_libraries(hdfs_config_connect_bugs_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+add_memcheck_test(hdfs_config_connect_bugs hdfs_config_connect_bugs_test)
+
+
+
 #
 #
 #   INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS
@@ -136,7 +142,7 @@ include_directories (
 
 add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
 
-# TODO: get all of the mini dfs library bits here in one plase
+# TODO: get all of the mini dfs library bits here in one place
 # add_library(hdfspp_mini_cluster     native_mini_dfs ${JAVA_JVM_LIBRARY} )
 
 #TODO: Link against full library rather than just parts
@@ -157,4 +163,8 @@ build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hd
 link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY}  ${SASL_LIBRARIES})
 add_libhdfs_test  (hdfs_ext hdfspp_test_shim_static)
 
+#build_libhdfs_test(hdfs_config_connect_bugs hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_config_connect_bugs.cc)
+#link_libhdfs_test (hdfs_config_connect_bugs hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY}  ${SASL_LIBRARIES})
+#add_libhdfs_test  (hdfs_config_connect_bugs hdfspp_test_shim_static)
+
 endif(HADOOP_BUILD)

+ 136 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/hdfs_ext.h"
+
+#include "configuration_test.h"
+
+#include <google/protobuf/stubs/common.h>
+
+#include <cstring>
+#include <chrono>
+#include <exception>
+
+
+static const char *hdfs_11294_core_site_txt =
+"<configuration>\n"
+"  <property name=\"fs.defaultFS\" value=\"hdfs://NAMESERVICE1\"/>\n"
+"  <property name=\"hadoop.security.authentication\" value=\"simple\"/>\n"
+"  <property name=\"ipc.client.connect.retry.interval\" value=\"1\">\n"
+"</configuration>\n";
+
+static const char *hdfs_11294_hdfs_site_txt =
+"<configuration>\n"
+"  <property>\n"
+"    <name>dfs.nameservices</name>\n"
+"    <value>NAMESERVICE1</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.ha.namenodes.NAMESERVICE1</name>\n"
+"    <value>nn1, nn2</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.namenode.rpc-address.NAMESERVICE1.nn1</name>\n"
+"    <value>nonesuch1.apache.org:8020</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.namenode.servicerpc-address.NAMESERVICE1.nn1</name>\n"
+"    <value>nonesuch1.apache.org:8040</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.namenode.http-address.NAMESERVICE1.nn1</name>\n"
+"    <value>nonesuch1.apache.org:50070</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.namenode.rpc-address.NAMESERVICE1.nn2</name>\n"
+"    <value>nonesuch2.apache.org:8020</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.namenode.servicerpc-address.NAMESERVICE1.nn2</name>\n"
+"    <value>nonesuch2.apache.org:8040</value>\n"
+"  </property>\n"
+"  <property>\n"
+"    <name>dfs.namenode.http-address.NAMESERVICE1.nn2</name>\n"
+"    <value>nonesuch2.apache.org:50070</value>\n"
+"  </property>\n"
+"</configuration>\n";
+
+
+
+
+namespace hdfs {
+
+// Make sure we can set up a mini-cluster and connect to it
+TEST(ConfigConnectBugs, Test_HDFS_11294) {
+  // Directory for hdfs config
+  TempDir td;
+
+  const std::string& tempDirPath = td.path;
+  const std::string coreSitePath = tempDirPath + "/core-site.xml";
+  const std::string hdfsSitePath = tempDirPath + "/hdfs-site.xml";
+
+  // Write configs
+  FILE *coreSite = fopen(coreSitePath.c_str(), "w");
+  EXPECT_NE(coreSite, nullptr);
+  int coreSiteLength = strlen(hdfs_11294_core_site_txt);
+  size_t res = fwrite(hdfs_11294_core_site_txt, 1, coreSiteLength, coreSite);
+  EXPECT_EQ(res, coreSiteLength);
+  EXPECT_EQ(fclose(coreSite), 0);
+
+  FILE *hdfsSite = fopen(hdfsSitePath.c_str(), "w");
+  EXPECT_NE(hdfsSite, nullptr);
+  int hdfsSiteLength = strlen(hdfs_11294_hdfs_site_txt);
+  res = fwrite(hdfs_11294_hdfs_site_txt, 1, hdfsSiteLength, hdfsSite);
+  EXPECT_EQ(res, hdfsSiteLength);
+  EXPECT_EQ(fclose(hdfsSite), 0);
+
+  // Load configs with new FS
+  hdfsBuilder *bld = hdfsNewBuilderFromDirectory(tempDirPath.c_str());
+  hdfsBuilderSetNameNode(bld, "NAMESERVICE1");
+
+  // In HDFS-11294 connecting would crash because DNS couldn't resolve
+  // endpoints but the RpcEngine would attempt to dereference a non existant
+  // element in a std::vector and crash.  Test passes if connect doesn't crash.
+  hdfsFS fileSystem = hdfsBuilderConnect(bld);
+
+  // FS shouldn't be created if it can't connect.
+  EXPECT_EQ(fileSystem, nullptr);
+
+  // Verify it got to endpoint check
+  char errMsgBuf[100];
+  memset(errMsgBuf, 0, 100);
+  EXPECT_EQ( hdfsGetLastError(errMsgBuf, 100), 0);
+  EXPECT_STREQ(errMsgBuf, "Exception:No endpoints found for namenode");
+
+
+  // remove config files
+  EXPECT_EQ(remove(coreSitePath.c_str()), 0);
+  EXPECT_EQ(remove(hdfsSitePath.c_str()), 0);
+}
+
+} // end namespace hdfs
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  int exit_code = RUN_ALL_TESTS();
+  google::protobuf::ShutdownProtobufLibrary();
+
+  return exit_code;
+}

+ 11 - 5
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc

@@ -398,14 +398,20 @@ TEST(RpcEngineTest, TestEventCallbacks)
     io_service.stop();
     ASSERT_TRUE(stat.ok());
   });
+
+  // If you're adding event hooks you'll most likely need to update this.
+  // It's a brittle test but makes it hard to miss control flow changes in RPC retry.
+  for(const auto& m : callbacks)
+    std::cerr << m << std::endl;
   io_service.run();
   ASSERT_TRUE(complete);
-  ASSERT_EQ(8, callbacks.size());
+  ASSERT_EQ(9, callbacks.size());
   ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
-  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
-  ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
-  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect
-  for (int i=4; i < 7; i++)
+  ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[1]); // figure out retry decision
+  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[2]); // reconnect
+  ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[3]); // makes an error
+  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[4]); // reconnect
+  for (int i=5; i < 8; i++)
     ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
 }