Forráskód Böngészése

YARN-643. Fixed ResourceManager to remove all tokens consistently on app finish. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1515256 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1515257 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 éve
szülő
commit
040e1cd283

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -59,6 +59,9 @@ Release 2.1.1-beta - UNRELEASED
     forceKillApplication on non-running and finished applications. (Xuan Gong
     via vinodkv)
 
+    YARN-643. Fixed ResourceManager to remove all tokens consistently on app
+    finish. (Xuan Gong via vinodkv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 15 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -761,6 +761,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
               rejectedEvent.getApplicationAttemptId().getApplicationId(),
               message)
           );
+
+      appAttempt.removeTokens(appAttempt);
     }
   }
 
@@ -847,7 +849,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
       ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
 
       // Tell the AMS. Unregister from the ApplicationMasterService
@@ -894,9 +895,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
         finalAttemptState));
 
-      // Remove the AppAttempt from the AMRMTokenSecretManager
-      appAttempt.rmContext.getAMRMTokenSecretManager()
-        .applicationMasterFinished(appAttemptId);
+      appAttempt.removeTokens(appAttempt);
     }
   }
 
@@ -1015,7 +1014,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
           " exitCode: " + status.getExitStatus() +
           " due to: " +  status.getDiagnostics() + "." +
           "Failing this attempt.");
-
       // Tell the app, scheduler
       super.transition(appAttempt, finishEvent);
     }
@@ -1042,12 +1040,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       appAttempt.rmContext.getAMFinishingMonitor().unregister(
           appAttempt.getAppAttemptId());
 
-      // Unregister from the ClientToAMTokenSecretManager
-      if (UserGroupInformation.isSecurityEnabled()) {
-        appAttempt.rmContext.getClientToAMTokenSecretManager()
-          .unRegisterApplication(appAttempt.getAppAttemptId());
-      }
-
       if(!appAttempt.submissionContext.getUnmanagedAM()) {
         // Tell the launcher to cleanup.
         appAttempt.eventHandler.handle(new AMLauncherEvent(
@@ -1116,10 +1108,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
 
-      // Remove the AppAttempt from the AMRMTokenSecretManager
-      appAttempt.rmContext.getAMRMTokenSecretManager()
-        .applicationMasterFinished(appAttemptId);
-
       appAttempt.progress = 1.0f;
 
       RMAppAttemptUnregistrationEvent unregisterEvent
@@ -1267,4 +1255,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
               + " MasterContainer: " + masterContainer);
     store.storeApplicationAttempt(this);
   }
+
+  private void removeTokens(RMAppAttemptImpl appAttempt) {
+    // Unregister from the ClientToAMTokenSecretManager
+    if (UserGroupInformation.isSecurityEnabled()) {
+      appAttempt.rmContext.getClientToAMTokenSecretManager()
+        .unRegisterApplication(appAttempt.getAppAttemptId());
+    }
+
+    // Remove the AppAttempt from the AMRMTokenSecretManager
+    appAttempt.rmContext.getAMRMTokenSecretManager()
+      .applicationMasterFinished(appAttempt.getAppAttemptId());
+  }
 }

+ 31 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.Collections;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -102,6 +104,11 @@ public class TestRMAppAttemptTransitions {
   
   private RMApp application;
   private RMAppAttempt applicationAttempt;
+
+  private Configuration conf = new Configuration();
+  private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
+  private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
+      spy(new ClientToAMTokenSecretManagerInRM());
   
   private final class TestApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
@@ -163,14 +170,13 @@ public class TestRMAppAttemptTransitions {
         mock(ContainerAllocationExpirer.class);
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
-    Configuration conf = new Configuration();
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
-          null, new AMRMTokenSecretManager(conf),
+          null, amRMTokenManager,
           new RMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInRM(conf),
-          new ClientToAMTokenSecretManagerInRM());
+          clientToAMTokenManager);
     
     RMStateStore store = mock(RMStateStore.class);
     ((RMContextImpl) rmContext).setStateStore(store);
@@ -261,7 +267,11 @@ public class TestRMAppAttemptTransitions {
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
-    
+    if (UserGroupInformation.isSecurityEnabled()) {
+      verify(clientToAMTokenManager).registerApplication(
+          applicationAttempt.getAppAttemptId());
+    }
+    assertNotNull(applicationAttempt.getAMRMToken());
     // Check events
     verify(masterService).
         registerAppAttempt(applicationAttempt.getAppAttemptId());
@@ -288,6 +298,7 @@ public class TestRMAppAttemptTransitions {
     // this works for unmanaged and managed AM's because this is actually doing
     // verify(application).handle(anyObject());
     verify(application).handle(any(RMAppRejectedEvent.class));
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   /**
@@ -303,6 +314,7 @@ public class TestRMAppAttemptTransitions {
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
   
   /**
@@ -377,6 +389,8 @@ public class TestRMAppAttemptTransitions {
     
     // Check events
     verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
+
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   /**
@@ -422,6 +436,7 @@ public class TestRMAppAttemptTransitions {
         applicationAttempt.getTrackingUrl());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 0);
   }
 
   /**
@@ -442,6 +457,7 @@ public class TestRMAppAttemptTransitions {
         .getJustFinishedContainers().size());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
   
   
@@ -592,6 +608,7 @@ public class TestRMAppAttemptTransitions {
             applicationAttempt.getAppAttemptId(), 
             RMAppAttemptEventType.KILL));
     testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   } 
   
   @Test
@@ -666,6 +683,7 @@ public class TestRMAppAttemptTransitions {
       applicationAttempt.getAppAttemptId(), cs));
     assertEquals(RMAppAttemptState.FAILED,
       applicationAttempt.getAppAttemptState());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
   
   @Test
@@ -709,6 +727,7 @@ public class TestRMAppAttemptTransitions {
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   @Test(timeout=10000)
@@ -725,6 +744,7 @@ public class TestRMAppAttemptTransitions {
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   @Test(timeout=20000)
@@ -742,6 +762,7 @@ public class TestRMAppAttemptTransitions {
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
   }
 
   @Test 
@@ -848,4 +869,10 @@ public class TestRMAppAttemptTransitions {
         diagnostics, 0);
   }
   
+  private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
+    verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      verify(clientToAMTokenManager, times(count)).unRegisterApplication(appAttemptId);
+    }
+  }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

@@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -46,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -80,6 +84,7 @@ public class TestAMRMTokens {
    * 
    * @throws Exception
    */
+  @SuppressWarnings("unchecked")
   @Test
   public void testTokenExpiry() throws Exception {
 
@@ -134,6 +139,20 @@ public class TestAMRMTokens {
       finishAMRequest.setTrackingUrl("url");
       rmClient.finishApplicationMaster(finishAMRequest);
 
+      // Send RMAppAttemptEventType.CONTAINER_FINISHED to transit RMAppAttempt
+      // from Finishing state to Finished State. Both AMRMToken and
+      // ClientToAMToken will be removed.
+      ContainerStatus containerStatus =
+          BuilderUtils.newContainerStatus(attempt.getMasterContainer().getId(),
+              ContainerState.COMPLETE,
+              "AM Container Finished", 0);
+      rm.getRMContext()
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
+                  containerStatus));
+
       // Now simulate trying to allocate. RPC call itself should throw auth
       // exception.
       rpc.stopProxy(rmClient, conf); // To avoid using cached client