浏览代码

YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers when nodes resync during work-preserving RM restart. Contributed by Jian He.
svn merge --ignore-ancestry -c 1610557 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1610559 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 10 年之前
父节点
当前提交
66af4bf54f

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -35,6 +35,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2088. Fixed a bug in GetApplicationsRequestPBImpl#mergeLocalToBuilder.
     (Binglin Chang via jianhe)
 
+    YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers
+    when nodes resync during work-preserving RM restart. (Jian He via vinodkv)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 19 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -105,8 +104,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private String nodeManagerVersion;
 
   /* set of containers that have just launched */
-  private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
-    new HashMap<ContainerId, ContainerStatus>();
+  private final Set<ContainerId> launchedContainers =
+    new HashSet<ContainerId>();
 
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -476,6 +475,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
         containers = startEvent.getNMContainerStatuses();
+        if (containers != null && !containers.isEmpty()) {
+          for (NMContainerStatus container : containers) {
+            if (container.getContainerState() == ContainerState.RUNNING) {
+              rmNode.launchedContainers.add(container.getContainerId());
+            }
+          }
+        }
       }
       
       if (null != startEvent.getRunningApplications()) {
@@ -664,14 +670,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+          if (!rmNode.launchedContainers.contains(containerId)) {
             // Just launched container. RM knows about it the first time.
-            rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+            rmNode.launchedContainers.add(containerId);
             newlyLaunchedContainers.add(remoteContainer);
           }
         } else {
           // A finished container
-          rmNode.justLaunchedContainers.remove(containerId);
+          rmNode.launchedContainers.remove(containerId);
           completedContainers.add(remoteContainer);
         }
       }
@@ -748,4 +754,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   public int getQueueSize() {
     return nodeUpdateQueue.size();
   }
+
+  // For test only.
+  @VisibleForTesting
+  public Set<ContainerId> getLaunchedContainers() {
+    return this.launchedContainers;
+  }
  }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -29,11 +29,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -164,6 +167,11 @@ public class TestWorkPreservingRMRestart {
 
     // Wait for RM to settle down on recovering containers;
     waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+    Set<ContainerId> launchedContainers =
+        ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+          .getLaunchedContainers();
+    assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+    assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
 
     // check RMContainers are re-recreated and the container state is correct.
     rm2.waitForState(nm1, amContainer.getContainerId(),