瀏覽代碼

HDFS-10761: libhdfs++: Fix broken logic in HA retry policy. Contributed by James Clampffer

James 8 年之前
父節點
當前提交
05ddb31081

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc

@@ -56,20 +56,21 @@ RetryAction FixedDelayWithFailover::ShouldRetry(const Status &s, uint64_t retrie
   (void)isIdempotentOrAtMostOnce;
   LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
 
-  if(s.code() == ::asio::error::timed_out && failovers < max_failover_retries_) {
+  if(failovers < max_failover_retries_ && (s.code() == ::asio::error::timed_out || s.get_server_exception_type() == Status::kStandbyException) )
+  {
     // Try connecting to another NN in case this one keeps timing out
     // Can add the backoff wait specified by dfs.client.failover.sleep.base.millis here
     return RetryAction::failover(delay_);
   }
 
-  if(retries < max_retries_) {
-    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries < max_retries_");
+  if(retries < max_retries_ && failovers < max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries < max_retries_ && failovers < max_failover_retries_");
     return RetryAction::retry(delay_);
   } else if (retries >= max_retries_ && failovers < max_failover_retries_) {
     LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers < max_failover_retries_");
     return RetryAction::failover(delay_);
-  } else if (retries >= max_retries_ && failovers == max_failover_retries_) {
-    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers == max_failover_retries_");
+  } else if (retries <= max_retries_ && failovers == max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries <= max_retries_ && failovers == max_failover_retries_");
     // 1 last retry on new connection
     return RetryAction::retry(delay_);
   }

+ 21 - 27
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc

@@ -291,39 +291,33 @@ void RpcEngine::RpcCommsError(
 
   optional<RetryAction> head_action = optional<RetryAction>();
 
-  //We are talking to the Standby NN, let's talk to the active one instead.
-  if(ha_persisted_info_ && status.get_server_exception_type() == Status::kStandbyException) {
-    LOG_INFO(kRPC, << "Received StandbyException.  Failing over.");
-    head_action = RetryAction::failover(std::max(0,options_.rpc_retry_delay_ms));
-  } else {
-    // Filter out anything with too many retries already
-    for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
-      auto req = *it;
+  // Filter out anything with too many retries already
+  for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
+    auto req = *it;
 
-      LOG_DEBUG(kRPC, << req->GetDebugString());
+    LOG_DEBUG(kRPC, << req->GetDebugString());
 
-      RetryAction retry = RetryAction::fail(""); // Default to fail
+    RetryAction retry = RetryAction::fail(""); // Default to fail
 
-      if (retry_policy()) {
-        retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), req->get_failover_count(), true);
-      }
+    if (retry_policy()) {
+      retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), req->get_failover_count(), true);
+    }
 
-      if (retry.action == RetryAction::FAIL) {
-        // If we've exceeded the maximum retry, take the latest error and pass it
-        //    on.  There might be a good argument for caching the first error
-        //    rather than the last one, that gets messy
+    if (retry.action == RetryAction::FAIL) {
+      // If we've exceeded the maximum retry, take the latest error and pass it
+      //    on.  There might be a good argument for caching the first error
+      //    rather than the last one, that gets messy
 
-        io_service().post([req, status]() {
-          req->OnResponseArrived(nullptr, status);  // Never call back while holding a lock
-        });
-        it = pendingRequests.erase(it);
-      } else {
-        if (!head_action) {
-          head_action = retry;
-        }
-
-        ++it;
+      io_service().post([req, status]() {
+        req->OnResponseArrived(nullptr, status);  // Never call back while holding a lock
+      });
+      it = pendingRequests.erase(it);
+    } else {
+      if (!head_action) {
+        head_action = retry;
       }
+
+      ++it;
     }
   }