Browse Source

YARN-3136. Fixed a synchronization problem of AbstractYarnScheduler#getTransferredContainers. Contributed by Sunil G
(cherry picked from commit 497c86b485b1bb8a2eba52308646d8e1ee76bce3)

Jian He 10 years ago
parent
commit
6f70f1b1af

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -179,6 +179,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3493. RM fails to come up with error "Failed to load/recover state" 
     when mem settings are changed. (Jian He via wangda)
 
+    YARN-3136. Fixed a synchronization problem of
+    AbstractYarnScheduler#getTransferredContainers. (Sunil G via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -469,6 +469,14 @@
     <Method name="recoverContainersOnNode" />
     <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler" />
+    <Or>
+      <Field name="rmContext" />
+      <Field name="applications" />
+    </Or>
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
   
   <!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->
   <Match>

+ 25 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -298,32 +298,35 @@ public class ApplicationMasterService extends AbstractService implements
 
       // For work-preserving AM restart, retrieve previous attempts' containers
       // and corresponding NM tokens.
-      List<Container> transferredContainers =
-          ((AbstractYarnScheduler) rScheduler)
+      if (app.getApplicationSubmissionContext()
+          .getKeepContainersAcrossApplicationAttempts()) {
+        List<Container> transferredContainers = ((AbstractYarnScheduler) rScheduler)
             .getTransferredContainers(applicationAttemptId);
-      if (!transferredContainers.isEmpty()) {
-        response.setContainersFromPreviousAttempts(transferredContainers);
-        List<NMToken> nmTokens = new ArrayList<NMToken>();
-        for (Container container : transferredContainers) {
-          try {
-            NMToken token = rmContext.getNMTokenSecretManager()
-                .createAndGetNMToken(app.getUser(), applicationAttemptId,
-                    container);
-            if (null != token) {
-              nmTokens.add(token);
-            }
-          } catch (IllegalArgumentException e) {
-            // if it's a DNS issue, throw UnknowHostException directly and that
-            // will be automatically retried by RMProxy in RPC layer.
-            if (e.getCause() instanceof UnknownHostException) {
-              throw (UnknownHostException) e.getCause();
+        if (!transferredContainers.isEmpty()) {
+          response.setContainersFromPreviousAttempts(transferredContainers);
+          List<NMToken> nmTokens = new ArrayList<NMToken>();
+          for (Container container : transferredContainers) {
+            try {
+              NMToken token = rmContext.getNMTokenSecretManager()
+                  .createAndGetNMToken(app.getUser(), applicationAttemptId,
+                      container);
+              if (null != token) {
+                nmTokens.add(token);
+              }
+            } catch (IllegalArgumentException e) {
+              // if it's a DNS issue, throw UnknownHostException directly
+              // and that will be automatically retried by RMProxy in the
+              // RPC layer.
+              if (e.getCause() instanceof UnknownHostException) {
+                throw (UnknownHostException) e.getCause();
+              }
             }
           }
+          response.setNMTokensFromPreviousAttempts(nmTokens);
+          LOG.info("Application " + appID + " retrieved "
+              + transferredContainers.size() + " containers from previous"
+              + " attempts and " + nmTokens.size() + " NM tokens.");
         }
-        response.setNMTokensFromPreviousAttempts(nmTokens);
-        LOG.info("Application " + appID + " retrieved "
-            + transferredContainers.size() + " containers from previous"
-            + " attempts and " + nmTokens.size() + " NM tokens.");
       }
 
       response.setSchedulerResourceTypes(rScheduler

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -21,12 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -67,6 +70,8 @@ import com.google.common.util.concurrent.SettableFuture;
 
 
 @SuppressWarnings("unchecked")
+@Private
+@Unstable
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
     extends AbstractService implements ResourceScheduler {
@@ -91,7 +96,12 @@ public abstract class AbstractYarnScheduler
   private long configuredMaximumAllocationWaitTime;
 
   protected RMContext rmContext;
-  protected Map<ApplicationId, SchedulerApplication<T>> applications;
+  
+  /*
+   * All schedulers that inherit from AbstractYarnScheduler should use a
+   * concurrent version of the 'applications' map.
+   */
+  protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
@@ -123,7 +133,7 @@ public abstract class AbstractYarnScheduler
     super.serviceInit(conf);
   }
 
-  public synchronized List<Container> getTransferredContainers(
+  public List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
     SchedulerApplication<T> app = applications.get(appId);
@@ -132,6 +142,9 @@ public abstract class AbstractYarnScheduler
     if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
       return containerList;
     }
+    if (app == null) {
+      return containerList;
+    }
     Collection<RMContainer> liveContainers =
         app.getCurrentAppAttempt().getLiveContainers();
     ContainerId amContainerId =