瀏覽代碼

YARN-101. Fix NodeManager heartbeat processing to not lose track of completed containers in case of dropped heartbeats. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1464105 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 年之前
父節點
當前提交
3e9200ddde

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -183,6 +183,9 @@ Release 2.0.5-beta - UNRELEASED
     local directory hits unix file count limits and thus prevent job failures.
     local directory hits unix file count limits and thus prevent job failures.
     (Omkar Vinit Joshi via vinodkv)
     (Omkar Vinit Joshi via vinodkv)
 
 
+    YARN-101. Fix NodeManager heartbeat processing to not lose track of completed
+    containers in case of dropped heartbeats. (Xuan Gong via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 Release 2.0.4-alpha - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -119,6 +119,10 @@ public class NodeManager extends CompositeService
     return new DeletionService(exec);
     return new DeletionService(exec);
   }
   }
 
 
+  protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) {
+    return new NMContext(containerTokenSecretManager);
+  }
+
   protected void doSecureLogin() throws IOException {
   protected void doSecureLogin() throws IOException {
     SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
     SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
         YarnConfiguration.NM_PRINCIPAL);
         YarnConfiguration.NM_PRINCIPAL);
@@ -137,7 +141,7 @@ public class NodeManager extends CompositeService
       containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
       containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
     }
     }
 
 
-    this.context = new NMContext(containerTokenSecretManager);
+    this.context = createNMContext(containerTokenSecretManager);
 
 
     this.aclsManager = new ApplicationACLsManager(conf);
     this.aclsManager = new ApplicationACLsManager(conf);
 
 

+ 15 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -88,6 +88,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private final NodeHealthCheckerService healthChecker;
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
   private final NodeManagerMetrics metrics;
 
 
+  private boolean previousHeartBeatSucceeded;
+  private List<ContainerStatus> previousContainersStatuses =
+      new ArrayList<ContainerStatus>();
+
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
     super(NodeStatusUpdaterImpl.class.getName());
     super(NodeStatusUpdaterImpl.class.getName());
@@ -95,6 +99,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.context = context;
     this.context = context;
     this.dispatcher = dispatcher;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
     this.metrics = metrics;
+    this.previousHeartBeatSucceeded = true;
   }
   }
 
 
   @Override
   @Override
@@ -314,8 +319,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     nodeStatus.setNodeId(this.nodeId);
     nodeStatus.setNodeId(this.nodeId);
 
 
-    int numActiveContainers = 0;
     List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
     List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+    if(previousHeartBeatSucceeded) {
+      previousContainersStatuses.clear();
+    } else {
+      containersStatuses.addAll(previousContainersStatuses);
+    }
+
+    int numActiveContainers = 0;
     for (Iterator<Entry<ContainerId, Container>> i =
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
       Entry<ContainerId, Container> e = i.next();
@@ -330,6 +341,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       LOG.info("Sending out status for container: " + containerStatus);
       LOG.info("Sending out status for container: " + containerStatus);
 
 
       if (containerStatus.getState() == ContainerState.COMPLETE) {
       if (containerStatus.getState() == ContainerState.COMPLETE) {
+        previousContainersStatuses.add(containerStatus);
         // Remove
         // Remove
         i.remove();
         i.remove();
 
 
@@ -404,6 +416,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             }
             }
             NodeHeartbeatResponse response =
             NodeHeartbeatResponse response =
               resourceTracker.nodeHeartbeat(request);
               resourceTracker.nodeHeartbeat(request);
+            previousHeartBeatSucceeded = true;
             //get next heartbeat interval from response
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             // See if the master-key has rolled over
             // See if the master-key has rolled over
@@ -449,6 +462,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   new CMgrCompletedAppsEvent(appsToCleanup));
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
             }
           } catch (Throwable e) {
           } catch (Throwable e) {
+            previousHeartBeatSucceeded = false;
             // TODO Better error handling. Thread can die with the rest of the
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             // NM still running.
             LOG.error("Caught exception in status-updater", e);
             LOG.error("Caught exception in status-updater", e);

+ 242 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 package org.apache.hadoop.yarn.server.nodemanager;
 
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,11 +61,13 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.Service;
@@ -92,6 +97,8 @@ public class TestNodeStatusUpdater {
   private final Configuration conf = createNMConfig();
   private final Configuration conf = createNMConfig();
   private NodeManager nm;
   private NodeManager nm;
   protected NodeManager rebootedNodeManager;
   protected NodeManager rebootedNodeManager;
+  private boolean containerStatusBackupSuccessfully = true;
+  private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
 
 
   @After
   @After
   public void tearDown() {
   public void tearDown() {
@@ -237,6 +244,22 @@ public class TestNodeStatusUpdater {
     }
     }
   }
   }
 
 
+  private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker4(context);
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+
+  }
+
   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
     public ResourceTracker resourceTracker;
     public ResourceTracker resourceTracker;
     private Context context;
     private Context context;
@@ -384,6 +407,104 @@ public class TestNodeStatusUpdater {
     }
     }
   }
   }
 
 
+  private class MyResourceTracker4 implements ResourceTracker {
+
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    private Context context;
+
+    public MyResourceTracker4(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction);
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      try {
+        if (heartBeatID == 0) {
+          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+              .size(), 0);
+          Assert.assertEquals(context.getContainers().size(), 0);
+        } else if (heartBeatID == 1) {
+          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+              .size(), 5);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(0).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(0)
+                  .getContainerId().getId() == 1);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(1).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(1)
+                  .getContainerId().getId() == 2);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(2).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(2)
+                  .getContainerId().getId() == 3);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(3).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(3)
+                  .getContainerId().getId() == 4);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(4).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(4)
+                  .getContainerId().getId() == 5);
+          throw new YarnException("Lost the heartbeat response");
+        } else if (heartBeatID == 2) {
+          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+              .size(), 7);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(0).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(0)
+                  .getContainerId().getId() == 3);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(1).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(1)
+                  .getContainerId().getId() == 4);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(2).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(2)
+                  .getContainerId().getId() == 1);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(3).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(3)
+                  .getContainerId().getId() == 2);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(4).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(4)
+                  .getContainerId().getId() == 5);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(5).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(5)
+                  .getContainerId().getId() == 6);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(6).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(6)
+                  .getContainerId().getId() == 7);
+        }
+      } catch (AssertionError error) {
+        LOG.info(error);
+        containerStatusBackupSuccessfully = false;
+      } finally {
+        heartBeatID++;
+      }
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID);
+      NodeHeartbeatResponse nhResponse =
+          YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+              heartBeatNodeAction, null, null, null, 1000L);
+      return nhResponse;
+    }
+  }
+
   @Before
   @Before
   public void clearError() {
   public void clearError() {
     nmStartError = null;
     nmStartError = null;
@@ -725,6 +846,127 @@ public class TestNodeStatusUpdater {
     }
     }
   }
   }
 
 
+  /**
+   * Test completed containerStatus get back up when heart beat lost
+   */
+  @Test(timeout = 20000)
+  public void testCompletedContainerStatusBackup() throws Exception {
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MyNodeStatusUpdater2 myNodeStatusUpdater =
+            new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
+                metrics);
+        return myNodeStatusUpdater;
+      }
+
+      @Override
+      protected NMContext createNMContext(
+          NMContainerTokenSecretManager containerTokenSecretManager) {
+        return new MyNMContext(containerTokenSecretManager);
+      }
+
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+
+    int waitCount = 0;
+    while (heartBeatID <= 3 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    if(!containerStatusBackupSuccessfully) {
+      Assert.fail("ContainerStatus Backup failed");
+    }
+    nm.stop();
+  }
+
+  private class MyNMContext extends NMContext {
+    ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentSkipListMap<ContainerId, Container>();
+
+    public MyNMContext(NMContainerTokenSecretManager
+        containerTokenSecretManager) {
+      super(containerTokenSecretManager);
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      if (heartBeatID == 0) {
+        return containers;
+      } else if (heartBeatID == 1) {
+        ContainerStatus containerStatus1 =
+            createContainerStatus(1, ContainerState.RUNNING);
+        Container container1 = getMockContainer(containerStatus1);
+        containers.put(containerStatus1.getContainerId(), container1);
+
+        ContainerStatus containerStatus2 =
+            createContainerStatus(2, ContainerState.RUNNING);
+        Container container2 = getMockContainer(containerStatus2);
+        containers.put(containerStatus2.getContainerId(), container2);
+
+        ContainerStatus containerStatus3 =
+            createContainerStatus(3, ContainerState.COMPLETE);
+        Container container3 = getMockContainer(containerStatus3);
+        containers.put(containerStatus3.getContainerId(), container3);
+        completedContainerStatusList.add(containerStatus3);
+
+        ContainerStatus containerStatus4 =
+            createContainerStatus(4, ContainerState.COMPLETE);
+        Container container4 = getMockContainer(containerStatus4);
+        containers.put(containerStatus4.getContainerId(), container4);
+        completedContainerStatusList.add(containerStatus4);
+
+        ContainerStatus containerStatus5 =
+            createContainerStatus(5, ContainerState.RUNNING);
+        Container container5 = getMockContainer(containerStatus5);
+        containers.put(containerStatus5.getContainerId(), container5);
+
+        return containers;
+      } else if (heartBeatID == 2) {
+        ContainerStatus containerStatus6 =
+            createContainerStatus(6, ContainerState.RUNNING);
+        Container container6 = getMockContainer(containerStatus6);
+        containers.put(containerStatus6.getContainerId(), container6);
+
+        ContainerStatus containerStatus7 =
+            createContainerStatus(7, ContainerState.COMPLETE);
+        Container container7 = getMockContainer(containerStatus7);
+        containers.put(containerStatus7.getContainerId(), container7);
+        completedContainerStatusList.add(containerStatus7);
+
+        return containers;
+      } else {
+        containers.clear();
+
+        return containers;
+      }
+    }
+
+    private ContainerStatus createContainerStatus(int id,
+        ContainerState containerState) {
+      ApplicationId applicationId =
+          BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
+      ApplicationAttemptId applicationAttemptId =
+          BuilderUtils.newApplicationAttemptId(applicationId, id);
+      ContainerId contaierId =
+          BuilderUtils.newContainerId(applicationAttemptId, id);
+      ContainerStatus containerStatus =
+          BuilderUtils.newContainerStatus(contaierId, containerState,
+              "test_containerStatus: id=" + id + ", containerState: "
+                  + containerState, 0);
+      return containerStatus;
+    }
+
+    private Container getMockContainer(ContainerStatus containerStatus) {
+      Container container = mock(Container.class);
+      when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
+      return container;
+    }
+  }
+
   private void verifyNodeStartFailure(String errMessage) {
   private void verifyNodeStartFailure(String errMessage) {
     YarnConfiguration conf = createNMConfig();
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);
     nm.init(conf);