瀏覽代碼

HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by a peer. (Arun Suresh and Gregory Chanan via kasha)

(cherry picked from commit db45f047ab6b19d8a3e7752bb2cde10827cd8dad)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt
Karthik Kambatla 10 年之前
父節點
當前提交
7bb5071a9e

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -636,6 +636,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-11228. Winutils task: unsecure path should not call
     AddNodeManagerAndUserACEsToObject. (Remus Rusanu via jianhe)
 
+    HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by 
+    a peer. (Arun Suresh and Gregory Chanan via kasha)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 35 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -20,10 +20,13 @@ package org.apache.hadoop.security.token.delegation;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -125,7 +128,7 @@ extends AbstractDelegationTokenIdentifier>
    * Reset all data structures and mutable state.
    */
   public synchronized void reset() {
-    currentId = 0;
+    setCurrentKeyId(0);
     allKeys.clear();
     setDelegationTokenSeqNum(0);
     currentTokens.clear();
@@ -138,8 +141,8 @@ extends AbstractDelegationTokenIdentifier>
   public synchronized void addKey(DelegationKey key) throws IOException {
     if (running) // a safety check
       throw new IOException("Can't add delegation key to a running SecretManager.");
-    if (key.getKeyId() > currentId) {
-      currentId = key.getKeyId();
+    if (key.getKeyId() > getCurrentKeyId()) {
+      setCurrentKeyId(key.getKeyId());
     }
     allKeys.put(key.getKeyId(), key);
   }
@@ -182,6 +185,30 @@ extends AbstractDelegationTokenIdentifier>
     return;
   }
 
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected synchronized int getCurrentKeyId() {
+    return currentId;
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected synchronized int incrementCurrentKeyId() {
+    return ++currentId;
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected synchronized void setCurrentKeyId(int keyId) {
+    currentId = keyId;
+  }
+
   /**
    * For subclasses externalizing the storage, for example Zookeeper
    * based implementations
@@ -282,8 +309,8 @@ extends AbstractDelegationTokenIdentifier>
       return;
     }
     byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
-    if (identifier.getSequenceNumber() > delegationTokenSequenceNumber) {
-      delegationTokenSequenceNumber = identifier.getSequenceNumber();
+    if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
+      setDelegationTokenSeqNum(identifier.getSequenceNumber());
     }
     if (getTokenInfo(identifier) == null) {
       currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
@@ -303,7 +330,7 @@ extends AbstractDelegationTokenIdentifier>
     /* Create a new currentKey with an estimated expiry date. */
     int newCurrentId;
     synchronized (this) {
-      newCurrentId = currentId+1;
+      newCurrentId = incrementCurrentKeyId();
     }
     DelegationKey newKey = new DelegationKey(newCurrentId, System
         .currentTimeMillis()
@@ -311,7 +338,6 @@ extends AbstractDelegationTokenIdentifier>
     //Log must be invoked outside the lock on 'this'
     logUpdateMasterKey(newKey);
     synchronized (this) {
-      currentId = newKey.getKeyId();
       currentKey = newKey;
       storeDelegationKey(currentKey);
     }
@@ -358,9 +384,9 @@ extends AbstractDelegationTokenIdentifier>
     sequenceNum = incrementDelegationTokenSeqNum();
     identifier.setIssueDate(now);
     identifier.setMaxDate(now + tokenMaxLifetime);
-    identifier.setMasterKeyId(currentId);
+    identifier.setMasterKeyId(currentKey.getKeyId());
     identifier.setSequenceNumber(sequenceNum);
-    LOG.info("Creating password for identifier: " + identifier);
+    LOG.info("Creating password for identifier: [" + MD5Hash.digest(identifier.getBytes()) + ", " + currentKey.getKeyId() + "]");
     byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
     DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
         + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));

+ 63 - 23
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -56,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,6 +104,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
 
   private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
   private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
+  private static final String ZK_DTSM_KEYID_ROOT = "ZKDTSMKeyIdRoot";
   private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
   private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
 
@@ -119,7 +120,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
 
   private final boolean isExternalClient;
   private final CuratorFramework zkClient;
-  private SharedCount seqCounter;
+  private SharedCount delTokSeqCounter;
+  private SharedCount keyIdSeqCounter;
   private PathChildrenCache keyCache;
   private PathChildrenCache tokenCache;
   private ExecutorService listenerThreadPool;
@@ -276,7 +278,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   }
 
   @Override
-  public synchronized void startThreads() throws IOException {
+  public void startThreads() throws IOException {
     if (!isExternalClient) {
       try {
         zkClient.start();
@@ -285,13 +287,21 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       }
     }
     try {
-      seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
-      if (seqCounter != null) {
-        seqCounter.start();
+      delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
+      if (delTokSeqCounter != null) {
+        delTokSeqCounter.start();
       }
     } catch (Exception e) {
       throw new IOException("Could not start Sequence Counter", e);
     }
+    try {
+      keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0);
+      if (keyIdSeqCounter != null) {
+        keyIdSeqCounter.start();
+      }
+    } catch (Exception e) {
+      throw new IOException("Could not start KeyId Counter", e);
+    }
     try {
       createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
       createPersistentNode(ZK_DTSM_TOKENS_ROOT);
@@ -402,13 +412,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   }
 
   @Override
-  public synchronized void stopThreads() {
+  public void stopThreads() {
     try {
       if (!isExternalClient && (zkClient != null)) {
         zkClient.close();
       }
-      if (seqCounter != null) {
-        seqCounter.close();
+      if (delTokSeqCounter != null) {
+        delTokSeqCounter.close();
+      }
+      if (keyIdSeqCounter != null) {
+        keyIdSeqCounter.close();
       }
       if (keyCache != null) {
         keyCache.close();
@@ -434,30 +447,46 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   }
 
   @Override
-  protected synchronized int getDelegationTokenSeqNum() {
-    return seqCounter.getCount();
+  protected int getDelegationTokenSeqNum() {
+    return delTokSeqCounter.getCount();
   }
 
   @Override
-  protected synchronized int incrementDelegationTokenSeqNum() {
+  protected int incrementDelegationTokenSeqNum() {
     try {
-      while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) {
+      while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
       }
     } catch (Exception e) {
       throw new RuntimeException("Could not increment shared counter !!", e);
     }
-    return seqCounter.getCount();
+    return delTokSeqCounter.getCount();
   }
 
   @Override
-  protected synchronized void setDelegationTokenSeqNum(int seqNum) {
+  protected void setDelegationTokenSeqNum(int seqNum) {
     try {
-      seqCounter.setCount(seqNum);
+      delTokSeqCounter.setCount(seqNum);
     } catch (Exception e) {
       throw new RuntimeException("Could not set shared counter !!", e);
     }
   }
 
+  @Override
+  protected int getCurrentKeyId() {
+    return keyIdSeqCounter.getCount();
+  }
+
+  @Override
+  protected int incrementCurrentKeyId() {
+    try {
+      while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Could not increment shared keyId counter !!", e);
+    }
+    return keyIdSeqCounter.getCount();
+  }
+
   @Override
   protected DelegationKey getDelegationKey(int keyId) {
     // First check if its I already have this key
@@ -518,6 +547,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
 
   private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
       throws IOException {
+    return getTokenInfoFromZK(ident, false);
+  }
+
+  private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
+      boolean quiet) throws IOException {
     String nodePath =
         getNodePath(ZK_DTSM_TOKENS_ROOT,
             DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
@@ -539,7 +573,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
         return tokenInfo;
       }
     } catch (KeeperException.NoNodeException e) {
-      LOG.error("No node in path [" + nodePath + "]");
+      if (!quiet) {
+        LOG.error("No node in path [" + nodePath + "]");
+      }
     } catch (Exception ex) {
       throw new IOException(ex);
     }
@@ -604,7 +640,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     }
     try {
       if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
-        zkClient.delete().forPath(nodeRemovePath);
+        while(zkClient.checkExists().forPath(nodeRemovePath) != null){
+          zkClient.delete().guaranteed().forPath(nodeRemovePath);
+        }
       } else {
         LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
       }
@@ -633,10 +671,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
             + ident.getSequenceNumber());
     try {
       if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+        addOrUpdateToken(ident, tokenInfo, true);
+      } else {
         addOrUpdateToken(ident, tokenInfo, false);
         LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
-      } else {
-        addOrUpdateToken(ident, tokenInfo, true);
       }
     } catch (Exception e) {
       throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
@@ -656,9 +694,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     }
     try {
       if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
-        LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
+        while(zkClient.checkExists().forPath(nodeRemovePath) != null){
+          zkClient.delete().guaranteed().forPath(nodeRemovePath);
+        }
       } else {
-        zkClient.delete().forPath(nodeRemovePath);
+        LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
       }
     } catch (Exception e) {
       throw new RuntimeException(
@@ -682,7 +722,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       tokenOut.writeInt(info.getPassword().length);
       tokenOut.write(info.getPassword());
       if (LOG.isDebugEnabled()) {
-        LOG.debug((isUpdate ? "Storing " : "Updating ")
+        LOG.debug((isUpdate ? "Updating " : "Storing ")
             + "ZKDTSMDelegationToken_" +
             ident.getSequenceNumber());
       }

+ 104 - 27
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java

@@ -22,50 +22,127 @@ import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+
+import static org.junit.Assert.fail;
+
 import org.junit.Test;
 
 public class TestZKDelegationTokenSecretManager {
 
   private static final long DAY_IN_SECS = 86400;
 
+  private TestingServer zkServer;
+
+  @Before
+  public void setup() throws Exception {
+    zkServer = new TestingServer();
+    zkServer.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (zkServer != null) {
+      zkServer.close();
+    }
+  }
+
+  protected Configuration getSecretConf(String connectString) {
+   Configuration conf = new Configuration();
+   conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
+   conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
+   conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
+   conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
+   conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+   conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+   conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+   conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+   return conf;
+  }
+
   @SuppressWarnings("unchecked")
   @Test
-  public void testZKDelTokSecretManager() throws Exception {
-    TestingServer zkServer = new TestingServer();
+  public void testMultiNodeOperations() throws Exception {
     DelegationTokenManager tm1, tm2 = null;
-    zkServer.start();
+    String connectString = zkServer.getConnectString();
+    Configuration conf = getSecretConf(connectString);
+    tm1 = new DelegationTokenManager(conf, new Text("bla"));
+    tm1.init();
+    tm2 = new DelegationTokenManager(conf, new Text("bla"));
+    tm2.init();
+
+    Token<DelegationTokenIdentifier> token =
+        (Token<DelegationTokenIdentifier>) tm1.createToken(
+            UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token);
+    tm2.verifyToken(token);
+    tm2.renewToken(token, "foo");
+    tm1.verifyToken(token);
+    tm1.cancelToken(token, "foo");
     try {
-      String connectString = zkServer.getConnectString();
-      Configuration conf = new Configuration();
-      conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
-      conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
-      conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
-      conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
-      conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
-      conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
-      conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
-      conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
-      tm1 = new DelegationTokenManager(conf, new Text("foo"));
-      tm1.init();
-      tm2 = new DelegationTokenManager(conf, new Text("foo"));
-      tm2.init();
-
-      Token<DelegationTokenIdentifier> token =
-          (Token<DelegationTokenIdentifier>) tm1.createToken(
-              UserGroupInformation.getCurrentUser(), "foo");
-      Assert.assertNotNull(token);
       tm2.verifyToken(token);
+      fail("Expected InvalidToken");
+    } catch (SecretManager.InvalidToken it) {
+      // Ignore
+    }
 
-      token = (Token<DelegationTokenIdentifier>) tm2.createToken(
-          UserGroupInformation.getCurrentUser(), "bar");
-      Assert.assertNotNull(token);
+    token = (Token<DelegationTokenIdentifier>) tm2.createToken(
+        UserGroupInformation.getCurrentUser(), "bar");
+    Assert.assertNotNull(token);
+    tm1.verifyToken(token);
+    tm1.renewToken(token, "bar");
+    tm2.verifyToken(token);
+    tm2.cancelToken(token, "bar");
+    try {
       tm1.verifyToken(token);
-    } finally {
-      zkServer.close();
+      fail("Expected InvalidToken");
+    } catch (SecretManager.InvalidToken it) {
+      // Ignore
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRenewTokenSingleManager() throws Exception {
+    DelegationTokenManager tm1 = null;
+    String connectString = zkServer.getConnectString();
+    Configuration conf = getSecretConf(connectString);
+    tm1 = new DelegationTokenManager(conf, new Text("foo"));
+    tm1.init();
+
+    Token<DelegationTokenIdentifier> token =
+        (Token<DelegationTokenIdentifier>)
+        tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token);
+    tm1.renewToken(token, "foo");
+    tm1.verifyToken(token);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCancelTokenSingleManager() throws Exception {
+    DelegationTokenManager tm1 = null;
+    String connectString = zkServer.getConnectString();
+    Configuration conf = getSecretConf(connectString);
+    tm1 = new DelegationTokenManager(conf, new Text("foo"));
+    tm1.init();
+
+    Token<DelegationTokenIdentifier> token =
+        (Token<DelegationTokenIdentifier>)
+        tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token);
+    tm1.cancelToken(token, "foo");
+    try {
+      tm1.verifyToken(token);
+      fail("Expected InvalidToken");
+    } catch (SecretManager.InvalidToken it) {
+      it.printStackTrace();
     }
   }
 }