Browse Source

YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id has not been reset synchronously. (Jun Gong via rohithsharmaks)
(cherry picked from commit feaf0349949e831ce3f25814c1bbff52f17bfe8f)

Conflicts:

hadoop-yarn-project/CHANGES.txt

Jason Lowe 9 years ago
parent
commit
ac865de725

+ 3 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -143,6 +143,9 @@ public class NodeInfo {
       return null;
     }
 
+    public void resetLastNodeHeartBeatResponse() {
+    }
+
     public List<UpdatedContainerInfo> pullContainerUpdates() {
       ArrayList<UpdatedContainerInfo> list = new ArrayList<UpdatedContainerInfo>();
       

+ 5 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -129,6 +129,11 @@ public class RMNodeWrapper implements RMNode {
     return node.getLastNodeHeartBeatResponse();
   }
 
+  @Override
+  public void resetLastNodeHeartBeatResponse() {
+    node.getLastNodeHeartBeatResponse().setResponseId(0);
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public List<UpdatedContainerInfo> pullContainerUpdates() {

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -42,6 +42,9 @@ Release 2.6.2 - UNRELEASED
     YARN-3194. RM should handle NMContainerStatuses sent by NM while
     registering if NM is Reconnected node (Rohith via jlowe)
 
+    YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id 
+    has not been reset synchronously. (Jun Gong via rohithsharmaks)
+
 Release 2.6.1 - 2015-09-23
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -312,6 +312,8 @@ public class ResourceTrackerService extends AbstractService implements
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
+      // Reset heartbeat ID since node just restarted.
+      oldNode.resetLastNodeHeartBeatResponse();
       this.rmContext
           .getDispatcher()
           .getEventHandler()

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -127,7 +127,12 @@ public interface RMNode {
   public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
 
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
-  
+
+  /**
+   * Reset lastNodeHeartbeatResponse's ID to 0.
+   */
+  void resetLastNodeHeartBeatResponse();
+
   /**
    * Get and clear the list of containerUpdates accumulated across NM
    * heartbeats.

+ 10 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -408,6 +408,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  @Override
+  public void resetLastNodeHeartBeatResponse() {
+    this.writeLock.lock();
+    try {
+      latestNodeHeartBeatResponse.setResponseId(0);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
   public void handle(RMNodeEvent event) {
     LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
     try {
@@ -567,8 +577,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             new NodeRemovedSchedulerEvent(rmNode));
 
         if (rmNode.getHttpPort() == newNode.getHttpPort()) {
-          // Reset heartbeat ID since node just restarted.
-          rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
           if (!rmNode.getTotalCapability().equals(
               newNode.getTotalCapability())) {
             rmNode.totalCapability = newNode.getTotalCapability();
@@ -604,9 +612,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       
         handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
 
-        // Reset heartbeat ID since node just restarted.
-        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-
         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
           handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
         }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -185,6 +185,10 @@ public class MockNodes {
       return null;
     }
 
+    @Override
+    public void resetLastNodeHeartBeatResponse() {
+    }
+
     @Override
     public String getNodeManagerVersion() {
       return null;

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java

@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -189,4 +194,38 @@ public class TestNMReconnect {
     nlm.stop();
     scheduler.stop();
   }
+
+  @Test(timeout = 10000)
+  public void testRMNodeStatusAfterReconnect() throws Exception {
+    // The node(127.0.0.1:1234) reconnected with RM. When it registered with
+    // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But
+    // the node's heartbeat come before RM succeeded setting the id to 0.
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM(){
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    int i = 0;
+    while(i < 3) {
+      nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      i++;
+    }
+
+    MockNM nm2 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm2.registerNode();
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    nm2.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING,
+        rmNode.getState());
+    rm.stop();
+  }
 }