Browse Source

Merge r1580078 from branch-2. YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1580081 13f79535-47bb-0310-9956-ffa450edef68
Jian He 11 years ago
parent
commit
9254bfc221

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -486,6 +486,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
     YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
     more log-data than the log-length that it records. (Mit Desai via vinodk)
     more log-data than the log-length that it records. (Mit Desai via vinodk)
 
 
+    YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM
+    (Karthik Kambatla via jianhe )
+
 Release 2.3.1 - UNRELEASED
 Release 2.3.1 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 42 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -187,12 +188,51 @@ public class ResourceTrackerService extends AbstractService implements
     super.serviceStop();
     super.serviceStop();
   }
   }
 
 
+  /**
+   * Helper method to handle received ContainerStatus. If this corresponds to
+   * the completion of a master-container of a managed AM,
+   * we call the handler for RMAppAttemptContainerFinishedEvent.
+   */
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  void handleContainerStatus(ContainerStatus containerStatus) {
+    ApplicationAttemptId appAttemptId =
+        containerStatus.getContainerId().getApplicationAttemptId();
+    RMApp rmApp =
+        rmContext.getRMApps().get(appAttemptId.getApplicationId());
+    if (rmApp == null) {
+      LOG.error("Received finished container : "
+          + containerStatus.getContainerId()
+          + "for unknown application " + appAttemptId.getApplicationId()
+          + " Skipping.");
+      return;
+    }
+
+    if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring container completion status for unmanaged AM"
+            + rmApp.getApplicationId());
+      }
+      return;
+    }
+
+    RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+    Container masterContainer = rmAppAttempt.getMasterContainer();
+    if (masterContainer.getId().equals(containerStatus.getContainerId())
+        && containerStatus.getState() == ContainerState.COMPLETE) {
+      // sending master container finished event.
+      RMAppAttemptContainerFinishedEvent evt =
+          new RMAppAttemptContainerFinishedEvent(appAttemptId,
+              containerStatus);
+      rmContext.getDispatcher().getEventHandler().handle(evt);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
   @Override
   @Override
   public RegisterNodeManagerResponse registerNodeManager(
   public RegisterNodeManagerResponse registerNodeManager(
       RegisterNodeManagerRequest request) throws YarnException,
       RegisterNodeManagerRequest request) throws YarnException,
       IOException {
       IOException {
-
     NodeId nodeId = request.getNodeId();
     NodeId nodeId = request.getNodeId();
     String host = nodeId.getHost();
     String host = nodeId.getHost();
     int cmPort = nodeId.getPort();
     int cmPort = nodeId.getPort();
@@ -204,29 +244,7 @@ public class ResourceTrackerService extends AbstractService implements
       LOG.info("received container statuses on node manager register :"
       LOG.info("received container statuses on node manager register :"
           + request.getContainerStatuses());
           + request.getContainerStatuses());
       for (ContainerStatus containerStatus : request.getContainerStatuses()) {
       for (ContainerStatus containerStatus : request.getContainerStatuses()) {
-        ApplicationAttemptId appAttemptId =
-            containerStatus.getContainerId().getApplicationAttemptId();
-        RMApp rmApp =
-            rmContext.getRMApps().get(appAttemptId.getApplicationId());
-        if (rmApp != null) {
-          RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
-          if (rmAppAttempt != null) {
-            if (rmAppAttempt.getMasterContainer().getId()
-                .equals(containerStatus.getContainerId())
-                && containerStatus.getState() == ContainerState.COMPLETE) {
-              // sending master container finished event.
-              RMAppAttemptContainerFinishedEvent evt =
-                  new RMAppAttemptContainerFinishedEvent(appAttemptId,
-                      containerStatus);
-              rmContext.getDispatcher().getEventHandler().handle(evt);
-            }
-          }
-        } else {
-          LOG.error("Received finished container :"
-              + containerStatus.getContainerId()
-              + " for non existing application :"
-              + appAttemptId.getApplicationId());
-        }
+        handleContainerStatus(containerStatus);
       }
       }
     }
     }
     RegisterNodeManagerResponse response = recordFactory
     RegisterNodeManagerResponse response = recordFactory

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -35,9 +35,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 
 import javax.crypto.SecretKey;
 import javax.crypto.SecretKey;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -629,7 +631,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
     }
   }
   }
 
 
-  private void setMasterContainer(Container container) {
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public void setMasterContainer(Container container) {
     masterContainer = container;
     masterContainer = container;
   }
   }
 
 

+ 58 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -45,21 +45,29 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
+
 import org.junit.After;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 
 public class TestResourceTrackerService {
 public class TestResourceTrackerService {
 
 
@@ -468,26 +476,64 @@ public class TestResourceTrackerService {
         ClusterMetrics.getMetrics().getUnhealthyNMs());
         ClusterMetrics.getMetrics().getUnhealthyNMs());
   }
   }
 
 
+  @SuppressWarnings("unchecked")
   @Test
   @Test
-  public void testNodeRegistrationWithContainers() throws Exception {
-    rm = new MockRM();
-    rm.init(new YarnConfiguration());
+  public void testHandleContainerStatusInvalidCompletions() throws Exception {
+    rm = new MockRM(new YarnConfiguration());
     rm.start();
     rm.start();
-    RMApp app = rm.submitApp(1024);
 
 
-    MockNM nm = rm.registerNode("host1:1234", 8192);
-    nm.nodeHeartbeat(true);
+    EventHandler handler =
+        spy(rm.getRMContext().getDispatcher().getEventHandler());
+
+    // Case 1: Unmanaged AM
+    RMApp app = rm.submitApp(1024, true);
 
 
-    // Register node with some container statuses
+    // Case 1.1: AppAttemptId is null
     ContainerStatus status = ContainerStatus.newInstance(
     ContainerStatus status = ContainerStatus.newInstance(
         ContainerId.newInstance(ApplicationAttemptId.newInstance(
         ContainerId.newInstance(ApplicationAttemptId.newInstance(
             app.getApplicationId(), 2), 1),
             app.getApplicationId(), 2), 1),
         ContainerState.COMPLETE, "Dummy Completed", 0);
         ContainerState.COMPLETE, "Dummy Completed", 0);
+    rm.getResourceTrackerService().handleContainerStatus(status);
+    verify(handler, never()).handle((Event) any());
+
+    // Case 1.2: Master container is null
+    RMAppAttemptImpl currentAttempt =
+        (RMAppAttemptImpl) app.getCurrentAppAttempt();
+    currentAttempt.setMasterContainer(null);
+    status = ContainerStatus.newInstance(
+        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+        ContainerState.COMPLETE, "Dummy Completed", 0);
+    rm.getResourceTrackerService().handleContainerStatus(status);
+    verify(handler, never()).handle((Event)any());
 
 
-    // The following shouldn't throw NPE
-    nm.registerNode(Collections.singletonList(status));
-    assertEquals("Incorrect number of nodes", 1,
-        rm.getRMContext().getRMNodes().size());
+    // Case 2: Managed AM
+    app = rm.submitApp(1024);
+
+    // Case 2.1: AppAttemptId is null
+    status = ContainerStatus.newInstance(
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(
+            app.getApplicationId(), 2), 1),
+        ContainerState.COMPLETE, "Dummy Completed", 0);
+    try {
+      rm.getResourceTrackerService().handleContainerStatus(status);
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    verify(handler, never()).handle((Event)any());
+
+    // Case 2.2: Master container is null
+    currentAttempt =
+        (RMAppAttemptImpl) app.getCurrentAppAttempt();
+    currentAttempt.setMasterContainer(null);
+    status = ContainerStatus.newInstance(
+        ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
+        ContainerState.COMPLETE, "Dummy Completed", 0);
+    try {
+      rm.getResourceTrackerService().handleContainerStatus(status);
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    verify(handler, never()).handle((Event)any());
   }
   }
 
 
   @Test
   @Test