Browse Source

HDFS-5322. Merge change r1531440 from branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.2@1531442 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 years ago
parent
commit
49d53b1801

+ 26 - 21
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -1292,6 +1292,29 @@ public abstract class Server {
       }
     }
 
+    private Throwable getCauseForInvalidToken(IOException e) {
+      Throwable cause = e;
+      while (cause != null) {
+        if (cause instanceof RetriableException) {
+          return (RetriableException) cause;
+        } else if (cause instanceof StandbyException) {
+          return (StandbyException) cause;
+        } else if (cause instanceof InvalidToken) {
+          // FIXME: hadoop method signatures are restricting the SASL
+          // callbacks to only returning InvalidToken, but some services
+          // need to throw other exceptions (ex. NN + StandyException),
+          // so for now we'll tunnel the real exceptions via an
+          // InvalidToken's cause which normally is not set 
+          if (cause.getCause() != null) {
+            cause = cause.getCause();
+          }
+          return cause;
+        }
+        cause = cause.getCause();
+      }
+      return e;
+    }
+    
     private void saslProcess(RpcSaslProto saslMessage)
         throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
@@ -1304,29 +1327,11 @@ public abstract class Server {
         try {
           saslResponse = processSaslMessage(saslMessage);
         } catch (IOException e) {
-          IOException sendToClient = e;
-          Throwable cause = e;
-          while (cause != null) {
-            if (cause instanceof InvalidToken) {
-              // FIXME: hadoop method signatures are restricting the SASL
-              // callbacks to only returning InvalidToken, but some services
-              // need to throw other exceptions (ex. NN + StandyException),
-              // so for now we'll tunnel the real exceptions via an
-              // InvalidToken's cause which normally is not set 
-              if (cause.getCause() != null) {
-                cause = cause.getCause();
-              }
-              sendToClient = (IOException) cause;
-              break;
-            }
-            cause = cause.getCause();
-          }
           rpcMetrics.incrAuthenticationFailures();
-          String clientIP = this.toString();
           // attempting user could be null
-          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
-            " (" + e.getLocalizedMessage() + ")");
-          throw sendToClient;
+          AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
+              + attemptingUser + " (" + e.getLocalizedMessage() + ")");
+          throw (IOException) getCauseForInvalidToken(e);
         }
         
         if (saslServer != null && saslServer.isComplete()) {

+ 10 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java

@@ -45,11 +45,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * A utility class for dealing with SASL on RPC server
@@ -267,13 +269,15 @@ public class SaslRpcServer {
       this.connection = connection;
     }
 
-    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
-      return encodePassword(secretManager.retrievePassword(tokenid));
+    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken,
+        StandbyException, RetriableException, IOException {
+      return encodePassword(secretManager.retriableRetrievePassword(tokenid));
     }
 
     @Override
     public void handle(Callback[] callbacks) throws InvalidToken,
-        UnsupportedCallbackException {
+        UnsupportedCallbackException, StandbyException, RetriableException,
+        IOException {
       NameCallback nc = null;
       PasswordCallback pc = null;
       AuthorizeCallback ac = null;
@@ -292,7 +296,8 @@ public class SaslRpcServer {
         }
       }
       if (pc != null) {
-        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
+        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
+            secretManager);
         char[] password = getPassword(tokenIdentifier);
         UserGroupInformation user = null;
         user = tokenIdentifier.getUser(); // may throw exception

+ 24 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java

@@ -29,6 +29,7 @@ import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 
 
@@ -66,7 +67,29 @@ public abstract class SecretManager<T extends TokenIdentifier> {
    * @return the password to use
    * @throws InvalidToken the token was invalid
    */
-  public abstract byte[] retrievePassword(T identifier) throws InvalidToken;
+  public abstract byte[] retrievePassword(T identifier)
+      throws InvalidToken;
+  
+  /**
+   * The same functionality with {@link #retrievePassword}, except that this 
+   * method can throw a {@link RetriableException} or a {@link StandbyException}
+   * to indicate that client can retry/failover the same operation because of 
+   * temporary issue on the server side.
+   * 
+   * @param identifier the identifier to validate
+   * @return the password to use
+   * @throws InvalidToken the token was invalid
+   * @throws StandbyException the server is in standby state, the client can
+   *         try other servers
+   * @throws RetriableException the token was invalid, and the server thinks 
+   *         this may be a temporary issue and suggests the client to retry
+   * @throws IOException to allow future exceptions to be added without breaking
+   *         compatibility        
+   */
+  public byte[] retriableRetrievePassword(T identifier)
+      throws InvalidToken, StandbyException, RetriableException, IOException {
+    return retrievePassword(identifier);
+  }
   
   /**
    * Create an empty token identifier.

+ 16 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -289,20 +289,30 @@ extends AbstractDelegationTokenIdentifier>
         + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
     return password;
   }
-
-  @Override
-  public synchronized byte[] retrievePassword(TokenIdent identifier)
+  
+  /**
+   * Find the DelegationTokenInformation for the given token id, and verify that
+   * if the token is expired. Note that this method should be called with 
+   * acquiring the secret manager's monitor.
+   */
+  protected DelegationTokenInformation checkToken(TokenIdent identifier)
       throws InvalidToken {
+    assert Thread.holdsLock(this);
     DelegationTokenInformation info = currentTokens.get(identifier);
     if (info == null) {
       throw new InvalidToken("token (" + identifier.toString()
           + ") can't be found in cache");
     }
-    long now = Time.now();
-    if (info.getRenewDate() < now) {
+    if (info.getRenewDate() < Time.now()) {
       throw new InvalidToken("token (" + identifier.toString() + ") is expired");
     }
-    return info.getPassword();
+    return info;
+  }
+  
+  @Override
+  public synchronized byte[] retrievePassword(TokenIdent identifier)
+      throws InvalidToken {
+    return checkToken(identifier).getPassword();
   }
 
   protected String getTrackingIdIfEnabled(TokenIdent ident) {

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -33,6 +33,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5335. Hive query failed with possible race in dfs output stream.
     (Haohui Mai via suresh)
 
+    HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA 
+    clusters. (jing9)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -115,6 +116,24 @@ public class DelegationTokenSecretManager
     return super.retrievePassword(identifier);
   }
   
+  @Override
+  public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier)
+      throws InvalidToken, StandbyException, RetriableException, IOException {
+    namesystem.checkOperation(OperationCategory.READ);
+    try {
+      return super.retrievePassword(identifier);
+    } catch (InvalidToken it) {
+      if (namesystem.inTransitionToActive()) {
+        // if the namesystem is currently in the middle of transition to 
+        // active state, let client retry since the corresponding editlog may 
+        // have not been applied yet
+        throw new RetriableException(it);
+      } else {
+        throw it;
+      }
+    }
+  }
+  
   /**
    * Returns expiry time of a token given its identifier.
    * 

+ 31 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -439,6 +439,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private HAContext haContext;
 
   private final boolean haEnabled;
+  
+  /**
+   * Whether the namenode is in the middle of starting the active service
+   */
+  private volatile boolean startingActiveService = false;
     
   private INodeId inodeId;
   
@@ -884,6 +889,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException
    */
   void startActiveServices() throws IOException {
+    startingActiveService = true;
     LOG.info("Starting services required for active state");
     writeLock();
     try {
@@ -938,8 +944,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       nnrmthread.start();
     } finally {
       writeUnlock();
+      startingActiveService = false;
     }
   }
+  
+  /**
+   * @return Whether the namenode is transitioning to active state and is in the
+   *         middle of the {@link #startActiveServices()}
+   */
+  public boolean inTransitionToActive() {
+    return haEnabled && haContext != null
+        && haContext.getState().getServiceState() == HAServiceState.ACTIVE
+        && startingActiveService;
+  }
 
   private boolean shouldUseDelegationTokens() {
     return UserGroupInformation.isSecurityEnabled() ||
@@ -6285,11 +6302,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Verifies that the given identifier and password are valid and match.
    * @param identifier Token identifier.
    * @param password Password in the token.
-   * @throws InvalidToken
    */
   public synchronized void verifyToken(DelegationTokenIdentifier identifier,
-      byte[] password) throws InvalidToken {
-    getDelegationTokenSecretManager().verifyToken(identifier, password);
+      byte[] password) throws InvalidToken, RetriableException {
+    try {
+      getDelegationTokenSecretManager().verifyToken(identifier, password);
+    } catch (InvalidToken it) {
+      if (inTransitionToActive()) {
+        throw new RetriableException(it);
+      }
+      throw it;
+    }
   }
   
   @Override
@@ -6306,6 +6329,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return editLogTailer;
   }
   
+  @VisibleForTesting
+  public void setEditLogTailerForTests(EditLogTailer tailer) {
+    this.editLogTailer = tailer;
+  }
+  
   @VisibleForTesting
   void setFsLockForTests(ReentrantReadWriteLock lock) {
     this.fsLock = lock;

+ 104 - 12
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -47,19 +48,22 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import com.google.common.base.Joiner;
 
@@ -78,8 +82,12 @@ public class TestDelegationTokensWithHA {
   private static DelegationTokenSecretManager dtSecretManager;
   private static DistributedFileSystem dfs;
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
+  private volatile boolean catchup = false;
+  
+  @Before
+  public void setupCluster() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+    
     conf.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
@@ -101,18 +109,12 @@ public class TestDelegationTokensWithHA {
         nn0.getNamesystem());
   }
 
-  @AfterClass
-  public static void shutdownCluster() throws IOException {
+  @After
+  public void shutdownCluster() throws IOException {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
-
-
-  @Before
-  public void prepTest() {
-    SecurityUtilTestHelper.setTokenServiceUseIp(true);
-  }
   
   @Test
   public void testDelegationTokenDFSApi() throws Exception {
@@ -155,6 +157,96 @@ public class TestDelegationTokensWithHA {
     doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
   }
   
+  private class EditLogTailerForTest extends EditLogTailer {
+    public EditLogTailerForTest(FSNamesystem namesystem, Configuration conf) {
+      super(namesystem, conf);
+    }
+    
+    public void catchupDuringFailover() throws IOException {
+      synchronized (TestDelegationTokensWithHA.this) {
+        while (!catchup) {
+          try {
+            LOG.info("The editlog tailer is waiting to catchup...");
+            TestDelegationTokensWithHA.this.wait();
+          } catch (InterruptedException e) {}
+        }
+      }
+      super.catchupDuringFailover();
+    }
+  }
+  
+  /**
+   * Test if correct exception (StandbyException or RetriableException) can be
+   * thrown during the NN failover. 
+   */
+  @Test
+  public void testDelegationTokenDuringNNFailover() throws Exception {
+    EditLogTailer editLogTailer = nn1.getNamesystem().getEditLogTailer();
+    // stop the editLogTailer of nn1
+    editLogTailer.stop();
+    Configuration conf = (Configuration) Whitebox.getInternalState(
+        editLogTailer, "conf");
+    nn1.getNamesystem().setEditLogTailerForTests(
+        new EditLogTailerForTest(nn1.getNamesystem(), conf));
+    
+    // create token
+    final Token<DelegationTokenIdentifier> token =
+        getDelegationToken(fs, "JobTracker");
+    DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
+    byte[] tokenId = token.getIdentifier();
+    identifier.readFields(new DataInputStream(
+             new ByteArrayInputStream(tokenId)));
+
+    // Ensure that it's present in the nn0 secret manager and can
+    // be renewed directly from there.
+    LOG.info("A valid token should have non-null password, " +
+        "and should be renewed successfully");
+    assertTrue(null != dtSecretManager.retrievePassword(identifier));
+    dtSecretManager.renewToken(token, "JobTracker");
+    
+    // transition nn0 to standby
+    cluster.transitionToStandby(0);
+    
+    try {
+      cluster.getNameNodeRpc(0).renewDelegationToken(token);
+      fail("StandbyException is expected since nn0 is in standby state");
+    } catch (StandbyException e) {
+      GenericTestUtils.assertExceptionContains(
+          HAServiceState.STANDBY.toString(), e);
+    }
+    
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          cluster.transitionToActive(1);
+        } catch (Exception e) {
+          LOG.error("Transition nn1 to active failed", e);
+        }    
+      }
+    }.start();
+    
+    Thread.sleep(1000);
+    try {
+      nn1.getNamesystem().verifyToken(token.decodeIdentifier(),
+          token.getPassword());
+      fail("RetriableException/StandbyException is expected since nn1 is in transition");
+    } catch (IOException e) {
+      assertTrue(e instanceof StandbyException
+          || e instanceof RetriableException);
+      LOG.info("Got expected exception", e);
+    }
+    
+    catchup = true;
+    synchronized (this) {
+      this.notifyAll();
+    }
+    
+    Configuration clientConf = dfs.getConf();
+    doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);
+    doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
+  }
+  
   @SuppressWarnings("deprecation")
   @Test
   public void testDelegationTokenWithDoAs() throws Exception {