Browse source

Reverting YARN-245 to fix a critical bug.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1508279 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli, 11 years ago
Parent
commit
b8b0693d2b

+ 0 - 3
hadoop-yarn-project/CHANGES.txt

@@ -735,9 +735,6 @@ Release 2.1.0-beta - 2013-07-02
 
     YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
 
-    YARN-245. Fixed NodeManager to handle duplicate responses from
-    ResourceManager. (Mayank Bansal via vinodkv)
-
     YARN-932. TestResourceLocalizationService.testLocalizationInit can fail on
     JDK7. (Karthik Kambatla via Sandy Ryza)
 

+ 1 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
@@ -159,7 +158,7 @@ public class NodeManager extends CompositeService
     addService(del);
 
     // NodeManager level dispatcher
-    this.dispatcher = (AsyncDispatcher) createDispatcher();
+    this.dispatcher = new AsyncDispatcher();
 
     nodeHealthChecker = new NodeHealthCheckerService();
     addService(nodeHealthChecker);
@@ -204,16 +203,6 @@ public class NodeManager extends CompositeService
     // TODO add local dirs to del
   }
 
-  @Private
-  protected Dispatcher createDispatcher(){
-    return new AsyncDispatcher();
-  }
-  
-  @Private
-  public Dispatcher getDispatcher(){
-    return this.dispatcher;
-  }
-  
   @Override
   protected void serviceStart() throws Exception {
     try {

+ 1 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -369,13 +369,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
                 .getNMTokenSecretManager().getCurrentKey());
             response = resourceTracker.nodeHeartbeat(request);
-            // Checking if the response id is the same which we just processed
-            // If yes then ignore the update.
-            if (lastHeartBeatID != response.getResponseId() - 1) {
-              LOG.info("Discarding the duplicate response "
-                  + response.getResponseId());
-              continue;
-            }
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             updateMasterKeys(response);
@@ -402,6 +395,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
               break;
             }
+
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanup();

+ 6 - 132
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -59,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -433,26 +431,6 @@ public class TestNodeStatusUpdater {
     }
   }
   
-  private class MyNodeManager7 extends NodeManager {
-    private ResourceTracker resourceTracker;
-    private MyNodeStatusUpdater3 nodeStatusUpdater;
-
-    @Override
-    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      this.nodeStatusUpdater =
-          new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics);
-      resourceTracker = new MyResourceTracker7(context);
-      this.nodeStatusUpdater.resourceTracker = resourceTracker;
-
-      return this.nodeStatusUpdater;
-    }
-
-    protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
-      return this.nodeStatusUpdater;
-    }
-  }
-
   private class MyNodeManager2 extends NodeManager {
     public boolean isStopped = false;
     private NodeStatusUpdater nodeStatusUpdater;
@@ -574,68 +552,6 @@ public class TestNodeStatusUpdater {
     }
   }
 
-  private class MyResourceTracker7 implements ResourceTracker {
-    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
-    public NodeAction registerNodeAction = NodeAction.NORMAL;
-    private final Context context;
-    private int lastRequestedHeartBeat = 0;
-    private boolean gotDuplicateHeartBeatRequest = false;
-    private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
-
-    MyResourceTracker7(Context context) {
-      this.context = context;
-    }
-
-    @Override
-    public RegisterNodeManagerResponse registerNodeManager(
-        RegisterNodeManagerRequest request) throws YarnException, IOException {
-      RegisterNodeManagerResponse response =
-          recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
-      response.setNodeAction(registerNodeAction);
-      response.setContainerTokenMasterKey(createMasterKey());
-      response.setNMTokenMasterKey(createMasterKey());
-      return response;
-    }
-
-    @Override
-    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
-        throws YarnException, IOException {
-
-      if (lastRequestedHeartBeat != 0
-          && lastRequestedHeartBeat == request.getNodeStatus().getResponseId()) {
-        LOG.info("GOT Duplicate heartbeatId "
-            + request.getNodeStatus().getResponseId());
-        gotDuplicateHeartBeatRequest = true;
-      }
-      lastRequestedHeartBeat = request.getNodeStatus().getResponseId();
-      LOG.info("Got heartBeatId: [" + heartBeatID + "]");
-      NodeStatus nodeStatus = request.getNodeStatus();
-      nodeStatus.setResponseId(heartBeatID++);
-      NodeHeartbeatResponse nhResponse =
-          YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
-            heartBeatNodeAction, null, null, null, null, 1000L);
-
-      if (heartBeatID == 5) {
-        LOG.info("Sending FINISH_APP for application: [" + appId + "]");
-        this.context.getApplications().put(appId, mock(Application.class));
-        nhResponse
-          .addAllApplicationsToCleanup(Collections.singletonList(appId));
-      }
-      if (heartBeatID == 6) {
-        nhResponse.setResponseId(5);
-        LOG.info("Sending FINISH_APP for application: [" + appId + "]");
-        this.context.getApplications().put(appId, mock(Application.class));
-        nhResponse
-          .addAllApplicationsToCleanup(Collections.singletonList(appId));
-      }
-      return nhResponse;
-    }
-
-    public boolean isGotDuplicateHeartBeatRequest() {
-      return gotDuplicateHeartBeatRequest;
-    }
-  }
-  
   private class MyResourceTracker4 implements ResourceTracker {
 
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -829,7 +745,7 @@ public class TestNodeStatusUpdater {
     lfs.delete(new Path(basedir.getPath()), true);
   }
 
-  @Test(timeout = 60000)
+  @Test
   public void testNMRegistration() throws InterruptedException {
     nm = new NodeManager() {
       @Override
@@ -889,7 +805,7 @@ public class TestNodeStatusUpdater {
     nm.stop();
   }
   
-  @Test(timeout = 60000)
+  @Test
   public void testStopReentrant() throws Exception {
     final AtomicInteger numCleanups = new AtomicInteger(0);
     nm = new NodeManager() {
@@ -935,49 +851,7 @@ public class TestNodeStatusUpdater {
     Assert.assertEquals(numCleanups.get(), 1);
   }
 
-  @SuppressWarnings("rawtypes")
-  class MyDispatcher7 extends AsyncDispatcher {
-    public volatile int finishapp_event;
-
-    protected void dispatch(Event event) {
-      if (event.getType().name()
-        .equals(ContainerManagerEventType.FINISH_APPS.toString())) {
-        ++finishapp_event;
-      }
-    }
-  }
-
-  @Test(timeout = 60000)
-  public void testDuplicateResponseFromRM() throws Exception {
-    MyNodeManager7 nm = new MyNodeManager7() {
-      protected Dispatcher createDispatcher() {
-        return new MyDispatcher7();
-      }
-    };
-    try {
-      YarnConfiguration conf = createNMConfig();
-      conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000l);
-      nm.init(conf);
-      nm.start();
-      MyResourceTracker7 rt =
-          (MyResourceTracker7) nm.getNodeStatusUpdater().getRMClient();
-      while (heartBeatID < 7) {
-        Thread.sleep(1000l);
-      }
-      Assert.assertTrue(rt.isGotDuplicateHeartBeatRequest());
-
-      MyDispatcher7 nmdispatcher = (MyDispatcher7) nm.getDispatcher();
-      // We are sending two FINISH_APPS in heartbeat 5 and 6
-      // Checking we get only one time FINISH_APPS event which is the first one
-      Assert.assertEquals(1, nmdispatcher.finishapp_event);
-
-    } finally {
-      if (nm.getServiceState() == STATE.STARTED)
-        nm.stop();
-    }
-  }
-  
-  @Test(timeout = 60000)
+  @Test
   public void testNodeDecommision() throws Exception {
     nm = getNodeManager(NodeAction.SHUTDOWN);
     YarnConfiguration conf = createNMConfig();
@@ -1024,7 +898,7 @@ public class TestNodeStatusUpdater {
                                                        NodeHealthCheckerService healthChecker);
   }
 
-  @Test(timeout = 60000)
+  @Test
   public void testNMShutdownForRegistrationFailure() throws Exception {
 
     nm = new NodeManagerWithCustomNodeStatusUpdater() {
@@ -1137,7 +1011,7 @@ public class TestNodeStatusUpdater {
    * started properly, RM will think that the NM is alive and will retire the NM
    * only after NM_EXPIRY interval. See MAPREDUCE-2749.
    */
-  @Test(timeout = 60000)
+  @Test
   public void testNoRegistrationWhenNMServicesFail() throws Exception {
 
     nm = new NodeManager() {
@@ -1168,7 +1042,7 @@ public class TestNodeStatusUpdater {
     verifyNodeStartFailure("Starting of RPC Server failed");
   }
 
-  @Test(timeout = 60000)
+  @Test
   public void testApplicationKeepAlive() throws Exception {
     MyNodeManager nm = new MyNodeManager();
     try {