Pārlūkot izejas kodu

YARN-376. Fixes a bug which would prevent the NM knowing about completed containers and applications. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1451475 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 gadi atpakaļ
vecāks
revīzija
2628c479a4

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -56,6 +56,9 @@ Release 0.23.7 - UNRELEASED
     YARN-426. Failure to download a public resource prevents further downloads
     (Jason Lowe via bobby)
 
+    YARN-376. Fixes a bug which would prevent the NM knowing about completed
+    containers and applications. (Jason Lowe via sseth)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -262,8 +262,7 @@ public class ResourceTrackerService extends AbstractService implements
     HeartbeatResponse latestResponse = recordFactory
         .newRecordInstance(HeartbeatResponse.class);
     latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
-    latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
-    latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
+    rmNode.updateHeartbeatResponseForCleanup(latestResponse);
     latestResponse.setNodeAction(NodeAction.NORMAL);
 
     // Check if node's masterKey needs to be updated and if the currentKey has

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -105,5 +105,12 @@ public interface RMNode {
 
   public List<ApplicationId> getAppsToCleanup();
 
+  /**
+   * Update a {@link HeartbeatResponse} with the list of containers and
+   * applications to clean up for this node.
+   * @param response the {@link HeartbeatResponse} to update
+   */
+  public void updateHeartbeatResponseForCleanup(HeartbeatResponse response);
+
   public HeartbeatResponse getLastHeartBeatResponse();
 }

+ 15 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -293,6 +293,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   };
 
+  @Override
+  public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
+    this.writeLock.lock();
+
+    try {
+      response.addAllContainersToCleanup(
+          new ArrayList<ContainerId>(this.containersToClean));
+      response.addAllApplicationsToCleanup(this.finishedApplications);
+      this.containersToClean.clear();
+      this.finishedApplications.clear();
+    } finally {
+      this.writeLock.unlock();
+    }
+  };
+
   @Override
   public HeartbeatResponse getLastHeartBeatResponse() {
 
@@ -539,12 +554,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
           statusEvent.getKeepAliveAppIds());
 
-      // HeartBeat processing from our end is done, as node pulls the following
-      // lists before sending status-updates. Clear data-structures
-      // TODO: These lists could go to the NM multiple times, or never.
-      rmNode.containersToClean.clear();
-      rmNode.finishedApplications.clear();
-
       return RMNodeState.RUNNING;
     }
   }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -183,6 +183,10 @@ public class MockNodes {
       return null;
     }
 
+    @Override
+    public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
+    }
+
     @Override
     public HeartbeatResponse getLastHeartBeatResponse() {
       return null;

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -29,6 +29,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -196,6 +199,39 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(RMNodeState.REBOOTED, node.getState());
   }
 
+  @Test(timeout=20000)
+  public void testUpdateHeartbeatResponseForCleanup() {
+    RMNodeImpl node = getRunningNode();
+    NodeId nodeId = node.getNodeID();
+
+    // Expire a container
+		ContainerId completedContainerId = BuilderUtils.newContainerId(
+				BuilderUtils.newApplicationAttemptId(
+						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
+    Assert.assertEquals(1, node.getContainersToCleanUp().size());
+
+    // Finish an application
+    ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
+    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+
+    // Verify status update does not clear containers/apps to cleanup
+    // but updating heartbeat response for cleanup does
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    node.handle(statusEvent);
+    Assert.assertEquals(1, node.getContainersToCleanUp().size());
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+    HeartbeatResponse hbrsp = Records.newRecord(HeartbeatResponse.class);
+    node.updateHeartbeatResponseForCleanup(hbrsp);
+    Assert.assertEquals(0, node.getContainersToCleanUp().size());
+    Assert.assertEquals(0, node.getAppsToCleanup().size());
+    Assert.assertEquals(1, hbrsp.getContainersToCleanupCount());
+    Assert.assertEquals(completedContainerId, hbrsp.getContainerToCleanup(0));
+    Assert.assertEquals(1, hbrsp.getApplicationsToCleanupCount());
+    Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup(0));
+  }
+
   private RMNodeImpl getRunningNode() {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,