Преглед изворни кода

YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang

Jason Lowe пре 7 година
родитељ
комит
c98963b558
13 измењених фајлова са 120 додато и 127 уклоњено
  1. 2 2
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
  2. 2 2
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
  3. 2 9
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
  4. 3 9
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
  5. 48 19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  6. 4 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  7. 10 38
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  8. 2 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
  9. 11 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
  10. 2 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  11. 1 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
  12. 28 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  13. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

+ 2 - 2
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java

@@ -90,7 +90,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
           RecordFactoryProvider.getRecordFactory(null);
   // response queue
   protected final BlockingQueue<AllocateResponse> responseQueue;
-  protected int RESPONSE_ID = 1;
+  private int responseId = 0;
   // user name
   protected String user;  
   // queue name
@@ -213,7 +213,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
       List<ContainerId> toRelease) {
     AllocateRequest allocateRequest =
             recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setResponseId(RESPONSE_ID ++);
+    allocateRequest.setResponseId(responseId++);
     allocateRequest.setAskList(ask);
     allocateRequest.setReleaseList(toRelease);
     return allocateRequest;

+ 2 - 2
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java

@@ -73,7 +73,7 @@ public class NMSimulator extends TaskRunner.Task {
   // resource manager
   private ResourceManager rm;
   // heart beat response id
-  private int RESPONSE_ID = 1;
+  private int responseId = 0;
   private final static Logger LOG = Logger.getLogger(NMSimulator.class);
   
   public void init(String nodeIdStr, int memory, int cores,
@@ -134,7 +134,7 @@ public class NMSimulator extends TaskRunner.Task {
     ns.setContainersStatuses(generateContainerStatusList());
     ns.setNodeId(node.getNodeID());
     ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
-    ns.setResponseId(RESPONSE_ID ++);
+    ns.setResponseId(responseId++);
     ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
     beatRequest.setNodeStatus(ns);
     NodeHeartbeatResponse beatResponse =

+ 2 - 9
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -143,8 +143,8 @@ public class NodeInfo {
       return runningApplications;
     }
 
-    public void updateNodeHeartbeatResponseForCleanup(
-            NodeHeartbeatResponse response) {
+    public void setAndUpdateNodeHeartbeatResponse(
+        NodeHeartbeatResponse response) {
     }
 
     public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
@@ -177,13 +177,6 @@ public class NodeInfo {
       return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
 
-    @Override
-    public void updateNodeHeartbeatResponseForContainersDecreasing(
-        NodeHeartbeatResponse response) {
-      // TODO Auto-generated method stub
-      
-    }
-
     @Override
     public List<Container> pullNewlyIncreasedContainers() {
       // TODO Auto-generated method stub

+ 3 - 9
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -126,9 +126,9 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
-  public void updateNodeHeartbeatResponseForCleanup(
-          NodeHeartbeatResponse nodeHeartbeatResponse) {
-    node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
+  public void setAndUpdateNodeHeartbeatResponse(
+      NodeHeartbeatResponse nodeHeartbeatResponse) {
+    node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse);
   }
 
   @Override
@@ -166,12 +166,6 @@ public class RMNodeWrapper implements RMNode {
     return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
 
-  @Override
-  public void updateNodeHeartbeatResponseForContainersDecreasing(
-      NodeHeartbeatResponse response) {
-    // TODO Auto-generated method stub
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public List<Container> pullNewlyIncreasedContainers() {

+ 48 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -386,14 +388,37 @@ public class ResourceTrackerService extends AbstractService implements
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
-      // Reset heartbeat ID since node just restarted.
-      oldNode.resetLastNodeHeartBeatResponse();
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(
-              new RMNodeReconnectEvent(nodeId, rmNode, request
-                  .getRunningApplications(), request.getNMContainerStatuses()));
+
+      if (CollectionUtils.isEmpty(request.getRunningApplications())
+          && rmNode.getState() != NodeState.DECOMMISSIONING
+          && rmNode.getHttpPort() != oldNode.getHttpPort()) {
+        // Reconnected node differs, so replace old node and start new node
+        switch (rmNode.getState()) {
+        case RUNNING:
+          ClusterMetrics.getMetrics().decrNumActiveNodes();
+          break;
+        case UNHEALTHY:
+          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+          break;
+        default:
+          LOG.debug("Unexpected Rmnode state");
+        }
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new NodeRemovedSchedulerEvent(rmNode));
+
+        this.rmContext.getRMNodes().put(nodeId, rmNode);
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMNodeStartedEvent(nodeId, null, null));
+
+      } else {
+        // Reset heartbeat ID since node just restarted.
+        oldNode.resetLastNodeHeartBeatResponse();
+
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMNodeReconnectEvent(nodeId, rmNode,
+                request.getRunningApplications(),
+                request.getNMContainerStatuses()));
+      }
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.
@@ -490,12 +515,13 @@ public class ResourceTrackerService extends AbstractService implements
 
     // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
-    if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
-        .getResponseId()) {
+    if (getNextResponseId(
+        remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
+            .getResponseId()) {
       LOG.info("Received duplicate heartbeat from node "
           + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
       return lastNodeHeartbeatResponse;
-    } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
+    } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
         .getResponseId()) {
       String message =
           "Too far behind rm response id:"
@@ -510,13 +536,11 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     // Heartbeat response
-    NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
-        .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
-            getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
-            nextHeartBeatInterval);
-    rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
-    rmNode.updateNodeHeartbeatResponseForContainersDecreasing(
-        nodeHeartBeatResponse);
+    NodeHeartbeatResponse nodeHeartBeatResponse =
+        YarnServerBuilderUtils.newNodeHeartbeatResponse(
+            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
+            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
+    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
 
     populateKeys(request, nodeHeartBeatResponse);
 
@@ -528,7 +552,7 @@ public class ResourceTrackerService extends AbstractService implements
 
     // 4. Send status to RMNode, saving the latest response.
     RMNodeStatusEvent nodeStatusEvent =
-        new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
+        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
     if (request.getLogAggregationReportsForApps() != null
         && !request.getLogAggregationReportsForApps().isEmpty()) {
       nodeStatusEvent.setLogAggregationReportsForApps(request
@@ -562,6 +586,11 @@ public class ResourceTrackerService extends AbstractService implements
     return nodeHeartBeatResponse;
   }
 
+  private int getNextResponseId(int responseId) {
+    // Loop between 0 and Integer.MAX_VALUE
+    return (responseId + 1) & Integer.MAX_VALUE;
+  }
+
   /**
    * Check if node in decommissioning state.
    * @param nodeId

+ 4 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -134,10 +134,11 @@ public interface RMNode {
 
   /**
    * Update a {@link NodeHeartbeatResponse} with the list of containers and
-   * applications to clean up for this node.
+   * applications to clean up for this node, and the containers to be updated.
+   *
    * @param response the {@link NodeHeartbeatResponse} to update
    */
-  public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
+  void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response);
 
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
 
@@ -160,13 +161,7 @@ public interface RMNode {
    * @return labels in this node
    */
   public Set<String> getNodeLabels();
-  
-  /**
-   * Update containers to be decreased
-   */
-  public void updateNodeHeartbeatResponseForContainersDecreasing(
-      NodeHeartbeatResponse response);
-  
+
   public List<Container> pullNewlyIncreasedContainers();
 
   long getUntrackedTimeStamp();

+ 10 - 38
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -559,7 +559,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   };
 
   @Override
-  public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+  public void setAndUpdateNodeHeartbeatResponse(
+      NodeHeartbeatResponse response) {
     this.writeLock.lock();
 
     try {
@@ -573,6 +574,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       this.finishedApplications.clear();
       this.containersToSignal.clear();
       this.containersToBeRemovedFromNM.clear();
+
+      // NOTE: This is required for backward compatibility.
+      response.addAllContainersToDecrease(toBeDecreasedContainers.values());
+      toBeDecreasedContainers.clear();
+
+      // Synchronously update the last response in rmNode with updated
+      // responseId
+      this.latestNodeHeartBeatResponse = response;
     } finally {
       this.writeLock.unlock();
     }
@@ -582,25 +591,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   public Collection<Container> getToBeDecreasedContainers() {
     return toBeDecreasedContainers.values(); 
   }
-  
-  @Override
-  public void updateNodeHeartbeatResponseForContainersDecreasing(
-      NodeHeartbeatResponse response) {
-    this.writeLock.lock();
-    
-    try {
-      response.addAllContainersToDecrease(toBeDecreasedContainers.values());
-      toBeDecreasedContainers.clear();
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
 
   @Override
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
-
     this.readLock.lock();
-
     try {
       return this.latestNodeHeartBeatResponse;
     } finally {
@@ -848,21 +842,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             rmNode.context.getDispatcher().getEventHandler().handle(
                 new NodeAddedSchedulerEvent(rmNode));
           }
-        } else {
-          // Reconnected node differs, so replace old node and start new node
-          switch (rmNode.getState()) {
-            case RUNNING:
-              ClusterMetrics.getMetrics().decrNumActiveNodes();
-              break;
-            case UNHEALTHY:
-              ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
-              break;
-            default:
-              LOG.debug("Unexpected Rmnode state");
-            }
-            rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
-            rmNode.context.getDispatcher().getEventHandler().handle(
-                new RMNodeStartedEvent(newNode.getNodeID(), null, null));
         }
 
       } else {
@@ -1118,10 +1097,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
-
-      // Switch the last heartbeatresponse.
-      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
-
       NodeHealthStatus remoteNodeHealthStatus =
           statusEvent.getNodeHealthStatus();
       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
@@ -1190,9 +1165,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     @Override
     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event;
-
-      // Switch the last heartbeatresponse.
-      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
       NodeHealthStatus remoteNodeHealthStatus =
           statusEvent.getNodeHealthStatus();
       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());

+ 2 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java

@@ -27,27 +27,22 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
 public class RMNodeStatusEvent extends RMNodeEvent {
 
   private final NodeStatus nodeStatus;
-  private final NodeHeartbeatResponse latestResponse;
   private List<LogAggregationReport> logAggregationReportsForApps;
 
-  public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
-      NodeHeartbeatResponse latestResponse) {
-    this(nodeId, nodeStatus, latestResponse, null);
+  public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) {
+    this(nodeId, nodeStatus, null);
   }
 
   public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
-      NodeHeartbeatResponse latestResponse,
       List<LogAggregationReport> logAggregationReportsForApps) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeStatus = nodeStatus;
-    this.latestResponse = latestResponse;
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
 
@@ -59,10 +54,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.nodeStatus.getContainersStatuses();
   }
 
-  public NodeHeartbeatResponse getLatestResponse() {
-    return this.latestResponse;
-  }
-  
   public List<ApplicationId> getKeepAliveAppIds() {
     return this.nodeStatus.getKeepAliveApplications();
   }

+ 11 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -116,7 +116,7 @@ public class MockNM {
             container.getResource());
     List<Container> increasedConts = Collections.singletonList(container);
     nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
-        true, ++responseId);
+        true, responseId);
   }
 
   public RegisterNodeManagerResponse registerNode() throws Exception {
@@ -161,12 +161,13 @@ public class MockNM {
       memory = newResource.getMemory();
       vCores = newResource.getVirtualCores();
     }
+    responseId = 0;
     return registrationResponse;
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
     return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
-        Collections.<Container>emptyList(), isHealthy, ++responseId);
+        Collections.<Container>emptyList(), isHealthy, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
@@ -179,12 +180,12 @@ public class MockNM {
     containerStatusList.add(containerStatus);
     Log.info("ContainerStatus: " + containerStatus);
     return nodeHeartbeat(containerStatusList,
-        Collections.<Container>emptyList(), true, ++responseId);
+        Collections.<Container>emptyList(), true, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
-    return nodeHeartbeat(conts, isHealthy, ++responseId);
+    return nodeHeartbeat(conts, isHealthy, responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -227,7 +228,8 @@ public class MockNM {
     req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
     NodeHeartbeatResponse heartbeatResponse =
         resourceTracker.nodeHeartbeat(req);
-    
+    responseId = heartbeatResponse.getResponseId();
+
     MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey();
     if (masterKeyFromRM != null
         && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey
@@ -262,4 +264,8 @@ public class MockNM {
   public String getVersion() {
     return version;
   }
+
+  public void setResponseId(int id) {
+    this.responseId = id;
+  }
 }

+ 2 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -200,7 +200,8 @@ public class MockNodes {
     }
 
     @Override
-    public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+    public void setAndUpdateNodeHeartbeatResponse(
+        NodeHeartbeatResponse response) {
     }
 
     @Override
@@ -240,12 +241,6 @@ public class MockNodes {
       return CommonNodeLabelsManager.EMPTY_STRING_SET;
     }
 
-    @Override
-    public void updateNodeHeartbeatResponseForContainersDecreasing(
-        NodeHeartbeatResponse response) {
-      
-    }
-
     @Override
     public List<Container> pullNewlyIncreasedContainers() {
       return Collections.emptyList();

+ 1 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -158,15 +158,12 @@ public class TestRMNodeTransitions {
   
   private RMNodeStatusEvent getMockRMNodeStatusEvent(
       List<ContainerStatus> containerStatus) {
-    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
     doReturn(yes).when(healthStatus).getIsNodeHealthy();
     
     RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
     doReturn(healthStatus).when(event).getNodeHealthStatus();
-    doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     if (containerStatus != null) {
       doReturn(containerStatus).when(event).getContainers();
@@ -175,15 +172,12 @@ public class TestRMNodeTransitions {
   }
   
   private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
-    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
     doReturn(yes).when(healthStatus).getIsNodeHealthy();
 
     RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
     doReturn(healthStatus).when(event).getNodeHealthStatus();
-    doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
     return event;
@@ -196,15 +190,12 @@ public class TestRMNodeTransitions {
   }
 
   private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
-    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
-
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
     doReturn(yes).when(healthStatus).getIsNodeHealthy();
 
     RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
     doReturn(healthStatus).when(event).getNodeHealthStatus();
-    doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
     doReturn(null).when(event).getKeepAliveAppIds();
     return event;
@@ -651,7 +642,7 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     Assert.assertEquals(1, node.getAppsToCleanup().size());
     NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
-    node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+    node.setAndUpdateNodeHeartbeatResponse(hbrsp);
     Assert.assertEquals(0, node.getContainersToCleanUp().size());
     Assert.assertEquals(0, node.getAppsToCleanup().size());
     Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());

+ 28 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -706,7 +706,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         Records.newRecord(NodeHeartbeatRequest.class);
     heartbeatReq.setNodeLabels(null); // Node heartbeat label update
     nodeStatusObject = getNodeStatusObject(nodeId);
-    nodeStatusObject.setResponseId(responseId+2);
+    nodeStatusObject.setResponseId(responseId+1);
     heartbeatReq.setNodeStatus(nodeStatusObject);
     heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
         .getNMTokenMasterKey());
@@ -1908,4 +1908,31 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
       DefaultMetricsSystem.shutdown();
     }
   }
+
+  @Test
+  public void testResponseIdOverflow() throws Exception {
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+
+    // prepare the responseId that's about to overflow
+    RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE);
+
+    nm1.setResponseId(Integer.MAX_VALUE);
+
+    // heartbeat twice and check responseId
+    nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+    Assert.assertEquals(0, nodeHeartbeat.getResponseId());
+
+    nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+    Assert.assertEquals(1, nodeHeartbeat.getResponseId());
+  }
 }

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

@@ -167,7 +167,7 @@ public class TestRMAppLogAggregationStatus {
     NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
         new ArrayList<ContainerStatus>(), null,
         NodeHealthStatus.newInstance(true, null, 0), null, null, null);
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
         node1ReportForApp));
 
     List<LogAggregationReport> node2ReportForApp =
@@ -181,7 +181,7 @@ public class TestRMAppLogAggregationStatus {
     NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
         new ArrayList<ContainerStatus>(), null,
         NodeHealthStatus.newInstance(true, null, 0), null, null, null);
-    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
         node2ReportForApp));
     // node1 and node2 has updated its log aggregation status
     // verify that the log aggregation status for node1, node2
@@ -218,7 +218,7 @@ public class TestRMAppLogAggregationStatus {
         LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode1_2);
     node1ReportForApp2.add(report1_2);
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
         node1ReportForApp2));
 
     // verify that the log aggregation status for node1
@@ -286,7 +286,7 @@ public class TestRMAppLogAggregationStatus {
       LogAggregationStatus.SUCCEEDED, ""));
     // For every logAggregationReport cached in memory, we can only save at most
     // 10 diagnostic messages/failure messages
-    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
         node1ReportForApp3));
 
     logAggregationStatus = rmApp.getLogAggregationReportsForApp();
@@ -330,7 +330,7 @@ public class TestRMAppLogAggregationStatus {
           LogAggregationStatus.FAILED, "");
     node2ReportForApp2.add(report2_2);
     node2ReportForApp2.add(report2_3);
-    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
         node2ReportForApp2));
     Assert.assertEquals(LogAggregationStatus.FAILED,
       rmApp.getLogAggregationStatusForAppReport());