
HDFS-2579. Starting delegation token manager during safemode fails. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1242225 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon, 13 years ago
parent commit e918b91e23
16 changed files with 295 additions and 39 deletions
  1. +21 -3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
  2. +2 -0     hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
  3. +2 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  4. +13 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
  5. +5 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
  6. +48 -16   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  7. +4 -1     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
  8. +4 -3     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
  9. +4 -2     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java
  10. +4 -0    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  11. +54 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java
  12. +2 -2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
  13. +2 -1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
  14. +3 -4    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
  15. +4 -1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
  16. +123 -3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java

+ 21 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -40,6 +40,8 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.base.Preconditions;
+
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Evolving
 public abstract 
@@ -84,6 +86,12 @@ extends AbstractDelegationTokenIdentifier>
   private Thread tokenRemoverThread;
   protected volatile boolean running;
 
+  /**
+   * If the delegation token update thread holds this lock, it will
+   * not get interrupted.
+   */
+  protected Object noInterruptsLock = new Object();
+
   public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
       long delegationTokenRemoverScanInterval) {
@@ -95,6 +103,7 @@ extends AbstractDelegationTokenIdentifier>
 
   /** should be called before this object is used */
   public void startThreads() throws IOException {
+    Preconditions.checkState(!running);
     updateCurrentKey();
     synchronized (this) {
       running = true;
@@ -354,12 +363,21 @@ extends AbstractDelegationTokenIdentifier>
     }
   }
 
-  public synchronized void stopThreads() {
+  public void stopThreads() {
     if (LOG.isDebugEnabled())
       LOG.debug("Stopping expired delegation token remover thread");
     running = false;
+    
     if (tokenRemoverThread != null) {
-      tokenRemoverThread.interrupt();
+      synchronized (noInterruptsLock) {
+        tokenRemoverThread.interrupt();
+      }
+      try {
+        tokenRemoverThread.join();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            "Unable to join on token removal thread", e);
+      }
     }
   }
   
@@ -395,7 +413,7 @@ extends AbstractDelegationTokenIdentifier>
             lastTokenCacheCleanup = now;
           }
           try {
-            Thread.sleep(5000); // 5 seconds
+            Thread.sleep(Math.min(5000, keyUpdateInterval)); // 5 seconds
           } catch (InterruptedException ie) {
             LOG
             .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread "

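The reworked stopThreads() above only delivers the interrupt while holding noInterruptsLock, then joins the remover thread, so shutdown can never interrupt the worker in the middle of a protected critical section. A minimal, self-contained sketch of that shutdown pattern (the class, field, and worker body are illustrative placeholders, not the actual Hadoop code):

// Illustrative sketch of the interrupt-under-lock shutdown pattern; the
// class, field, and method names are placeholders, not Hadoop originals.
public class InterruptSafeStopDemo {
  private final Object noInterruptsLock = new Object();
  private volatile boolean running = true;
  private Thread worker;

  public void start() {
    worker = new Thread(() -> {
      while (running) {
        synchronized (noInterruptsLock) {
          // Critical section: stop() cannot interrupt us while we hold the lock.
        }
        try {
          Thread.sleep(100);          // interrupts are only ever delivered here
        } catch (InterruptedException ie) {
          return;
        }
      }
    });
    worker.start();
  }

  public void stop() {
    running = false;
    synchronized (noInterruptsLock) {
      worker.interrupt();             // never lands inside the critical section
    }
    try {
      worker.join();                  // shutdown is synchronous, as in the patch
    } catch (InterruptedException e) {
      throw new RuntimeException("Unable to join worker thread", e);
    }
  }

  public static void main(String[] args) throws Exception {
    InterruptSafeStopDemo demo = new InterruptSafeStopDemo();
    demo.start();
    Thread.sleep(300);
    demo.stop();
  }
}

The join() also makes stopThreads() synchronous: once it returns, no further key roll or token expiration can race a later state transition.
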
+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt

@@ -182,3 +182,5 @@ HDFS-2794. Active NN may purge edit log files before standby NN has a chance to
 HDFS-2901. Improvements for SBN web UI - not show under-replicated/missing blocks. (Brandon Li via jitendra)
 
 HDFS-2905. HA: Standby NN NPE when shared edits dir is deleted. (Bikas Saha via jitendra)
+
+HDFS-2579. Starting delegation token manager during safemode fails. (todd)

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -166,6 +166,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24*60*60*1000;  // 1 day
   public static final String  DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY = "dfs.namenode.delegation.token.max-lifetime";
   public static final long    DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days
+  public static final String  DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY = "dfs.namenode.delegation.token.always-use"; // for tests
+  public static final boolean DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT = false;
 
   //Filesystem limit keys
   public static final String  DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY = "dfs.namenode.fs-limits.max-component-length";

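The new key is intended for tests only: it lets a MiniDFSCluster without Kerberos run the delegation token secret manager, exercising the same start/stop paths as a secure cluster. A sketch of the typical test setup, mirroring the test changes later in this commit:

// Sketch of test usage of the new key, mirroring the test diffs below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class AlwaysUseTokensSetup {
  public static MiniDFSCluster startCluster() throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Run the DT secret manager even though security is disabled in tests,
    // so the NN exercises the same start/stop paths as a secure cluster.
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    cluster.waitActive();
    return cluster;
  }
}
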
+ 13 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.security.token.delegation;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 
@@ -282,7 +283,18 @@ public class DelegationTokenSecretManager
   @Override //AbstractDelegationTokenManager
   protected void logUpdateMasterKey(DelegationKey key)
       throws IOException {
-    namesystem.logUpdateMasterKey(key);
+    synchronized (noInterruptsLock) {
+      // The edit logging code will fail catastrophically if it
+      // is interrupted during a logSync, since the interrupt
+      // closes the edit log files. Doing this inside the
+      // above lock and then checking interruption status
+      // prevents this bug.
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException(
+            "Interrupted before updating master key");
+      }
+      namesystem.logUpdateMasterKey(key);
+    }
   }
 
   /** A utility method for creating credentials. */

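The logUpdateMasterKey() override above is the worker-side half of the noInterruptsLock protocol: the key roll, which writes to the edit log, happens inside the lock, and a pending shutdown interrupt is surfaced as an InterruptedIOException before the write rather than closing the edit log mid-logSync. Reduced to its essentials, and reusing the noInterruptsLock field from the sketch after the first diff (the method name is hypothetical):

// Worker-side half of the pattern, extending the sketch above; this method
// stands in for DelegationTokenSecretManager.logUpdateMasterKey().
protected void logCriticalUpdate() throws java.io.InterruptedIOException {
  synchronized (noInterruptsLock) {
    // An interrupt issued before we took the lock is observed here, so the
    // write below is either skipped cleanly or completes untouched.
    if (Thread.interrupted()) {
      throw new java.io.InterruptedIOException(
          "Interrupted before updating master key");
    }
    // ... perform the edit-log write here ...
  }
}
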
+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java

@@ -219,6 +219,11 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
   File getFile() {
     return file;
   }
+  
+  @Override
+  public String toString() {
+    return "EditLogFileOutputStream(" + file + ")";
+  }
 
   /**
    * @return true if this stream is currently open.

+ 48 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
@@ -269,6 +271,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
     TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
   private DelegationTokenSecretManager dtSecretManager;
+  private boolean alwaysUseDelegationTokensForTests;
+  
 
   //
   // Stores the correct file name hierarchy
@@ -447,13 +451,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     dir.imageLoadComplete();
   }
 
-  void startSecretManager() throws IOException {
+  private void startSecretManager() {
     if (dtSecretManager != null) {
-      dtSecretManager.startThreads();
+      try {
+        dtSecretManager.startThreads();
+      } catch (IOException e) {
+        // Inability to start secret manager
+        // can't be recovered from.
+        throw new RuntimeException(e);
+      }
     }
   }
   
-  void stopSecretManager() {
+  private void startSecretManagerIfNecessary() {
+    boolean shouldRun = shouldUseDelegationTokens() &&
+      !isInSafeMode() && getEditLog().isOpenForWrite();
+    boolean running = dtSecretManager.isRunning();
+    if (shouldRun && !running) {
+      startSecretManager();
+    }
+  }
+
+  private void stopSecretManager() {
     if (dtSecretManager != null) {
       dtSecretManager.stopThreads();
     }
@@ -539,9 +558,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
         dir.fsImage.editLog.openForWrite();
       }
-      if (UserGroupInformation.isSecurityEnabled()) {
-        startSecretManager();
-      }
       if (haEnabled) {
         // Renew all of the leases before becoming active.
         // This is because, while we were in standby mode,
@@ -550,11 +566,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         leaseManager.renewAllLeases();
       }
       leaseManager.startMonitor();
+      startSecretManagerIfNecessary();
     } finally {
       writeUnlock();
     }
   }
 
+  private boolean shouldUseDelegationTokens() {
+    return UserGroupInformation.isSecurityEnabled() ||
+      alwaysUseDelegationTokensForTests;
+  }
+
   /** 
    * Stop services required in active state
    * @throws InterruptedException
@@ -839,6 +861,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     this.standbyShouldCheckpoint = conf.getBoolean(
         DFS_HA_STANDBY_CHECKPOINTS_KEY,
         DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
+    
+    // For testing purposes, allow the DT secret manager to be started regardless
+    // of whether security is enabled.
+    alwaysUseDelegationTokensForTests = 
+      conf.getBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
   }
 
   /**
@@ -3479,6 +3507,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           + nt.getNumOfLeaves() + " datanodes");
       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
           + blockManager.numOfUnderReplicatedBlocks() + " blocks");
+
+      startSecretManagerIfNecessary();
     }
 
     /**
@@ -3956,6 +3986,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void enterSafeMode(boolean resourcesLow) throws IOException {
     writeLock();
     try {
+      // Stop the secret manager, since rolling the master key would
+      // try to write to the edit log
+      stopSecretManager();
+
       // Ensure that any concurrent operations have been fully synced
       // before entering safe mode. This ensures that the FSImage
       // is entirely stable on disk as soon as we're in safe mode.
@@ -4805,16 +4839,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @param key new delegation key.
    */
   public void logUpdateMasterKey(DelegationKey key) throws IOException {
-    writeLock();
-    try {
-      if (isInSafeMode()) {
-        throw new SafeModeException(
-          "Cannot log master key update in safe mode", safeMode);
-      }
-      getEditLog().logUpdateMasterKey(key);
-    } finally {
-      writeUnlock();
-    }
+    
+    assert !isInSafeMode() :
+      "this should never be called while in safemode, since we stop " +
+      "the DT manager before entering safemode!";
+    // No need to hold FSN lock since we don't access any internal
+    // structures, and this is stopped before the FSN shuts itself
+    // down, etc.
+    getEditLog().logUpdateMasterKey(key);
     getEditLog().logSync();
   }
   

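Taken together, the FSNamesystem changes above enforce one invariant: the secret manager runs exactly when delegation tokens are wanted, the NN is out of safe mode, and its edit log is open for writes; it is stopped before entering safe mode, which is why logUpdateMasterKey() can assert it never executes there. A condensed restatement of that decision logic (a sketch only; the real methods also take the namesystem lock and handle HA details):

// Condensed restatement of the invariant introduced by this patch; the
// class is illustrative, not part of FSNamesystem.
final class SecretManagerPolicy {
  static boolean shouldRun(boolean securityEnabled,
                           boolean alwaysUseTokensForTests,
                           boolean inSafeMode,
                           boolean editLogOpenForWrite) {
    boolean wantTokens = securityEnabled || alwaysUseTokensForTests;
    return wantTokens && !inSafeMode && editLogOpenForWrite;
  }
}

startSecretManagerIfNecessary() re-evaluates this predicate on the transitions that can flip it to true (leaving safe mode, becoming active), while enterSafeMode() calls stopSecretManager() first so a master-key roll can never race the safe-mode freeze of the edit log.
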
+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestResolveHdfsSymlink.java

@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -47,9 +48,11 @@ public class TestResolveHdfsSymlink {
   @BeforeClass
   public static void setUp() throws IOException {
     Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     cluster = new MiniDFSCluster.Builder(conf).build();
     cluster.waitActive();
-    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
+
   }
 
   @AfterClass

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java

@@ -27,9 +27,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -52,14 +52,15 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
     SupportsBlocks = true;
+    CONF.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+    
     cluster =
         new MiniDFSCluster.Builder(CONF).nnTopology(
                 MiniDFSNNTopology.simpleFederatedTopology(2))
             .numDataNodes(2)
             .build();
     cluster.waitClusterUp();
-    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem(0)).startThreads();
-    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem(1)).startThreads();
     
     fHdfs = cluster.getFileSystem(0);
     fHdfs2 = cluster.getFileSystem(1);

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsHdfs.java

@@ -26,9 +26,9 @@ import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
@@ -51,9 +51,11 @@ public class TestViewFsHdfs extends ViewFsBaseTest {
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
     SupportsBlocks = true;
+    CONF.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
     cluster.waitClusterUp();
-    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
     fc = FileContext.getFileContext(cluster.getURI(0), CONF);
     defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
         UserGroupInformation.getCurrentUser().getShortUserName()));

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1827,6 +1827,10 @@ public class MiniDFSCluster {
   public void setLeasePeriod(long soft, long hard) {
     NameNodeAdapter.setLeasePeriod(getNamesystem(), soft, hard);
   }
+  
+  public void setWaitSafeMode(boolean wait) {
+    this.waitSafeMode = wait;
+  }
 
   /**
    * Returns the current set of datanodes

+ 54 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.security;
 
 
 
+import static org.junit.Assert.*;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -32,12 +34,16 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
@@ -64,6 +70,7 @@ public class TestDelegationToken {
     config.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
     config.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
     config.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
+    config.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     config.set("hadoop.security.auth_to_local",
         "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
     FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
@@ -71,7 +78,6 @@ public class TestDelegationToken {
     cluster.waitActive();
     dtSecretManager = NameNodeAdapter.getDtSecretManager(
         cluster.getNamesystem());
-    dtSecretManager.startThreads();
   }
 
   @After
@@ -269,5 +275,51 @@ public class TestDelegationToken {
       }
     });
   }
- 
+  
+  /**
+   * Test that the delegation token secret manager only runs when the
+   * NN is out of safe mode. This is because the secret manager
+   * has to log to the edit log, which should not be written in
+   * safe mode. Regression test for HDFS-2579.
+   */
+  @Test
+  public void testDTManagerInSafeMode() throws Exception {
+    cluster.startDataNodes(config, 1, true, StartupOption.REGULAR, null);
+    FileSystem fs = cluster.getFileSystem();
+    for (int i = 0; i < 5; i++) {
+      DFSTestUtil.createFile(fs, new Path("/test-" + i), 100, (short)1, 1L);
+    }
+    cluster.getConfiguration(0).setInt(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY, 500); 
+    cluster.getConfiguration(0).setInt(
+        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 30000);
+    cluster.setWaitSafeMode(false);
+    cluster.restartNameNode();
+    NameNode nn = cluster.getNameNode();
+    assertTrue(nn.isInSafeMode());
+    DelegationTokenSecretManager sm =
+      NameNodeAdapter.getDtSecretManager(nn.getNamesystem());
+    assertFalse("Secret manager should not run in safe mode", sm.isRunning());
+    
+    NameNodeAdapter.leaveSafeMode(nn, false);
+    assertTrue("Secret manager should start when safe mode is exited",
+        sm.isRunning());
+    
+    LOG.info("========= entering safemode again");
+    
+    NameNodeAdapter.enterSafeMode(nn, false);
+    assertFalse("Secret manager should stop again when safe mode " +
+        "is manually entered", sm.isRunning());
+    
+    // Set the cluster to leave safemode quickly on its own.
+    cluster.getConfiguration(0).setInt(
+        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
+    cluster.setWaitSafeMode(true);
+    cluster.restartNameNode();
+    nn = cluster.getNameNode();
+    sm = NameNodeAdapter.getDtSecretManager(nn.getNamesystem());
+
+    assertFalse(nn.isInSafeMode());
+    assertTrue(sm.isRunning());
+  }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
@@ -114,11 +113,12 @@ public class TestDelegationTokenForProxyUser {
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
     config.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER),
         "group1");
+    config.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     configureSuperUserIPAddresses(config, REAL_USER);
     FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
     cluster = new MiniDFSCluster.Builder(config).build();
     cluster.waitActive();
-    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
     ProxyUsers.refreshSuperUserGroupsConfiguration(config);
   }
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java

@@ -108,10 +108,11 @@ public class OfflineEditsViewerHelper {
     // for security to work (fake JobTracker user)
     config.set("hadoop.security.auth_to_local",
       "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
+    config.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     cluster =
       new MiniDFSCluster.Builder(config).manageNameDfsDirs(false).build();
     cluster.waitClusterUp();
-    cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
   }
 
   /**

+ 3 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java

@@ -22,6 +22,7 @@ import junit.framework.Assert;
 import java.io.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -64,11 +65,12 @@ public class TestCheckPointForSecurityTokens {
     DistributedFileSystem fs = null;
     try {
       Configuration conf = new HdfsConfiguration();
+      conf.setBoolean(
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
       cluster.waitActive();
       fs = (DistributedFileSystem)(cluster.getFileSystem());
       FSNamesystem namesystem = cluster.getNamesystem();
-      namesystem.getDelegationTokenSecretManager().startThreads();
       String renewer = UserGroupInformation.getLoginUser().getUserName();
       Token<DelegationTokenIdentifier> token1 = namesystem
           .getDelegationToken(new Text(renewer)); 
@@ -122,7 +124,6 @@ public class TestCheckPointForSecurityTokens {
       }
 
       namesystem = cluster.getNamesystem();
-      namesystem.getDelegationTokenSecretManager().startThreads();
       Token<DelegationTokenIdentifier> token3 = namesystem
           .getDelegationToken(new Text(renewer));
       Token<DelegationTokenIdentifier> token4 = namesystem
@@ -136,7 +137,6 @@ public class TestCheckPointForSecurityTokens {
       cluster.waitActive();
 
       namesystem = cluster.getNamesystem();
-      namesystem.getDelegationTokenSecretManager().startThreads();
       Token<DelegationTokenIdentifier> token5 = namesystem
           .getDelegationToken(new Text(renewer));
 
@@ -159,7 +159,6 @@ public class TestCheckPointForSecurityTokens {
       cluster.waitActive();
 
       namesystem = cluster.getNamesystem();
-      namesystem.getDelegationTokenSecretManager().startThreads();
       try {
         renewToken(token1);
         cancelToken(token1);

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java

@@ -24,6 +24,7 @@ import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -91,6 +92,9 @@ public class TestSecurityTokenEditLog extends TestCase {
     FileSystem fileSys = null;
 
     try {
+      conf.setBoolean(
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
@@ -106,7 +110,6 @@ public class TestSecurityTokenEditLog extends TestCase {
   
       // set small size of flush buffer
       editLog.setOutputBufferCapacity(2048);
-      namesystem.getDelegationTokenSecretManager().startThreads();
     
       // Create threads and make them run transactions concurrently.
       Thread threadId[] = new Thread[NUM_THREADS];

+ 123 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java

@@ -317,6 +317,9 @@ public class TestHAStateTransitions {
   public void testDelegationTokensAfterFailover() throws IOException,
       URISyntaxException {
     Configuration conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+    
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(0)
@@ -326,7 +329,6 @@ public class TestHAStateTransitions {
       cluster.transitionToActive(0);
       NameNode nn1 = cluster.getNameNode(0);
       NameNode nn2 = cluster.getNameNode(1);
-      NameNodeAdapter.getDtSecretManager(nn1.getNamesystem()).startThreads();
 
       String renewer = UserGroupInformation.getLoginUser().getUserName();
       Token<DelegationTokenIdentifier> token = nn1.getRpcServer()
@@ -335,8 +337,6 @@ public class TestHAStateTransitions {
       LOG.info("Failing over to NN 1");
       cluster.transitionToStandby(0);
       cluster.transitionToActive(1);
-      // Need to explicitly start threads because security is not enabled.
-      NameNodeAdapter.getDtSecretManager(nn2.getNamesystem()).startThreads();
 
       nn2.getRpcServer().renewDelegationToken(token);
       nn2.getRpcServer().cancelDelegationToken(token);
@@ -421,4 +421,124 @@ public class TestHAStateTransitions {
       EditLogFileOutputStream.writeHeader(out);
     }
   }
+  
+
+  /**
+   * The secret manager needs to start/stop - the invariant should be that
+   * the secret manager runs if and only if the NN is active and not in
+   * safe mode. As a state diagram, we need to test all of the following
+   * transitions to make sure the secret manager is started when we transition
+   * into state 4, but none of the others.
+   * <pre>
+   *         SafeMode     Not SafeMode 
+   * Standby   1 <------> 2
+   *           ^          ^
+   *           |          |
+   *           v          v
+   * Active    3 <------> 4
+   * </pre>
+   */
+  @Test(timeout=60000)
+  public void testSecretManagerState() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY, 50);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(1)
+         .waitSafeMode(false)
+        .build();
+    try {
+      cluster.transitionToActive(0);
+      DFSTestUtil.createFile(cluster.getFileSystem(0),
+          TEST_FILE_PATH, 6000, (short)1, 1L);
+      
+      cluster.getConfiguration(0).setInt(
+          DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 60000);
+
+      cluster.restartNameNode(0);
+      NameNode nn = cluster.getNameNode(0);
+      
+      banner("Started in state 1.");
+      assertTrue(nn.isStandbyState());
+      assertTrue(nn.isInSafeMode());
+      assertFalse(isDTRunning(nn));
+      
+      banner("Transition 1->2. Should not start secret manager");
+      NameNodeAdapter.leaveSafeMode(nn, false);
+      assertTrue(nn.isStandbyState());
+      assertFalse(nn.isInSafeMode());
+      assertFalse(isDTRunning(nn));
+  
+      banner("Transition 2->1. Should not start secret manager.");
+      NameNodeAdapter.enterSafeMode(nn, false);
+      assertTrue(nn.isStandbyState());
+      assertTrue(nn.isInSafeMode());
+      assertFalse(isDTRunning(nn));
+  
+      banner("Transition 1->3. Should not start secret manager.");
+      nn.getRpcServer().transitionToActive();
+      assertFalse(nn.isStandbyState());
+      assertTrue(nn.isInSafeMode());
+      assertFalse(isDTRunning(nn));
+  
+      banner("Transition 3->1. Should not start secret manager.");
+      nn.getRpcServer().transitionToStandby();
+      assertTrue(nn.isStandbyState());
+      assertTrue(nn.isInSafeMode());
+      assertFalse(isDTRunning(nn));
+  
+      banner("Transition 1->3->4. Should start secret manager.");
+      nn.getRpcServer().transitionToActive();
+      NameNodeAdapter.leaveSafeMode(nn, false);
+      assertFalse(nn.isStandbyState());
+      assertFalse(nn.isInSafeMode());
+      assertTrue(isDTRunning(nn));
+      
+      banner("Transition 4->3. Should stop secret manager");
+      NameNodeAdapter.enterSafeMode(nn, false);
+      assertFalse(nn.isStandbyState());
+      assertTrue(nn.isInSafeMode());
+      assertFalse(isDTRunning(nn));
+  
+      banner("Transition 3->4. Should start secret manager");
+      NameNodeAdapter.leaveSafeMode(nn, false);
+      assertFalse(nn.isStandbyState());
+      assertFalse(nn.isInSafeMode());
+      assertTrue(isDTRunning(nn));
+      
+      for (int i = 0; i < 20; i++) {
+        // Loop the last check to suss out races.
+        banner("Transition 4->2. Should stop secret manager.");
+        nn.getRpcServer().transitionToStandby();
+        assertTrue(nn.isStandbyState());
+        assertFalse(nn.isInSafeMode());
+        assertFalse(isDTRunning(nn));
+    
+        banner("Transition 2->4. Should start secret manager");
+        nn.getRpcServer().transitionToActive();
+        assertFalse(nn.isStandbyState());
+        assertFalse(nn.isInSafeMode());
+        assertTrue(isDTRunning(nn));
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  private boolean isDTRunning(NameNode nn) {
+    return NameNodeAdapter.getDtSecretManager(nn.getNamesystem()).isRunning();
+  }
+
+  /**
+   * Print a big banner in the test log to make debug easier.
+   */
+  static void banner(String string) {
+    LOG.info("\n\n\n\n================================================\n" +
+        string + "\n" +
+        "==================================================\n\n");
+  }
 }