Parcourir la source

YARN-8362. Bugfix logic in container retries in node manager.
Contributed by Chandni Singh

Eric Yang il y a 7 ans
Parent
commit
135941e00d

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -1602,8 +1602,10 @@ public class ContainerImpl implements Container {
         }
         }
         container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
         container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
       }
       }
-
       if (container.shouldRetry(container.exitCode)) {
       if (container.shouldRetry(container.exitCode)) {
+        // Updates to the retry context should  be protected from concurrent
+        // writes. It should only be called from this transition.
+        container.retryPolicy.updateRetryContext(container.windowRetryContext);
         container.storeRetryContext();
         container.storeRetryContext();
         doRelaunch(container,
         doRelaunch(container,
             container.windowRetryContext.getRemainingRetries(),
             container.windowRetryContext.getRemainingRetries(),

+ 35 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java

@@ -42,49 +42,40 @@ public class SlidingWindowRetryPolicy {
 
 
   public boolean shouldRetry(RetryContext retryContext,
   public boolean shouldRetry(RetryContext retryContext,
       int errorCode) {
       int errorCode) {
-    ContainerRetryContext containerRC = retryContext
-        .containerRetryContext;
+    ContainerRetryContext containerRC = retryContext.containerRetryContext;
     Preconditions.checkNotNull(containerRC, "container retry context null");
     Preconditions.checkNotNull(containerRC, "container retry context null");
     ContainerRetryPolicy retryPolicy = containerRC.getRetryPolicy();
     ContainerRetryPolicy retryPolicy = containerRC.getRetryPolicy();
     if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
     if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
         || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
         || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
         && containerRC.getErrorCodes() != null
         && containerRC.getErrorCodes() != null
         && containerRC.getErrorCodes().contains(errorCode))) {
         && containerRC.getErrorCodes().contains(errorCode))) {
-      if (containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER) {
-        return true;
-      }
-      int pendingRetries = calculatePendingRetries(retryContext);
-      updateRetryContext(retryContext, pendingRetries);
-      return pendingRetries > 0;
+      return containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER
+          || calculateRemainingRetries(retryContext) > 0;
     }
     }
     return false;
     return false;
   }
   }
 
 
   /**
   /**
-   * Calculates the pending number of retries.
-   * <p>
-   * When failuresValidityInterval is > 0, it also removes time entries from
-   * <code>restartTimes</code> which are outside the validity interval.
+   * Calculates the remaining number of retries.
    *
    *
-   * @return the pending retries.
+   * @return the remaining retries.
    */
    */
-  private int calculatePendingRetries(RetryContext retryContext) {
+  private int calculateRemainingRetries(RetryContext retryContext) {
     ContainerRetryContext containerRC =
     ContainerRetryContext containerRC =
         retryContext.containerRetryContext;
         retryContext.containerRetryContext;
     if (containerRC.getFailuresValidityInterval() > 0) {
     if (containerRC.getFailuresValidityInterval() > 0) {
-      Iterator<Long> iterator = retryContext.getRestartTimes().iterator();
+      int validFailuresCount = 0;
       long currentTime = clock.getTime();
       long currentTime = clock.getTime();
-      while (iterator.hasNext()) {
-        long restartTime = iterator.next();
+      for (int i = retryContext.restartTimes.size() - 1; i >= 0; i--) {
+        long restartTime = retryContext.restartTimes.get(i);
         if (currentTime - restartTime
         if (currentTime - restartTime
-            > containerRC.getFailuresValidityInterval()) {
-          iterator.remove();
+            <= containerRC.getFailuresValidityInterval()) {
+          validFailuresCount++;
         } else {
         } else {
           break;
           break;
         }
         }
       }
       }
-      return containerRC.getMaxRetries() -
-          retryContext.getRestartTimes().size();
+      return containerRC.getMaxRetries() - validFailuresCount;
     } else {
     } else {
       return retryContext.getRemainingRetries();
       return retryContext.getRemainingRetries();
     }
     }
@@ -93,13 +84,30 @@ public class SlidingWindowRetryPolicy {
   /**
   /**
    * Updates remaining retries and the restart time when
    * Updates remaining retries and the restart time when
    * required in the retryContext.
    * required in the retryContext.
+   * <p>
+   * When failuresValidityInterval is > 0, it also removes time entries from
+   * <code>restartTimes</code> which are outside the validity interval.
    */
    */
-  private void updateRetryContext(RetryContext retryContext,
-      int pendingRetries) {
-    retryContext.setRemainingRetries(pendingRetries - 1);
-    if (retryContext.containerRetryContext.getFailuresValidityInterval()
-        > 0) {
-      retryContext.getRestartTimes().add(clock.getTime());
+  protected void updateRetryContext(RetryContext retryContext) {
+    if (retryContext.containerRetryContext.getFailuresValidityInterval() > 0) {
+      ContainerRetryContext containerRC = retryContext.containerRetryContext;
+      Iterator<Long> iterator = retryContext.getRestartTimes().iterator();
+      long currentTime = clock.getTime();
+
+      while (iterator.hasNext()) {
+        long restartTime = iterator.next();
+        if (currentTime - restartTime
+            > containerRC.getFailuresValidityInterval()) {
+          iterator.remove();
+        } else {
+          break;
+        }
+      }
+      retryContext.setRemainingRetries(containerRC.getMaxRetries() -
+          retryContext.restartTimes.size());
+      retryContext.getRestartTimes().add(currentTime);
+    } else {
+      retryContext.remainingRetries--;
     }
     }
   }
   }
 
 

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java

@@ -64,12 +64,18 @@ public class TestSlidingWindowRetryPolicy {
         new SlidingWindowRetryPolicy.RetryContext(retryContext);
         new SlidingWindowRetryPolicy.RetryContext(retryContext);
     Assert.assertTrue("retry 1",
     Assert.assertTrue("retry 1",
         retryPolicy.shouldRetry(windowRetryContext, 12));
         retryPolicy.shouldRetry(windowRetryContext, 12));
+    retryPolicy.updateRetryContext(windowRetryContext);
+
     clock.setTime(20);
     clock.setTime(20);
     Assert.assertTrue("retry 2",
     Assert.assertTrue("retry 2",
         retryPolicy.shouldRetry(windowRetryContext, 12));
         retryPolicy.shouldRetry(windowRetryContext, 12));
+    retryPolicy.updateRetryContext(windowRetryContext);
+
     clock.setTime(40);
     clock.setTime(40);
     Assert.assertTrue("retry 3",
     Assert.assertTrue("retry 3",
         retryPolicy.shouldRetry(windowRetryContext, 12));
         retryPolicy.shouldRetry(windowRetryContext, 12));
+    retryPolicy.updateRetryContext(windowRetryContext);
+
     clock.setTime(45);
     clock.setTime(45);
     Assert.assertFalse("retry failed",
     Assert.assertFalse("retry failed",
         retryPolicy.shouldRetry(windowRetryContext, 12));
         retryPolicy.shouldRetry(windowRetryContext, 12));