Browse Source

YARN-433. When RM is catching up with node updates then it should not expire acquired containers. Contributed by Xuan Gong

(cherry picked from commit ab80e277039a586f6d6259b2511ac413e29ea4f8)
Zhihai Xu 10 years ago
parent
commit
0e2019fa30

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -664,6 +664,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue 
     YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue 
     on nodelabel recovery. (Bibin A Chundatt via wangda)
     on nodelabel recovery. (Bibin A Chundatt via wangda)
 
 
+    YARN-433. When RM is catching up with node updates then it should not expire
+    acquired containers. (Xuan Gong via zxu)
+
 Release 2.7.2 - UNRELEASED
 Release 2.7.2 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 2 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -99,9 +99,9 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
 
 
     // Transitions from ACQUIRED state
     // Transitions from ACQUIRED state
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
-        RMContainerEventType.LAUNCHED, new LaunchedTransition())
+        RMContainerEventType.LAUNCHED)
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
-        RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
+        RMContainerEventType.FINISHED, new FinishedTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED, new KillTransition())
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@@ -486,16 +486,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     }
     }
   }
   }
 
 
-  private static final class LaunchedTransition extends BaseTransition {
-
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container
-          .getContainerId());
-    }
-  }
-
   private static final class ContainerRescheduledTransition extends
   private static final class ContainerRescheduledTransition extends
       FinishedTransition {
       FinishedTransition {
 
 
@@ -554,19 +544,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     }
     }
   }
   }
 
 
-  private static final class ContainerFinishedAtAcquiredState extends
-      FinishedTransition {
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container
-          .getContainerId());
-
-      // Inform AppAttempt
-      super.transition(container, event);
-    }
-  }
-
   private static final class KillTransition extends FinishedTransition {
   private static final class KillTransition extends FinishedTransition {
 
 
     @Override
     @Override

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -107,6 +108,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private long lastHealthReportTime;
   private long lastHealthReportTime;
   private String nodeManagerVersion;
   private String nodeManagerVersion;
 
 
+  private final ContainerAllocationExpirer containerAllocationExpirer;
   /* set of containers that have just launched */
   /* set of containers that have just launched */
   private final Set<ContainerId> launchedContainers =
   private final Set<ContainerId> launchedContainers =
     new HashSet<ContainerId>();
     new HashSet<ContainerId>();
@@ -265,6 +267,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     this.stateMachine = stateMachineFactory.make(this);
     this.stateMachine = stateMachineFactory.make(this);
     
     
     this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
     this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
+
+    this.containerAllocationExpirer = context.getContainerAllocationExpirer();
   }
   }
 
 
   @Override
   @Override
@@ -953,11 +957,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           // Just launched container. RM knows about it the first time.
           // Just launched container. RM knows about it the first time.
           launchedContainers.add(containerId);
           launchedContainers.add(containerId);
           newlyLaunchedContainers.add(remoteContainer);
           newlyLaunchedContainers.add(remoteContainer);
+          // Unregister from containerAllocationExpirer.
+          containerAllocationExpirer.unregister(containerId);
         }
         }
       } else {
       } else {
         // A finished container
         // A finished container
         launchedContainers.remove(containerId);
         launchedContainers.remove(containerId);
         completedContainers.add(remoteContainer);
         completedContainers.add(remoteContainer);
+        // Unregister from containerAllocationExpirer.
+        containerAllocationExpirer.unregister(containerId);
       }
       }
     }
     }
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {

+ 50 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.List;
 
 
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -105,8 +107,9 @@ public class TestRMNodeTransitions {
     InlineDispatcher rmDispatcher = new InlineDispatcher();
     InlineDispatcher rmDispatcher = new InlineDispatcher();
     
     
     rmContext =
     rmContext =
-        new RMContextImpl(rmDispatcher, null, null, null,
-            mock(DelegationTokenRenewer.class), null, null, null, null, null);
+        new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
+          null, null, mock(DelegationTokenRenewer.class), null, null, null,
+          null, null);
     NodesListManager nodesListManager = mock(NodesListManager.class);
     NodesListManager nodesListManager = mock(NodesListManager.class);
     HostsFileReader reader = mock(HostsFileReader.class);
     HostsFileReader reader = mock(HostsFileReader.class);
     when(nodesListManager.getHostsReader()).thenReturn(reader);
     when(nodesListManager.getHostsReader()).thenReturn(reader);
@@ -147,7 +150,8 @@ public class TestRMNodeTransitions {
   public void tearDown() throws Exception {
   public void tearDown() throws Exception {
   }
   }
   
   
-  private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+  private RMNodeStatusEvent getMockRMNodeStatusEvent(
+      List<ContainerStatus> containerStatus) {
     NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
     NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
 
 
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
@@ -158,6 +162,9 @@ public class TestRMNodeTransitions {
     doReturn(healthStatus).when(event).getNodeHealthStatus();
     doReturn(healthStatus).when(event).getNodeHealthStatus();
     doReturn(response).when(event).getLatestResponse();
     doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    if (containerStatus != null) {
+      doReturn(containerStatus).when(event).getContainers();
+    }
     return event;
     return event;
   }
   }
   
   
@@ -176,7 +183,7 @@ public class TestRMNodeTransitions {
     
     
     // Now verify that scheduler isn't notified of an expired container
     // Now verify that scheduler isn't notified of an expired container
     // by checking number of 'completedContainers' it got in the previous event
     // by checking number of 'completedContainers' it got in the previous event
-    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
     ContainerStatus containerStatus = mock(ContainerStatus.class);
     ContainerStatus containerStatus = mock(ContainerStatus.class);
     doReturn(completedContainerId).when(containerStatus).getContainerId();
     doReturn(completedContainerId).when(containerStatus).getContainerId();
     doReturn(Collections.singletonList(containerStatus)).
     doReturn(Collections.singletonList(containerStatus)).
@@ -207,11 +214,11 @@ public class TestRMNodeTransitions {
     ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
     ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
         BuilderUtils.newApplicationAttemptId(
             BuilderUtils.newApplicationId(1, 1), 1), 2);
             BuilderUtils.newApplicationId(1, 1), 1), 2);
- 
-    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
-    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
-    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
-    
+
+    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
+    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
+    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null);
+
     ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
@@ -263,8 +270,8 @@ public class TestRMNodeTransitions {
         BuilderUtils.newApplicationAttemptId(
         BuilderUtils.newApplicationAttemptId(
             BuilderUtils.newApplicationId(1, 1), 1), 1);
             BuilderUtils.newApplicationId(1, 1), 1), 1);
         
         
-    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
-    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
+    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null);
 
 
     ContainerStatus containerStatus1 = mock(ContainerStatus.class);
     ContainerStatus containerStatus1 = mock(ContainerStatus.class);
     ContainerStatus containerStatus2 = mock(ContainerStatus.class);
     ContainerStatus containerStatus2 = mock(ContainerStatus.class);
@@ -499,7 +506,7 @@ public class TestRMNodeTransitions {
 
 
     // Verify status update does not clear containers/apps to cleanup
     // Verify status update does not clear containers/apps to cleanup
     // but updating heartbeat response for cleanup does
     // but updating heartbeat response for cleanup does
-    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
     node.handle(statusEvent);
     node.handle(statusEvent);
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     Assert.assertEquals(1, node.getAppsToCleanup().size());
     Assert.assertEquals(1, node.getAppsToCleanup().size());
@@ -706,4 +713,35 @@ public class TestRMNodeTransitions {
         null, null));
         null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
   }
+
+  @Test
+  public void testContainerExpire() throws Exception {
+    ContainerAllocationExpirer mockExpirer =
+        mock(ContainerAllocationExpirer.class);
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
+    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
+    mockExpirer.register(containerId1);
+    mockExpirer.register(containerId2);
+    verify(mockExpirer).register(containerId1);
+    verify(mockExpirer).register(containerId2);
+    ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
+    RMNodeImpl rmNode = getRunningNode();
+    ContainerStatus status1 =
+        ContainerStatus
+          .newInstance(containerId1, ContainerState.RUNNING, "", 0);
+    ContainerStatus status2 =
+        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
+          0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status1);
+    statusList.add(status2);
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
+    rmNode.handle(statusEvent);
+    verify(mockExpirer).unregister(containerId1);
+    verify(mockExpirer).unregister(containerId2);
+  }
 }
 }