Sfoglia il codice sorgente

YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers launched by AMs running on the same machine as the AM are correctly propagated. Contributed by Jian He.
svn merge --ignore-ancestry -c 1578631 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1578633 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 anni fa
parent
commit
2da392cf3a

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -449,6 +449,10 @@ Release 2.4.0 - UNRELEASED
     manner and thus fix failure of TestResourceTrackerService. (Tsuyoshi Ozawa
     via vinodkv)
 
+    YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers
+    launched by AMs running on the same machine as the AM are correctly
+    propagated. (Jian He via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -294,13 +294,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     for (NMToken token : nmTokens) {
       String nodeId = token.getNodeId().toString();
       if (getNMTokenCache().containsToken(nodeId)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Replacing token for : " + nodeId);
-        }
+        LOG.info("Replacing token for : " + nodeId);
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Received new token for : " + nodeId);
-        }
+        LOG.info("Received new token for : " + nodeId);
       }
       getNMTokenCache().setToken(nodeId, token.getToken());
     }

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -831,9 +831,18 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         appAttempt.retryFetchingAMContainer(appAttempt);
         return RMAppAttemptState.SCHEDULED;
       }
+
       // Set the masterContainer
       appAttempt.setMasterContainer(amContainerAllocation.getContainers()
         .get(0));
+      // The node set in NMTokenSecrentManager is used for marking whether the
+      // NMToken has been issued for this node to the AM.
+      // When AM container was allocated to RM itself, the node which allocates
+      // this AM container was marked as the NMToken already sent. Thus,
+      // clear this node set so that the following allocate requests from AM are
+      // able to retrieve the corresponding NMToken.
+      appAttempt.rmContext.getNMTokenSecretManager()
+        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
       appAttempt.getSubmissionContext().setResource(
         appAttempt.getMasterContainer().getResource());
       appAttempt.storeAttempt();

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -902,7 +902,8 @@ public class CapacityScheduler extends AbstractYarnScheduler
   }
 
   @Lock(Lock.NoLock.class)
-  FiCaSchedulerApp getApplicationAttempt(
+  @VisibleForTesting
+  public FiCaSchedulerApp getApplicationAttempt(
       ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication app =
         applications.get(applicationAttemptId.getApplicationId());

+ 16 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java

@@ -138,6 +138,19 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
     }
   }
 
+  public void clearNodeSetForAttempt(ApplicationAttemptId attemptId) {
+    super.writeLock.lock();
+    try {
+      HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(attemptId);
+      if (nodeSet != null) {
+        LOG.info("Clear node set for " + attemptId);
+        nodeSet.clear();
+      }
+    } finally {
+      super.writeLock.unlock();
+    }
+  }
+
   private void clearApplicationNMTokenKeys() {
     // We should clear all node entries from this set.
     // TODO : Once we have per node master key then it will change to only
@@ -184,22 +197,13 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
       NMToken nmToken = null;
       if (nodeSet != null) {
         if (!nodeSet.contains(container.getNodeId())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sending NMToken for nodeId : "
-                + container.getNodeId().toString()
-                + " for application attempt : " + appAttemptId.toString());
-          }
+          LOG.info("Sending NMToken for nodeId : " + container.getNodeId()
+              + " for container : " + container.getId());
           Token token =
               createNMToken(container.getId().getApplicationAttemptId(),
                 container.getNodeId(), applicationSubmitter);
           nmToken = NMToken.newInstance(container.getNodeId(), token);
-          // The node set here is used for differentiating whether the NMToken
-          // has been issued for this node from the client's perspective. If
-          // this is an AM container, the NMToken is issued only for RM and so
-          // we should not update the node set.
-          if (container.getId().getId() != 1) {
-            nodeSet.add(container.getNodeId());
-          }
+          nodeSet.add(container.getNodeId());
         }
       }
       return nmToken;

+ 55 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -171,7 +172,60 @@ public class TestRM {
 
     rm.stop();
   }
-  
+
+  // Test even if AM container is allocated with containerId not equal to 1, the
+  // following allocate requests from AM should be able to retrieve the
+  // corresponding NM Token.
+  @Test (timeout = 20000)
+  public void testNMTokenSentForNormalContainer() throws Exception {
+
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 5120);
+    RMApp app = rm.submitApp(2000);
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+
+    // Call getNewContainerId to increase container Id so that the AM container
+    // Id doesn't equal to one.
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    cs.getApplicationAttempt(attempt.getAppAttemptId()).getNewContainerId();
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    MockAM am = MockRM.launchAM(app, rm, nm1);
+    // am container Id not equal to 1.
+    Assert.assertTrue(attempt.getMasterContainer().getId().getId() != 1);
+    // NMSecretManager doesn't record the node on which the am is allocated.
+    Assert.assertFalse(rm.getRMNMTokenSecretManager()
+      .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+        nm1.getNodeId()));
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+
+    int NUM_CONTAINERS = 1;
+    List<Container> containers = new ArrayList<Container>();
+    // nmTokens keeps track of all the nmTokens issued in the allocate call.
+    List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
+
+    // am1 allocate 1 container on nm1.
+    while (true) {
+      AllocateResponse response =
+          am.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+            new ArrayList<ContainerId>());
+      nm1.nodeHeartbeat(true);
+      containers.addAll(response.getAllocatedContainers());
+      expectedNMTokens.addAll(response.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    NodeId nodeId = expectedNMTokens.get(0).getNodeId();
+    // NMToken is sent for the allocated container.
+    Assert.assertEquals(nm1.getNodeId(), nodeId);
+  }
+
   @Test (timeout = 40000)
   public void testNMToken() throws Exception {
     MockRM rm = new MockRM();

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -133,6 +133,8 @@ public class TestRMAppAttemptTransitions {
   private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
   private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
       spy(new ClientToAMTokenSecretManagerInRM());
+  private NMTokenSecretManagerInRM nmTokenManager =
+      spy(new NMTokenSecretManagerInRM(conf));
   private boolean transferStateFromPreviousAttempt = false;
 
   private final class TestApplicationAttemptEventDispatcher implements
@@ -224,7 +226,7 @@ public class TestRMAppAttemptTransitions {
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
           null, amRMTokenManager,
           new RMContainerTokenSecretManager(conf),
-          new NMTokenSecretManagerInRM(conf),
+          nmTokenManager,
           clientToAMTokenManager,
           writer);
     
@@ -443,6 +445,8 @@ public class TestRMAppAttemptTransitions {
             any(
                 ApplicationAttemptId.class), any(List.class), any(List.class), 
                 any(List.class), any(List.class));
+    verify(nmTokenManager).clearNodeSetForAttempt(
+      applicationAttempt.getAppAttemptId());
   }
   
   /**