
Merge r1609845 through r1612268 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612270 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 11 years ago
parent
commit
ee3a32784f
47 files changed, with 1382 additions and 512 deletions
  1. +6 -0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. +174 -0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java
  3. +46 -19
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java
  4. +15 -0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java
  5. +25 -4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java
  6. +53 -21
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java
  7. +4 -4
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java
  8. +14 -9
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
  9. +100 -26
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java
  10. +0 -177
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java
  11. +13 -1
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
  12. +12 -4
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
  13. +20 -7
      hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
  14. +4 -4
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java
  15. +4 -2
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
  16. +5 -0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  17. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
  18. +3 -3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  19. +5 -9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  20. +3 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  21. +6 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
  22. +20 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
  23. +9 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
  24. +12 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  25. +80 -37
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
  26. +27 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
  27. +42 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java
  28. +4 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
  29. +16 -8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
  30. +4 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
  31. +185 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
  32. +85 -4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
  33. +4 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java
  34. +8 -0
      hadoop-yarn-project/CHANGES.txt
  35. +14 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
  36. +2 -4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  37. +17 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  38. +0 -16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  39. +0 -16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  40. +3 -3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
  41. +0 -18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  42. +178 -54
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
  43. +75 -26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
  44. +5 -4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
  45. +8 -6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  46. +3 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
  47. +68 -13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

+ 6 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -183,6 +183,9 @@ Trunk (Unreleased)
     HADOOP-10842. CryptoExtension generateEncryptedKey method should 
     receive the key name. (asuresh via tucu)
 
+    HADOOP-10750. KMSKeyProviderCache should be in hadoop-common. 
+    (asuresh via tucu)
+
   BUG FIXES
 
     HADOOP-9451. Fault single-layer config if node group topology is enabled.
@@ -432,6 +435,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10610. Upgrade S3n s3.fs.buffer.dir to support multi directories.
     (Ted Malaska via atm)
 
+    HADOOP-10817. ProxyUsers configuration should support configurable 
+    prefixes. (tucu)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 174 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * A <code>KeyProviderExtension</code> implementation providing a short-lived
+ * cache for <code>KeyVersions</code> and <code>Metadata</code> to avoid bursts
+ * of requests hitting the underlying <code>KeyProvider</code>.
+ */
+public class CachingKeyProvider extends
+    KeyProviderExtension<CachingKeyProvider.CacheExtension> {
+
+  static class CacheExtension implements KeyProviderExtension.Extension {
+    private final KeyProvider provider;
+    private LoadingCache<String, KeyVersion> keyVersionCache;
+    private LoadingCache<String, KeyVersion> currentKeyCache;
+    private LoadingCache<String, Metadata> keyMetadataCache;
+
+    CacheExtension(KeyProvider prov, long keyTimeoutMillis,
+        long currKeyTimeoutMillis) {
+      this.provider = prov;
+      keyVersionCache =
+          CacheBuilder.newBuilder().expireAfterAccess(keyTimeoutMillis,
+              TimeUnit.MILLISECONDS)
+              .build(new CacheLoader<String, KeyVersion>() {
+                @Override
+                public KeyVersion load(String key) throws Exception {
+                  KeyVersion kv = provider.getKeyVersion(key);
+                  if (kv == null) {
+                    throw new KeyNotFoundException();
+                  }
+                  return kv;
+                }
+              });
+      keyMetadataCache =
+          CacheBuilder.newBuilder().expireAfterAccess(keyTimeoutMillis,
+              TimeUnit.MILLISECONDS)
+              .build(new CacheLoader<String, Metadata>() {
+                @Override
+                public Metadata load(String key) throws Exception {
+                  Metadata meta = provider.getMetadata(key);
+                  if (meta == null) {
+                    throw new KeyNotFoundException();
+                  }
+                  return meta;
+                }
+              });
+      currentKeyCache =
+          CacheBuilder.newBuilder().expireAfterWrite(currKeyTimeoutMillis,
+          TimeUnit.MILLISECONDS)
+          .build(new CacheLoader<String, KeyVersion>() {
+            @Override
+            public KeyVersion load(String key) throws Exception {
+              KeyVersion kv = provider.getCurrentKey(key);
+              if (kv == null) {
+                throw new KeyNotFoundException();
+              }
+              return kv;
+            }
+          });
+    }
+  }
+
+  @SuppressWarnings("serial")
+  private static class KeyNotFoundException extends Exception { }
+
+  public CachingKeyProvider(KeyProvider keyProvider, long keyTimeoutMillis,
+      long currKeyTimeoutMillis) {
+    super(keyProvider, new CacheExtension(keyProvider, keyTimeoutMillis,
+        currKeyTimeoutMillis));
+  }
+
+  @Override
+  public KeyVersion getCurrentKey(String name) throws IOException {
+    try {
+      return getExtension().currentKeyCache.get(name);
+    } catch (ExecutionException ex) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof KeyNotFoundException) {
+        return null;
+      } else if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+  @Override
+  public KeyVersion getKeyVersion(String versionName)
+      throws IOException {
+    try {
+      return getExtension().keyVersionCache.get(versionName);
+    } catch (ExecutionException ex) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof KeyNotFoundException) {
+        return null;
+      } else if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+  @Override
+  public void deleteKey(String name) throws IOException {
+    getKeyProvider().deleteKey(name);
+    getExtension().currentKeyCache.invalidate(name);
+    getExtension().keyMetadataCache.invalidate(name);
+    // invalidating all key versions as we don't know
+    // which ones belonged to the deleted key
+    getExtension().keyVersionCache.invalidateAll();
+  }
+
+  @Override
+  public KeyVersion rollNewVersion(String name, byte[] material)
+      throws IOException {
+    KeyVersion key = getKeyProvider().rollNewVersion(name, material);
+    getExtension().currentKeyCache.invalidate(name);
+    getExtension().keyMetadataCache.invalidate(name);
+    return key;
+  }
+
+  @Override
+  public KeyVersion rollNewVersion(String name)
+      throws NoSuchAlgorithmException, IOException {
+    KeyVersion key = getKeyProvider().rollNewVersion(name);
+    getExtension().currentKeyCache.invalidate(name);
+    getExtension().keyMetadataCache.invalidate(name);
+    return key;
+  }
+
+  @Override
+  public Metadata getMetadata(String name) throws IOException {
+    try {
+      return getExtension().keyMetadataCache.get(name);
+    } catch (ExecutionException ex) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof KeyNotFoundException) {
+        return null;
+      } else if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+}
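
A minimal usage sketch for the new class, assuming a JCEKS keystore provider
(the keystore URI, key name, and example class name below are made up; the
timeouts match the new KMS defaults of 10 minutes and 30 seconds):

  import java.net.URI;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.crypto.key.CachingKeyProvider;
  import org.apache.hadoop.crypto.key.KeyProvider;
  import org.apache.hadoop.crypto.key.KeyProviderFactory;

  public class CachingKeyProviderExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Load the underlying provider; the jceks URI is an assumption.
      KeyProvider underlying = KeyProviderFactory.get(
          new URI("jceks://file/tmp/example.jceks"), conf);
      // Key-version/metadata entries expire 10 min after last access;
      // the current-key entry expires 30 s after being written.
      KeyProvider cached =
          new CachingKeyProvider(underlying, 10 * 60 * 1000, 30 * 1000);
      KeyProvider.KeyVersion kv = cached.getCurrentKey("examplekey");
      System.out.println(kv == null ? "no such key" : kv.getVersionName());
    }
  }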

+ 46 - 19
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java

@@ -24,37 +24,64 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.MachineList;
 
 import com.google.common.annotations.VisibleForTesting;
 
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
 public class DefaultImpersonationProvider implements ImpersonationProvider {
   private static final String CONF_HOSTS = ".hosts";
   private static final String CONF_USERS = ".users";
   private static final String CONF_GROUPS = ".groups";
-  private static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser.";
-  private static final String CONF_HADOOP_PROXYUSER_RE = "hadoop\\.proxyuser\\.";
-  private static final String CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS = 
-      CONF_HADOOP_PROXYUSER_RE+"[^.]*(" + Pattern.quote(CONF_USERS) +
-      "|" + Pattern.quote(CONF_GROUPS) + ")";
-  private static final String CONF_HADOOP_PROXYUSER_RE_HOSTS = 
-      CONF_HADOOP_PROXYUSER_RE+"[^.]*"+ Pattern.quote(CONF_HOSTS);
   // acl and list of hosts per proxyuser
   private Map<String, AccessControlList> proxyUserAcl = 
     new HashMap<String, AccessControlList>();
-  private static Map<String, MachineList> proxyHosts = 
+  private Map<String, MachineList> proxyHosts =
     new HashMap<String, MachineList>();
   private Configuration conf;
 
+
+  private static DefaultImpersonationProvider testProvider;
+
+  public static synchronized DefaultImpersonationProvider getTestProvider() {
+    if (testProvider == null) {
+      testProvider = new DefaultImpersonationProvider();
+      testProvider.setConf(new Configuration());
+      testProvider.init(ProxyUsers.CONF_HADOOP_PROXYUSER);
+    }
+    return testProvider;
+  }
+
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+  }
+
+  private String configPrefix;
 
-    // get list of users and groups per proxyuser
+  @Override
+  public void init(String configurationPrefix) {
+    configPrefix = configurationPrefix +
+        (configurationPrefix.endsWith(".") ? "" : ".");
+    
+    // constructing regex to match the following patterns:
+    //   $configPrefix.[ANY].users
+    //   $configPrefix.[ANY].groups
+    //   $configPrefix.[ANY].hosts
+    //
+    String prefixRegEx = configPrefix.replace(".", "\\.");
+    String usersGroupsRegEx = prefixRegEx + "[^.]*(" +
+        Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + ")";
+    String hostsRegEx = prefixRegEx + "[^.]*" + Pattern.quote(CONF_HOSTS);
+
+  // get list of users and groups per proxyuser
     Map<String,String> allMatchKeys = 
-        conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS); 
+        conf.getValByRegex(usersGroupsRegEx);
     for(Entry<String, String> entry : allMatchKeys.entrySet()) {  
       String aclKey = getAclKey(entry.getKey());
       if (!proxyUserAcl.containsKey(aclKey)) {
@@ -65,7 +92,7 @@ public class DefaultImpersonationProvider implements ImpersonationProvider {
     }
 
     // get hosts per proxyuser
-    allMatchKeys = conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_HOSTS);
+    allMatchKeys = conf.getValByRegex(hostsRegEx);
     for(Entry<String, String> entry : allMatchKeys.entrySet()) {
       proxyHosts.put(entry.getKey(),
           new MachineList(entry.getValue()));
@@ -86,8 +113,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider {
       return;
     }
     
-    AccessControlList acl = proxyUserAcl.get(
-        CONF_HADOOP_PROXYUSER+realUser.getShortUserName());
+    AccessControlList acl = proxyUserAcl.get(configPrefix +
+        realUser.getShortUserName());
     if (acl == null || !acl.isUserAllowed(user)) {
       throw new AuthorizationException("User: " + realUser.getUserName()
           + " is not allowed to impersonate " + user.getUserName());
@@ -116,8 +143,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider {
    * @param userName name of the superuser
    * @return configuration key for superuser usergroups
    */
-  public static String getProxySuperuserUserConfKey(String userName) {
-    return CONF_HADOOP_PROXYUSER+userName+CONF_USERS;
+  public String getProxySuperuserUserConfKey(String userName) {
+    return configPrefix + userName + CONF_USERS;
   }
 
   /**
@@ -126,8 +153,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider {
    * @param userName name of the superuser
    * @return configuration key for superuser groups
    */
-  public static String getProxySuperuserGroupConfKey(String userName) {
-    return CONF_HADOOP_PROXYUSER+userName+CONF_GROUPS;
+  public String getProxySuperuserGroupConfKey(String userName) {
+    return configPrefix + userName + CONF_GROUPS;
   }
 
   /**
@@ -136,8 +163,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider {
    * @param userName name of the superuser
    * @return configuration key for superuser ip-addresses
    */
-  public static String getProxySuperuserIpConfKey(String userName) {
-    return CONF_HADOOP_PROXYUSER+userName+CONF_HOSTS;
+  public String getProxySuperuserIpConfKey(String userName) {
+    return configPrefix + userName + CONF_HOSTS;
   }
 
   @VisibleForTesting
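
With the prefix now instance state, the key helpers compose configuration
names from whatever prefix the provider was initialized with. A small sketch
using the test provider added above (the superuser name "bob" and the example
class name are arbitrary):

  import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;

  public class ProxyConfKeyExample {
    public static void main(String[] args) {
      // getTestProvider() initializes with the default "hadoop.proxyuser"
      // prefix, so the instance key helpers compose the familiar names.
      DefaultImpersonationProvider p =
          DefaultImpersonationProvider.getTestProvider();
      System.out.println(p.getProxySuperuserUserConfKey("bob"));
      // -> hadoop.proxyuser.bob.users
      System.out.println(p.getProxySuperuserGroupConfKey("bob"));
      // -> hadoop.proxyuser.bob.groups
      System.out.println(p.getProxySuperuserIpConfKey("bob"));
      // -> hadoop.proxyuser.bob.hosts
    }
  }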

+ 15 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java

@@ -18,10 +18,25 @@
 
 package org.apache.hadoop.security.authorize;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.security.UserGroupInformation;
 
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
 public interface ImpersonationProvider  extends Configurable {
+
+
+  /**
+   * Specifies the configuration prefix for the proxy user properties and
+   * initializes the provider.
+   *
+   * @param configurationPrefix the configuration prefix for the proxy user
+   * properties
+   */
+  public void init(String configurationPrefix);
+
   /**
    * Authorize the superuser which is doing doAs
    * 

+ 25 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.security.authorize;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -26,9 +28,12 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
+@InterfaceStability.Unstable
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "HBase", "Hive"})
 public class ProxyUsers {
 
+  public static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser";
+
   private static volatile ImpersonationProvider sip ;
 
   /**
@@ -54,15 +59,31 @@ public class ProxyUsers {
   }
 
   /**
-   * refresh configuration
-   * @param conf
+   * Refreshes configuration using the specified Proxy user prefix for
+   * properties.
+   *
+   * @param conf configuration
+   * @param proxyUserPrefix proxy user configuration prefix
    */
-  public static void refreshSuperUserGroupsConfiguration(Configuration conf) { 
+  public static void refreshSuperUserGroupsConfiguration(Configuration conf,
+      String proxyUserPrefix) {
+    Preconditions.checkArgument(proxyUserPrefix != null && 
+        !proxyUserPrefix.isEmpty(), "prefix cannot be NULL or empty");
     // sip is volatile. Any assignment to it as well as the object's state
     // will be visible to all the other threads. 
-    sip = getInstance(conf);
+    ImpersonationProvider ip = getInstance(conf);
+    ip.init(proxyUserPrefix);
+    sip = ip;
     ProxyServers.refresh(conf);
   }
+
+  /**
+   * Refreshes configuration using the default Proxy user prefix for properties.
+   * @param conf configuration
+   */
+  public static void refreshSuperUserGroupsConfiguration(Configuration conf) {
+    refreshSuperUserGroupsConfiguration(conf, CONF_HADOOP_PROXYUSER);
+  }
   
   /**
    * Authorize the superuser which is doing doAs
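
A sketch of the two refresh entry points (the prefix "x", the user names, and
the example class name are arbitrary; the provider appends the trailing "."
itself):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.security.authorize.ProxyUsers;

  public class CustomPrefixExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration(false);
      conf.set("x.super.users", "alice");
      conf.set("x.super.hosts", "10.0.0.1");
      // Custom prefix: matches x.[ANY].users / x.[ANY].groups / x.[ANY].hosts.
      ProxyUsers.refreshSuperUserGroupsConfiguration(conf, "x");

      // The one-argument overload keeps the old "hadoop.proxyuser" behavior.
      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);

      // A null or empty prefix fails fast with an IllegalArgumentException.
    }
  }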

+ 53 - 21
hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java → hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java

@@ -15,17 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.crypto.key.kms.server;
+package org.apache.hadoop.crypto.key;
+
+import java.util.Date;
 
-import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Date;
-
-public class TestKMSCacheKeyProvider {
+public class TestCachingKeyProvider {
 
   @Test
   public void testCurrentKey() throws Exception {
@@ -33,7 +32,7 @@ public class TestKMSCacheKeyProvider {
     KeyProvider mockProv = Mockito.mock(KeyProvider.class);
     Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
     Mockito.when(mockProv.getCurrentKey(Mockito.eq("k2"))).thenReturn(null);
-    KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+    KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
 
     // asserting caching
     Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
@@ -45,7 +44,7 @@ public class TestKMSCacheKeyProvider {
     Mockito.verify(mockProv, Mockito.times(2)).getCurrentKey(Mockito.eq("k1"));
 
     // asserting no caching when key is not known
-    cache = new KMSCacheKeyProvider(mockProv, 100);
+    cache = new CachingKeyProvider(mockProv, 100, 100);
     Assert.assertEquals(null, cache.getCurrentKey("k2"));
     Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k2"));
     Assert.assertEquals(null, cache.getCurrentKey("k2"));
@@ -56,25 +55,56 @@ public class TestKMSCacheKeyProvider {
   public void testKeyVersion() throws Exception {
     KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
     KeyProvider mockProv = Mockito.mock(KeyProvider.class);
-    Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))).thenReturn(mockKey);
+    Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0")))
+        .thenReturn(mockKey);
     Mockito.when(mockProv.getKeyVersion(Mockito.eq("k2@0"))).thenReturn(null);
-    KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+    KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
 
     // asserting caching
     Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
-    Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0"));
+    Mockito.verify(mockProv, Mockito.times(1))
+        .getKeyVersion(Mockito.eq("k1@0"));
     Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
-    Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0"));
+    Mockito.verify(mockProv, Mockito.times(1))
+        .getKeyVersion(Mockito.eq("k1@0"));
     Thread.sleep(200);
     Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
-    Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k1@0"));
+    Mockito.verify(mockProv, Mockito.times(2))
+        .getKeyVersion(Mockito.eq("k1@0"));
 
     // asserting no caching when key is not known
-    cache = new KMSCacheKeyProvider(mockProv, 100);
+    cache = new CachingKeyProvider(mockProv, 100, 100);
     Assert.assertEquals(null, cache.getKeyVersion("k2@0"));
-    Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k2@0"));
+    Mockito.verify(mockProv, Mockito.times(1))
+        .getKeyVersion(Mockito.eq("k2@0"));
     Assert.assertEquals(null, cache.getKeyVersion("k2@0"));
-    Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k2@0"));
+    Mockito.verify(mockProv, Mockito.times(2))
+        .getKeyVersion(Mockito.eq("k2@0"));
+  }
+
+  @Test
+  public void testMetadata() throws Exception {
+    KeyProvider.Metadata mockMeta = Mockito.mock(KeyProvider.Metadata.class);
+    KeyProvider mockProv = Mockito.mock(KeyProvider.class);
+    Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(mockMeta);
+    Mockito.when(mockProv.getMetadata(Mockito.eq("k2"))).thenReturn(null);
+    KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
+
+    // asserting caching
+    Assert.assertEquals(mockMeta, cache.getMetadata("k1"));
+    Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k1"));
+    Assert.assertEquals(mockMeta, cache.getMetadata("k1"));
+    Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k1"));
+    Thread.sleep(200);
+    Assert.assertEquals(mockMeta, cache.getMetadata("k1"));
+    Mockito.verify(mockProv, Mockito.times(2)).getMetadata(Mockito.eq("k1"));
+
+    // asserting no caching when key is not known
+    cache = new CachingKeyProvider(mockProv, 100, 100);
+    Assert.assertEquals(null, cache.getMetadata("k2"));
+    Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k2"));
+    Assert.assertEquals(null, cache.getMetadata("k2"));
+    Mockito.verify(mockProv, Mockito.times(2)).getMetadata(Mockito.eq("k2"));
   }
 
   @Test
@@ -82,7 +112,7 @@ public class TestKMSCacheKeyProvider {
     KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
     KeyProvider mockProv = Mockito.mock(KeyProvider.class);
     Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
-    KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+    KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
     Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
     Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1"));
     cache.rollNewVersion("k1");
@@ -100,21 +130,23 @@ public class TestKMSCacheKeyProvider {
     KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
     KeyProvider mockProv = Mockito.mock(KeyProvider.class);
     Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
-    Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))).thenReturn(mockKey);
+    Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0")))
+        .thenReturn(mockKey);
     Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(
         new KMSClientProvider.KMSMetadata("c", 0, "l", null, new Date(), 1));
-    KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100);
+    KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
     Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
     Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1"));
     Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
-    Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0"));
+    Mockito.verify(mockProv, Mockito.times(1))
+        .getKeyVersion(Mockito.eq("k1@0"));
     cache.deleteKey("k1");
 
     // asserting the cache is purged
     Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
     Mockito.verify(mockProv, Mockito.times(2)).getCurrentKey(Mockito.eq("k1"));
     Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0"));
-    Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k1@0"));
+    Mockito.verify(mockProv, Mockito.times(2))
+        .getKeyVersion(Mockito.eq("k1@0"));
   }
-
 }

+ 4 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java

@@ -327,8 +327,8 @@ public class MiniRPCBenchmark {
     String shortUserName =
       UserGroupInformation.createRemoteUser(user).getShortUserName();
     try {
-      conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(shortUserName),
-          GROUP_NAME_1);
+      conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+              getProxySuperuserGroupConfKey(shortUserName), GROUP_NAME_1);
       configureSuperUserIPAddresses(conf, shortUserName);
       // start the server
       miniServer = new MiniServer(conf, user, keytabFile);
@@ -411,7 +411,7 @@ public class MiniRPCBenchmark {
     }
     builder.append("127.0.1.1,");
     builder.append(InetAddress.getLocalHost().getCanonicalHostName());
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
-        builder.toString());
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(superUserShortName), builder.toString());
   }
 }

+ 14 - 9
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java

@@ -101,7 +101,8 @@ public class TestDoAsEffectiveUser {
     builder.append("127.0.1.1,");
     builder.append(InetAddress.getLocalHost().getCanonicalHostName());
     LOG.info("Local Ip addresses: "+builder.toString());
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(superUserShortName),
         builder.toString());
   }
   
@@ -181,8 +182,8 @@ public class TestDoAsEffectiveUser {
   @Test(timeout=4000)
   public void testRealUserSetup() throws IOException {
     final Configuration conf = new Configuration();
-    conf.setStrings(DefaultImpersonationProvider
-        .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+        getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
     configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
     Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -214,7 +215,8 @@ public class TestDoAsEffectiveUser {
   public void testRealUserAuthorizationSuccess() throws IOException {
     final Configuration conf = new Configuration();
     configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
         "group1");
     Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -248,9 +250,11 @@ public class TestDoAsEffectiveUser {
   @Test
   public void testRealUserIPAuthorizationFailure() throws IOException {
     final Configuration conf = new Configuration();
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_SHORT_NAME),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(REAL_USER_SHORT_NAME),
         "20.20.20.20"); //Authorized IP address
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
         "group1");
     Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -293,8 +297,8 @@ public class TestDoAsEffectiveUser {
   @Test
   public void testRealUserIPNotSpecified() throws IOException {
     final Configuration conf = new Configuration();
-    conf.setStrings(DefaultImpersonationProvider
-        .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+        getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
     Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
         .setNumHandlers(2).setVerbose(false).build();
@@ -377,7 +381,8 @@ public class TestDoAsEffectiveUser {
   public void testRealUserGroupAuthorizationFailure() throws IOException {
     final Configuration conf = new Configuration();
     configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
         "group3");
     Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)

+ 100 - 26
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java

@@ -111,10 +111,12 @@ public class TestProxyUsers {
       groupMappingClassName);
 
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_NAME),
         StringUtils.join(",", Arrays.asList(NETGROUP_NAMES)));
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(REAL_USER_NAME),
         PROXY_IP);
     
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -135,10 +137,12 @@ public class TestProxyUsers {
   public void testProxyUsers() throws Exception {
     Configuration conf = new Configuration();
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserGroupConfKey(REAL_USER_NAME),
       StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       PROXY_IP);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
 
@@ -168,10 +172,12 @@ public class TestProxyUsers {
   public void testProxyUsersWithUserConf() throws Exception {
     Configuration conf = new Configuration();
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserUserConfKey(REAL_USER_NAME),
         StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME)));
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(REAL_USER_NAME),
         PROXY_IP);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
 
@@ -202,10 +208,12 @@ public class TestProxyUsers {
   public void testWildcardGroup() {
     Configuration conf = new Configuration();
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserGroupConfKey(REAL_USER_NAME),
       "*");
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       PROXY_IP);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
 
@@ -236,10 +244,12 @@ public class TestProxyUsers {
   public void testWildcardUser() {
     Configuration conf = new Configuration();
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserUserConfKey(REAL_USER_NAME),
       "*");
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       PROXY_IP);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
 
@@ -270,10 +280,12 @@ public class TestProxyUsers {
   public void testWildcardIP() {
     Configuration conf = new Configuration();
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserGroupConfKey(REAL_USER_NAME),
       StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       "*");
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
 
@@ -301,10 +313,12 @@ public class TestProxyUsers {
   public void testIPRange() {
     Configuration conf = new Configuration();
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_NAME),
         "*");
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(REAL_USER_NAME),
         PROXY_IP_RANGE);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
 
@@ -324,16 +338,19 @@ public class TestProxyUsers {
   public void testWithDuplicateProxyGroups() throws Exception {
     Configuration conf = new Configuration();
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserGroupConfKey(REAL_USER_NAME),
       StringUtils.join(",", Arrays.asList(GROUP_NAMES,GROUP_NAMES)));
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       PROXY_IP);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
     
     Collection<String> groupsToBeProxied = 
         ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get(
-        DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME));
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_NAME));
     
     assertEquals (1,groupsToBeProxied.size());
   }
@@ -342,16 +359,19 @@ public class TestProxyUsers {
   public void testWithDuplicateProxyHosts() throws Exception {
     Configuration conf = new Configuration();
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider()
+          .getProxySuperuserGroupConfKey(REAL_USER_NAME),
       StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       StringUtils.join(",", Arrays.asList(PROXY_IP,PROXY_IP)));
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
     
     Collection<String> hosts = 
         ProxyUsers.getDefaultImpersonationProvider().getProxyHosts().get(
-        DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME));
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(REAL_USER_NAME));
     
     assertEquals (1,hosts.size());
   }
@@ -391,26 +411,73 @@ public class TestProxyUsers {
   public void testWithProxyGroupsAndUsersWithSpaces() throws Exception {
     Configuration conf = new Configuration();
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserUserConfKey(REAL_USER_NAME),
         StringUtils.join(",", Arrays.asList(PROXY_USER_NAME + " ",AUTHORIZED_PROXY_USER_NAME, "ONEMORE")));
 
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserGroupConfKey(REAL_USER_NAME),
       StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
     
     conf.set(
-      DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+      DefaultImpersonationProvider.getTestProvider().
+          getProxySuperuserIpConfKey(REAL_USER_NAME),
       PROXY_IP);
     
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
     
     Collection<String> groupsToBeProxied = 
         ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get(
-        DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME));
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_NAME));
     
     assertEquals (GROUP_NAMES.length, groupsToBeProxied.size());
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testProxyUsersWithNullPrefix() throws Exception {
+    ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration(false), 
+        null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProxyUsersWithEmptyPrefix() throws Exception {
+    ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration(false), 
+        "");
+  }
+
+  @Test
+  public void testProxyUsersWithCustomPrefix() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.set("x." + REAL_USER_NAME + ".users",
+        StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME)));
+    conf.set("x." + REAL_USER_NAME+ ".hosts", PROXY_IP);
+    ProxyUsers.refreshSuperUserGroupsConfiguration(conf, "x");
+
+
+    // First try proxying a user that's allowed
+    UserGroupInformation realUserUgi = UserGroupInformation
+        .createRemoteUser(REAL_USER_NAME);
+    UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
+        AUTHORIZED_PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
+
+    // From good IP
+    assertAuthorized(proxyUserUgi, "1.2.3.4");
+    // From bad IP
+    assertNotAuthorized(proxyUserUgi, "1.2.3.5");
+
+    // Now try proxying a user that's not allowed
+    realUserUgi = UserGroupInformation.createRemoteUser(REAL_USER_NAME);
+    proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
+        PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
+
+    // From good IP
+    assertNotAuthorized(proxyUserUgi, "1.2.3.4");
+    // From bad IP
+    assertNotAuthorized(proxyUserUgi, "1.2.3.5");
+  }
+
 
   private void assertNotAuthorized(UserGroupInformation proxyUgi, String host) {
     try {
@@ -430,6 +497,11 @@ public class TestProxyUsers {
   }
 
   static class TestDummyImpersonationProvider implements ImpersonationProvider {
+
+    @Override
+    public void init(String configurationPrefix) {
+    }
+
     /**
      * Authorize a user (superuser) to impersonate another user (user1) if the 
      * superuser belongs to the group "sudo_user1" .
@@ -460,11 +532,13 @@ public class TestProxyUsers {
   public static void loadTest(String ipString, int testRange) {
     Configuration conf = new Configuration();
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER_NAME),
         StringUtils.join(",", Arrays.asList(GROUP_NAMES)));
 
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME),
+        DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(REAL_USER_NAME),
         ipString
         );
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);

+ 0 - 177
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java

@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.crypto.key.kms.server;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.hadoop.crypto.key.KeyProvider;
-
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A <code>KeyProvider</code> proxy implementation providing a short lived
- * cache for <code>KeyVersions</code> to avoid burst of requests to hit the
- * underlying <code>KeyProvider</code>.
- */
-public class KMSCacheKeyProvider extends KeyProvider {
-  private final KeyProvider provider;
-  private LoadingCache<String, KeyVersion> keyVersionCache;
-  private LoadingCache<String, KeyVersion> currentKeyCache;
-
-  private static class KeyNotFoundException extends Exception {
-    private static final long serialVersionUID = 1L;
-  }
-
-  public KMSCacheKeyProvider(KeyProvider prov, long timeoutMillis) {
-    this.provider =  prov;
-    keyVersionCache = CacheBuilder.newBuilder().expireAfterAccess(timeoutMillis,
-        TimeUnit.MILLISECONDS).build(new CacheLoader<String, KeyVersion>() {
-      @Override
-      public KeyVersion load(String key) throws Exception {
-        KeyVersion kv = provider.getKeyVersion(key);
-        if (kv == null) {
-          throw new KeyNotFoundException();
-        }
-        return kv;
-      }
-    });
-    // for current key we don't want to go stale for more than 1 sec
-    currentKeyCache = CacheBuilder.newBuilder().expireAfterWrite(1000,
-        TimeUnit.MILLISECONDS).build(new CacheLoader<String, KeyVersion>() {
-      @Override
-      public KeyVersion load(String key) throws Exception {
-        KeyVersion kv =  provider.getCurrentKey(key);
-        if (kv == null) {
-          throw new KeyNotFoundException();
-        }
-        return kv;
-      }
-    });
-  }
-
-  @Override
-  public KeyVersion getCurrentKey(String name) throws IOException {
-    try {
-      return currentKeyCache.get(name);
-    } catch (ExecutionException ex) {
-      Throwable cause = ex.getCause();
-      if (cause instanceof KeyNotFoundException) {
-        return null;
-      } else if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else {
-        throw new IOException(cause);
-      }
-    }
-  }
-
-  @Override
-  public KeyVersion getKeyVersion(String versionName)
-      throws IOException {
-    try {
-      return keyVersionCache.get(versionName);
-    } catch (ExecutionException ex) {
-      Throwable cause = ex.getCause();
-      if (cause instanceof KeyNotFoundException) {
-        return null;
-      } else if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else {
-        throw new IOException(cause);
-      }
-    }
-  }
-
-  @Override
-  public List<String> getKeys() throws IOException {
-    return provider.getKeys();
-  }
-
-  @Override
-  public List<KeyVersion> getKeyVersions(String name)
-      throws IOException {
-    return provider.getKeyVersions(name);
-  }
-
-  @Override
-  public Metadata getMetadata(String name) throws IOException {
-    return provider.getMetadata(name);
-  }
-
-  @Override
-  public KeyVersion createKey(String name, byte[] material,
-      Options options) throws IOException {
-    return provider.createKey(name, material, options);
-  }
-
-  @Override
-  public KeyVersion createKey(String name,
-      Options options)
-      throws NoSuchAlgorithmException, IOException {
-    return provider.createKey(name, options);
-  }
-
-  @Override
-  public void deleteKey(String name) throws IOException {
-    provider.deleteKey(name);
-    currentKeyCache.invalidate(name);
-    // invalidating all key versions as we don't know which ones belonged to the
-    // deleted key
-    keyVersionCache.invalidateAll();
-  }
-
-  @Override
-  public KeyVersion rollNewVersion(String name, byte[] material)
-      throws IOException {
-    KeyVersion key = provider.rollNewVersion(name, material);
-    currentKeyCache.invalidate(name);
-    return key;
-  }
-
-  @Override
-  public KeyVersion rollNewVersion(String name)
-      throws NoSuchAlgorithmException, IOException {
-    KeyVersion key = provider.rollNewVersion(name);
-    currentKeyCache.invalidate(name);
-    return key;
-  }
-
-  @Override
-  public void flush() throws IOException {
-    provider.flush();
-  }
-
-  @Override
-  public Metadata[] getKeysMetadata(String ... keyNames)
-      throws IOException {
-    return provider.getKeysMetadata(keyNames);
-  }
-
-  @Override
-  public boolean isTransient() {
-    return provider.isTransient();
-  }
-
-}

+ 13 - 1
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java

@@ -34,9 +34,21 @@ public class KMSConfiguration {
 
   public static final String CONFIG_PREFIX = "hadoop.kms.";
 
+  // Property to Enable/Disable Caching
+  public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
+      "cache.enable";
+  // Timeout for the Key and Metadata Cache
   public static final String KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
       "cache.timeout.ms";
-  public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 1000; // 10 secs
+  // Timeout for the Current Key cache
+  public static final String CURR_KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
+      "current.key.cache.timeout.ms";
+
+  public static final boolean KEY_CACHE_ENABLE_DEFAULT = true;
+  // 10 mins
+  public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 60 * 1000;
+  // 30 secs
+  public static final long CURR_KEY_CACHE_TIMEOUT_DEFAULT = 30 * 1000;
 
   static Configuration getConfiguration(boolean loadHadoopDefaults,
       String ... resources) {

+ 12 - 4
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java

@@ -22,6 +22,7 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.CachingKeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.http.HttpServer2;
@@ -150,10 +151,17 @@ public class KMSWebApp implements ServletContextListener {
             kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
       }
       keyProvider = providers.get(0);
-      long timeOutMillis =
-          kmsConf.getLong(KMSConfiguration.KEY_CACHE_TIMEOUT_KEY,
-              KMSConfiguration.KEY_CACHE_TIMEOUT_DEFAULT);
-      keyProvider = new KMSCacheKeyProvider(keyProvider, timeOutMillis);
+      if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
+          KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
+        long keyTimeOutMillis =
+            kmsConf.getLong(KMSConfiguration.KEY_CACHE_TIMEOUT_KEY,
+                KMSConfiguration.KEY_CACHE_TIMEOUT_DEFAULT);
+        long currKeyTimeOutMillis =
+            kmsConf.getLong(KMSConfiguration.CURR_KEY_CACHE_TIMEOUT_KEY,
+                KMSConfiguration.CURR_KEY_CACHE_TIMEOUT_DEFAULT);
+        keyProvider = new CachingKeyProvider(keyProvider, keyTimeOutMillis,
+            currKeyTimeOutMillis);
+      }
 
       LOG.info("KMS Started");
     } catch (Throwable ex) {
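
Since caching can now be switched off entirely, a minimal kms-site.xml sketch
for disabling it (the default is true, per KEY_CACHE_ENABLE_DEFAULT above):

  <property>
    <name>hadoop.kms.cache.enable</name>
    <value>false</value>
  </property>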

+ 20 - 7
hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm

@@ -72,22 +72,35 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
   KMS caches keys for short period of time to avoid excessive hits to the
   underlying key provider.
 
-  The cache is used with the following 2 methods only, <<<getCurrentKey()>>>
-  and <<<getKeyVersion()>>>.
+  The cache is enabled by default (it can be disabled by setting the
+  <<<hadoop.kms.cache.enable>>> boolean property to false).
+
+  The cache is used with the following 3 methods only: <<<getCurrentKey()>>>,
+  <<<getKeyVersion()>>> and <<<getMetadata()>>>.
 
   For the <<<getCurrentKey()>>> method, cached entries are kept for a maximum
-  of 1000 millisecond regardless the number of times the key is being access
+  of 30000 milliseconds regardless of the number of times the key is accessed
   (to avoid stale keys to be considered current).
 
   For the <<<getKeyVersion()>>> method, cached entries are kept with a default
-  inactivity timeout of 10000 milliseconds. This time out is configurable via
-  the following property in the <<<etc/hadoop/kms-site.xml>>> configuration
-  file:
+  inactivity timeout of 600000 milliseconds (10 mins). This timeout is
+  configurable via the following property in the <<<etc/hadoop/kms-site.xml>>>
+  configuration file:
 
 +---+
+  <property>
+    <name>hadoop.kms.cache.enable</name>
+    <value>true</value>
+  </property>
+
   <property>
     <name>hadoop.kms.cache.timeout.ms</name>
-    <value>10000</value>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <name>hadoop.kms.current.key.cache.timeout.ms</name>
+    <value>30000</value>
   </property>
 +---+
 

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java

@@ -72,11 +72,11 @@ public class TestReaddir {
   public static void setup() throws Exception {
     String currentUser = System.getProperty("user.name");
     config.set(
-            DefaultImpersonationProvider.getProxySuperuserGroupConfKey(currentUser),
-            "*");
+            DefaultImpersonationProvider.getTestProvider().
+                getProxySuperuserGroupConfKey(currentUser), "*");
     config.set(
-            DefaultImpersonationProvider.getProxySuperuserIpConfKey(currentUser),
-            "*");
+            DefaultImpersonationProvider.getTestProvider().
+                getProxySuperuserIpConfKey(currentUser), "*");
     ProxyUsers.refreshSuperUserGroupsConfiguration(config);
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
     cluster.waitActive();

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java

@@ -312,10 +312,12 @@ public class TestWrites {
         System.getProperty("user.name"));
     String currentUser = System.getProperty("user.name");
     config.set(
-            DefaultImpersonationProvider.getProxySuperuserGroupConfKey(currentUser),
+            DefaultImpersonationProvider.getTestProvider().
+                getProxySuperuserGroupConfKey(currentUser),
             "*");
     config.set(
-            DefaultImpersonationProvider.getProxySuperuserIpConfKey(currentUser),
+            DefaultImpersonationProvider.getTestProvider().
+                getProxySuperuserIpConfKey(currentUser),
             "*");
     ProxyUsers.refreshSuperUserGroupsConfiguration(config);
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -321,6 +321,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6700. BlockPlacementPolicy should choose storage but not datanode for
     deletion. (szetszwo)
 
+    HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
+    will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -612,6 +615,8 @@ Release 2.5.0 - UNRELEASED
 
     HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
 
+    HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
+
   BUG FIXES 
 
     HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java

@@ -339,7 +339,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
     buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
     buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
-
+    buffer.append("Xceivers: "+getXceiverCount()+"\n");
     buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
     return buffer.toString();
   }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1456,10 +1456,10 @@ public class BlockManager {
 
   /** Choose target for WebHDFS redirection. */
   public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
-      DatanodeDescriptor clientnode, long blocksize) {
+      DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
     return blockplacement.chooseTarget(src, 1, clientnode,
-        Collections.<DatanodeStorageInfo>emptyList(), false, null, blocksize,
-        storagePolicySuite.getDefaultPolicy());
+        Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
+        blocksize, storagePolicySuite.getDefaultPolicy());
   }
 
   /** Choose target for getting additional datanodes for an existing pipeline. */

+ 5 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -642,15 +642,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     // check the communication traffic of the target machine
     if (considerLoad) {
-      double avgLoad = 0;
-      if (stats != null) {
-        int size = stats.getNumDatanodesInService();
-        if (size != 0) {
-          avgLoad = (double)stats.getTotalLoad()/size;
-        }
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logNodeIsNotChosen(storage, "the node is too busy ");
+      final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+      final int nodeLoad = node.getXceiverCount();
+      if (nodeLoad > maxLoad) {
+        logNodeIsNotChosen(storage,
+            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
         return false;
       }
     }

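A worked example of the tightened check, with invented numbers: on a cluster where 6 in-service datanodes report 12 xceivers in total, the average is 2.0, so maxLoad is 4.0 and a node carrying 5 xceivers is skipped.

    // Invented numbers; mirrors the considerLoad branch above.
    double inServiceXceiverAverage = 12 / 6.0;             // stats.getInServiceXceiverAverage()
    final double maxLoad = 2.0 * inServiceXceiverAverage;  // 4.0
    final int nodeLoad = 5;                                // node.getXceiverCount()
    boolean tooBusy = nodeLoad > maxLoad;                  // true -> "the node is too busy"
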
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -820,7 +820,9 @@ public class DatanodeManager {
   }
 
   /** Start decommissioning the specified datanode. */
-  private void startDecommission(DatanodeDescriptor node) {
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       for (DatanodeStorageInfo storage : node.getStorageInfos()) {
         LOG.info("Start Decommissioning " + node + " " + storage

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java

@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
   /** @return the xceiver count */
   public int getXceiverCount();
 
+  /** @return total xceiver count across non-decommission(ing|ed) nodes */
+  public int getInServiceXceiverCount();
+  
+  /** @return number of non-decommission(ing|ed) nodes */
+  public int getNumDatanodesInService();
+  
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -150,6 +150,16 @@ class HeartbeatManager implements DatanodeStatistics {
     return stats.xceiverCount;
   }
   
+  @Override
+  public synchronized int getInServiceXceiverCount() {
+    return stats.nodesInServiceXceiverCount;
+  }
+  
+  @Override
+  public synchronized int getNumDatanodesInService() {
+    return stats.nodesInService;
+  }
+  
   @Override
   public synchronized long getCacheCapacity() {
     return stats.cacheCapacity;
@@ -178,7 +188,7 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   synchronized void register(final DatanodeDescriptor d) {
-    if (!datanodes.contains(d)) {
+    if (!d.isAlive) {
       addDatanode(d);
 
       //update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   synchronized void addDatanode(final DatanodeDescriptor d) {
+    // update in-service node count
+    stats.add(d);
     datanodes.add(d);
     d.isAlive = true;
   }
@@ -323,6 +335,9 @@ class HeartbeatManager implements DatanodeStatistics {
     private long cacheCapacity = 0L;
     private long cacheUsed = 0L;
 
+    private int nodesInService = 0;
+    private int nodesInServiceXceiverCount = 0;
+
     private int expiredHeartbeats = 0;
 
     private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements DatanodeStatistics {
       blockPoolUsed += node.getBlockPoolUsed();
       xceiverCount += node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService++;
+        nodesInServiceXceiverCount += node.getXceiverCount();
         capacityTotal += node.getCapacity();
         capacityRemaining += node.getRemaining();
       } else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements DatanodeStatistics {
       blockPoolUsed -= node.getBlockPoolUsed();
       xceiverCount -= node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService--;
+        nodesInServiceXceiverCount -= node.getXceiverCount();
         capacityTotal -= node.getCapacity();
         capacityRemaining -= node.getRemaining();
       } else {

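The Stats arithmetic above is easiest to verify with concrete numbers. A self-contained sketch (xceiver values invented) showing how the total and in-service counters diverge once a node is decommissioned:

    public class StatsSketch {
      public static void main(String[] args) {
        int[] xceivers   = {3, 5, 7};
        boolean[] decomm = {false, false, true};
        int xceiverCount = 0, nodesInService = 0, nodesInServiceXceiverCount = 0;
        for (int i = 0; i < xceivers.length; i++) {
          xceiverCount += xceivers[i];          // every node counts toward the total
          if (!decomm[i]) {                     // mirrors the isDecommission* guard
            nodesInService++;
            nodesInServiceXceiverCount += xceivers[i];
          }
        }
        // Prints: total=15 inService=2 inServiceXceivers=8
        System.out.println("total=" + xceiverCount + " inService=" + nodesInService
            + " inServiceXceivers=" + nodesInServiceXceiverCount);
      }
    }
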
+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java

@@ -48,6 +48,15 @@ public interface FSClusterStats {
    * @return Number of datanodes that are both alive and not decommissioned.
    */
   public int getNumDatanodesInService();
+
+  /**
+   * An indication of the average load of the non-decommission(ing|ed) nodes
+   * eligible for block placement.
+   *
+   * @return the average number of block transfers and block writes that
+   *         are currently occurring on the in-service datanodes.
+   */
+  public double getInServiceXceiverAverage();
 }
     
     

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -7323,7 +7323,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   @Override // FSClusterStats
   public int getNumDatanodesInService() {
-    return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+    return datanodeStatistics.getNumDatanodesInService();
+  }
+  
+  @Override // for block placement strategy
+  public double getInServiceXceiverAverage() {
+    double avgLoad = 0;
+    final int nodes = getNumDatanodesInService();
+    if (nodes != 0) {
+      final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+      avgLoad = (double)xceivers/nodes;
+    }
+    return avgLoad;
   }
 
   public SnapshotManager getSnapshotManager() {

+ 80 - 37
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -27,6 +27,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 
 import javax.servlet.ServletContext;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
 import org.apache.hadoop.hdfs.web.resources.DoAsParam;
+import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.GroupParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -111,11 +113,13 @@ import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
@@ -188,12 +192,26 @@ public class NamenodeWebHdfsMethods {
      }
      return np;
   }
-
+  
   @VisibleForTesting
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize) throws IOException {
+      final long blocksize, final String excludeDatanodes) throws IOException {
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    
+    HashSet<Node> excludes = new HashSet<Node>();
+    if (excludeDatanodes != null) {
+      for (String host : StringUtils
+          .getTrimmedStringCollection(excludeDatanodes)) {
+        int idx = host.indexOf(":");
+        if (idx != -1) {          
+          excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
+              host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
+        } else {
+          excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
+        }
+      }
+    }
 
     if (op == PutOpParam.Op.CREATE) {
       //choose a datanode near to client 
@@ -201,7 +219,7 @@ public class NamenodeWebHdfsMethods {
           ).getDatanodeByHost(getRemoteAddress());
       if (clientNode != null) {
         final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
-            path, clientNode, blocksize);
+            path, clientNode, excludes, blocksize);
         if (storages.length > 0) {
           return storages[0].getDatanodeDescriptor();
         }
@@ -228,7 +246,7 @@ public class NamenodeWebHdfsMethods {
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return bestNode(locations.get(0).getLocations());
+          return bestNode(locations.get(0).getLocations(), excludes);
         }
       }
     } 
@@ -242,11 +260,14 @@ public class NamenodeWebHdfsMethods {
    * sorted based on availability and network distances, thus it is sufficient
   * to return the first eligible element here.
    */
-  private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
-    if (nodes.length == 0 || nodes[0].isDecommissioned()) {
-      throw new IOException("No active nodes contain this block");
+  private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
+      HashSet<Node> excludes) throws IOException {
+    for (DatanodeInfo dn: nodes) {
+      if (!dn.isDecommissioned() && !excludes.contains(dn)) {
+        return dn;
+      }
     }
-    return nodes[0];
+    throw new IOException("No active nodes contain this block");
   }
 
   private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -265,11 +286,12 @@ public class NamenodeWebHdfsMethods {
       final UserGroupInformation ugi, final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize,
+      final long blocksize, final String excludeDatanodes,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
     final DatanodeInfo dn;
     try {
-      dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
+      dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+          excludeDatanodes);
     } catch (InvalidTopologyException ite) {
       throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
     }
@@ -356,13 +378,15 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
           final SnapshotNameParam snapshotName,
       @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
-          final OldSnapshotNameParam oldSnapshotName
-          )throws IOException, InterruptedException {
+          final OldSnapshotNameParam oldSnapshotName,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
+      ) throws IOException, InterruptedException {
     return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
         owner, group, permission, overwrite, bufferSize, replication,
         blockSize, modificationTime, accessTime, renameOptions, createParent,
         delegationTokenArgument, aclPermission, xattrName, xattrValue,
-        xattrSetFlag, snapshotName, oldSnapshotName);
+        xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
   }
 
   /** Handle HTTP PUT request. */
@@ -418,14 +442,16 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
           final SnapshotNameParam snapshotName,
       @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
-          final OldSnapshotNameParam oldSnapshotName
+          final OldSnapshotNameParam oldSnapshotName,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
 
     init(ugi, delegation, username, doAsUser, path, op, destination, owner,
         group, permission, overwrite, bufferSize, replication, blockSize,
         modificationTime, accessTime, renameOptions, delegationTokenArgument,
         aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
-        oldSnapshotName);
+        oldSnapshotName, excludeDatanodes);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -436,7 +462,7 @@ public class NamenodeWebHdfsMethods {
               permission, overwrite, bufferSize, replication, blockSize,
               modificationTime, accessTime, renameOptions, createParent,
               delegationTokenArgument, aclPermission, xattrName, xattrValue,
-              xattrSetFlag, snapshotName, oldSnapshotName);
+              xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
         } finally {
           reset();
         }
@@ -469,7 +495,8 @@ public class NamenodeWebHdfsMethods {
       final XAttrValueParam xattrValue, 
       final XAttrSetFlagParam xattrSetFlag,
       final SnapshotNameParam snapshotName,
-      final OldSnapshotNameParam oldSnapshotName
+      final OldSnapshotNameParam oldSnapshotName,
+      final ExcludeDatanodesParam exclDatanodes
       ) throws IOException, URISyntaxException {
 
     final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -479,9 +506,10 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case CREATE:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, blockSize.getValue(conf),
-          permission, overwrite, bufferSize, replication, blockSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username,
+          doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
+          exclDatanodes.getValue(), permission, overwrite, bufferSize,
+          replication, blockSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     } 
     case MKDIRS:
@@ -614,9 +642,12 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
           final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
-          final BufferSizeParam bufferSize
+          final BufferSizeParam bufferSize,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
-    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
+    return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
+        bufferSize, excludeDatanodes);
   }
 
   /** Handle HTTP POST request. */
@@ -638,17 +669,21 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
           final ConcatSourcesParam concatSrcs,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
-          final BufferSizeParam bufferSize
+          final BufferSizeParam bufferSize,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
+    init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
+        excludeDatanodes);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
         try {
           return post(ugi, delegation, username, doAsUser,
-              path.getAbsolutePath(), op, concatSrcs, bufferSize);
+              path.getAbsolutePath(), op, concatSrcs, bufferSize,
+              excludeDatanodes);
         } finally {
           reset();
         }
@@ -664,15 +699,17 @@ public class NamenodeWebHdfsMethods {
       final String fullpath,
       final PostOpParam op,
       final ConcatSourcesParam concatSrcs,
-      final BufferSizeParam bufferSize
+      final BufferSizeParam bufferSize,
+      final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, URISyntaxException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
 
     switch(op.getValue()) {
     case APPEND:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, -1L, bufferSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username,
+          doAsUser, fullpath, op.getValue(), -1L, -1L,
+          excludeDatanodes.getValue(), bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case CONCAT:
@@ -710,10 +747,12 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) 
           final List<XAttrNameParam> xattrNames,
       @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) 
-          final XAttrEncodingParam xattrEncoding
+          final XAttrEncodingParam xattrEncoding,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes          
       ) throws IOException, InterruptedException {
     return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
-        renewer, bufferSize, xattrNames, xattrEncoding);
+        renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes);
   }
 
   /** Handle HTTP GET request. */
@@ -742,11 +781,13 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) 
           final List<XAttrNameParam> xattrNames,
       @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) 
-          final XAttrEncodingParam xattrEncoding
+          final XAttrEncodingParam xattrEncoding,
+      @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+          final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, InterruptedException {
 
     init(ugi, delegation, username, doAsUser, path, op, offset, length,
-        renewer, bufferSize, xattrEncoding);
+        renewer, bufferSize, xattrEncoding, excludeDatanodes);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -754,7 +795,7 @@ public class NamenodeWebHdfsMethods {
         try {
           return get(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
-              xattrNames, xattrEncoding);
+              xattrNames, xattrEncoding, excludeDatanodes);
         } finally {
           reset();
         }
@@ -774,7 +815,8 @@ public class NamenodeWebHdfsMethods {
       final RenewerParam renewer,
       final BufferSizeParam bufferSize,
       final List<XAttrNameParam> xattrNames,
-      final XAttrEncodingParam xattrEncoding
+      final XAttrEncodingParam xattrEncoding,
+      final ExcludeDatanodesParam excludeDatanodes
       ) throws IOException, URISyntaxException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
     final NamenodeProtocols np = getRPCServer(namenode);
@@ -782,8 +824,9 @@ public class NamenodeWebHdfsMethods {
     switch(op.getValue()) {
     case OPEN:
     {
-      final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
+      final URI uri = redirectURI(namenode, ugi, delegation, username,
+          doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
+          excludeDatanodes.getValue(), offset, length, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
@@ -819,7 +862,7 @@ public class NamenodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, -1L);
+          fullpath, op.getValue(), -1L, -1L, null);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:

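On the wire, the new parameter is a comma-separated list of datanode addresses, each either a bare hostname or host:xfer-port, matching the parsing in chooseDatanode above. An illustrative OPEN request (hosts, ports and path invented):

    http://nn.example.com:50070/webhdfs/v1/user/foo/bar?op=OPEN&excludedatanodes=dn1.example.com:50010,dn2.example.com
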
+ 27 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -448,6 +448,7 @@ public class WebHdfsFileSystem extends FileSystem
 
     protected final HttpOpParam.Op op;
     private final boolean redirected;
+    protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
 
     private boolean checkRetry;
 
@@ -499,6 +500,10 @@ public class WebHdfsFileSystem extends FileSystem
      * a DN such as open and checksum
      */
     private HttpURLConnection connect(URL url) throws IOException {
+      // redirect hostname and port
+      String redirectHost = null;
+
       // resolve redirects for a DN operation unless already resolved
       if (op.getRedirect() && !redirected) {
         final HttpOpParam.Op redirectOp =
@@ -511,11 +516,24 @@ public class WebHdfsFileSystem extends FileSystem
         try {
           validateResponse(redirectOp, conn, false);
           url = new URL(conn.getHeaderField("Location"));
+          redirectHost = url.getHost() + ":" + url.getPort();
         } finally {
           conn.disconnect();
         }
       }
-      return connect(op, url);
+      try {
+        return connect(op, url);
+      } catch (IOException ioe) {
+        if (redirectHost != null) {
+          if (excludeDatanodes.getValue() != null) {
+            excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+                + excludeDatanodes.getValue());
+          } else {
+            excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+          }
+        }
+        throw ioe;
+      }
     }
 
     private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
@@ -652,7 +670,14 @@ public class WebHdfsFileSystem extends FileSystem
     
     @Override
     protected URL getUrl() throws IOException {
-      return toUrl(op, fspath, parameters);
+      if (excludeDatanodes.getValue() != null) {
+        Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
+        System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+        tmpParam[parameters.length] = excludeDatanodes;
+        return toUrl(op, fspath, tmpParam);
+      } else {
+        return toUrl(op, fspath, parameters);
+      }
     }
   }
 

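The client-side change reduces to: remember the host behind each failed redirect and hand the accumulated list back to the namenode on the next attempt. A simplified sketch of that loop (resolveRedirect and connectTo are hypothetical stand-ins for the two-step connect above, and the retry count is invented):

    import java.io.IOException;
    import java.net.HttpURLConnection;

    public abstract class RetrySketch {
      // Hypothetical helpers; the real logic lives in WebHdfsFileSystem.connect().
      abstract String resolveRedirect(String excluded) throws IOException;
      abstract HttpURLConnection connectTo(String host) throws IOException;

      HttpURLConnection connectWithExclusions() throws IOException {
        String excluded = null;
        for (int attempt = 0; attempt < 3; attempt++) {     // retry count invented
          String redirectHost = resolveRedirect(excluded);  // ask the NN, passing exclusions
          try {
            return connectTo(redirectHost);                 // talk to the chosen DN
          } catch (IOException ioe) {
            excluded = (excluded == null)
                ? redirectHost : redirectHost + "," + excluded;
          }
        }
        throw new IOException("out of retries");
      }
    }
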
+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+
+/** Exclude datanodes param */
+public class ExcludeDatanodesParam extends StringParam {
+  /** Parameter name. */
+  public static final String NAME = "excludedatanodes";
+  /** Default parameter value. */
+  public static final String DEFAULT = "";
+
+  private static final Domain DOMAIN = new Domain(NAME, null);
+
+  /**
+   * Constructor.
+   * @param str a string representation of the parameter value.
+   */
+  public ExcludeDatanodesParam(final String str) {
+    super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+}

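A quick usage sketch for the new param class. Both null and the empty default collapse to a null value, which is what keeps the parameter out of generated URLs when nothing has been excluded yet:

    import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;

    public class ExcludeParamSketch {
      public static void main(String[] args) {
        ExcludeDatanodesParam p = new ExcludeDatanodesParam("dn1:50010,dn2");
        System.out.println(p.getName());                              // excludedatanodes
        System.out.println(p.getValue());                             // dn1:50010,dn2
        System.out.println(new ExcludeDatanodesParam("").getValue()); // null
      }
    }
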
+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java

@@ -89,7 +89,8 @@ public class TestDelegationTokenForProxyUser {
     builder.append("127.0.1.1,");
     builder.append(InetAddress.getLocalHost().getCanonicalHostName());
     LOG.info("Local Ip addresses: " + builder.toString());
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(superUserShortName),
         builder.toString());
   }
   
@@ -101,7 +102,8 @@ public class TestDelegationTokenForProxyUser {
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
     config.setLong(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
-    config.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER),
+    config.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER),
         "group1");
     config.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

+ 16 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.PathUtils;
@@ -101,6 +102,7 @@ public class TestReplicationPolicyConsiderLoad {
     }
   }
 
+  private final double EPSILON = 0.0001;
   /**
    * Tests that chooseTarget with considerLoad set to true correctly calculates
    * load with decommissioned nodes.
@@ -109,14 +111,6 @@ public class TestReplicationPolicyConsiderLoad {
   public void testChooseTargetWithDecomNodes() throws IOException {
     namenode.getNamesystem().writeLock();
     try {
-      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
-      // returns false
-      for (int i = 0; i < 3; i++) {
-        DatanodeInfo d = dnManager.getDatanodeByXferAddr(
-            dnrList.get(i).getIpAddr(),
-            dnrList.get(i).getXferPort());
-        d.setDecommissioned();
-      }
       String blockPoolId = namenode.getNamesystem().getBlockPoolId();
       dnManager.handleHeartbeat(dnrList.get(3),
           BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
@@ -133,6 +127,20 @@ public class TestReplicationPolicyConsiderLoad {
           blockPoolId, dataNodes[5].getCacheCapacity(),
           dataNodes[5].getCacheRemaining(),
           4, 0, 0);
+      // sum of the xceiver counts reported in the heartbeats above
+      final int load = 2 + 4 + 4;
+      
+      FSNamesystem fsn = namenode.getNamesystem();
+      assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+      
+      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
+      // returns false
+      for (int i = 0; i < 3; i++) {
+        DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
+        dnManager.startDecommission(d);
+        d.setDecommissioned();
+      }
+      assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
 
       // Call chooseTarget()
       DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java

@@ -285,8 +285,10 @@ public class TestJspHelper {
     String user = "TheNurse";
     conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     
-    conf.set(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(realUser), "*");
-    conf.set(DefaultImpersonationProvider.getProxySuperuserIpConfKey(realUser), "*");
+    conf.set(DefaultImpersonationProvider.getTestProvider().
+        getProxySuperuserGroupConfKey(realUser), "*");
+    conf.set(DefaultImpersonationProvider.getTestProvider().
+        getProxySuperuserIpConfKey(realUser), "*");
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
     UserGroupInformation.setConfiguration(conf);
     UserGroupInformation ugi;

+ 185 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -28,12 +30,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 
@@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+  
+  private static final float EPSILON = 0.0001f;
+  @Test
+  public void testXceiverCount() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // don't waste time retrying if close fails
+    conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
+    MiniDFSCluster cluster = null;
+
+    final int nodes = 8;
+    final int fileCount = 5;
+    final short fileRepl = 3;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
+      cluster.waitActive();
+
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+
+      // trigger heartbeats in case not already sent
+      triggerHeartbeats(datanodes);
+      
+      // check that all nodes are live and in service
+      int expectedTotalLoad = nodes;  // xceiver server adds 1 to load
+      int expectedInServiceNodes = nodes;
+      int expectedInServiceLoad = nodes;
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+      
+      // shutdown half the nodes and force a heartbeat check to ensure
+      // counts are accurate
+      for (int i=0; i < nodes/2; i++) {
+        DataNode dn = datanodes.get(i);
+        DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
+        dn.shutdown();
+        dnd.setLastUpdate(0L);
+        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+        expectedInServiceNodes--;
+        assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      }
+
+      // restart the nodes to verify that counts are correct after
+      // node re-registration 
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      datanodes = cluster.getDataNodes();
+      expectedInServiceNodes = nodes;
+      assertEquals(nodes, datanodes.size());
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+      
+      // create streams and hsync to force datastreamers to start
+      DFSOutputStream[] streams = new DFSOutputStream[fileCount];
+      for (int i=0; i < fileCount; i++) {
+        streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
+            .getWrappedStream();
+        streams[i].write("1".getBytes());
+        streams[i].hsync();
+        // the load for writers is 2 because both the write xceiver & packet
+        // responder threads are counted in the load
+        expectedTotalLoad += 2*fileRepl;
+        expectedInServiceLoad += 2*fileRepl;
+      }
+      // force nodes to send load update
+      triggerHeartbeats(datanodes);
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes,
+          namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+
+      // decomm a few nodes, subtract their load from the expected load,
+      // trigger heartbeat to force load update
+      for (int i=0; i < fileRepl; i++) {
+        expectedInServiceNodes--;
+        DatanodeDescriptor dnd =
+            dnm.getDatanode(datanodes.get(i).getDatanodeId());
+        expectedInServiceLoad -= dnd.getXceiverCount();
+        dnm.startDecommission(dnd);
+        DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+        Thread.sleep(100);
+        assertEquals(nodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes,
+            namesystem.getNumDatanodesInService());
+        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+      
+      // check expected load while closing each stream.  recalc expected
+      // load based on whether the nodes in the pipeline are decomm
+      for (int i=0; i < fileCount; i++) {
+        int decomm = 0;
+        for (DatanodeInfo dni : streams[i].getPipeline()) {
+          DatanodeDescriptor dnd = dnm.getDatanode(dni);
+          expectedTotalLoad -= 2;
+          if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
+            decomm++;
+          } else {
+            expectedInServiceLoad -= 2;
+          }
+        }
+        try {
+          streams[i].close();
+        } catch (IOException ioe) {
+          // nodes will go decommissioned even if there's a UC block whose
+          // other locations are decommissioned too.  we'll ignore that
+          // bug for now
+          if (decomm < fileRepl) {
+            throw ioe;
+          }
+        }
+        triggerHeartbeats(datanodes);
+        // verify node count and loads 
+        assertEquals(nodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes,
+            namesystem.getNumDatanodesInService());
+        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+
+      // shutdown each node, verify node counts based on decomm state
+      for (int i=0; i < nodes; i++) {
+        DataNode dn = datanodes.get(i);
+        dn.shutdown();
+        // force it to appear dead so live count decreases
+        DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
+        dnDesc.setLastUpdate(0L);
+        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+        assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
+        // first few nodes are already out of service
+        if (i >= fileRepl) {
+          expectedInServiceNodes--;
+        }
+        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+        
+        // live nodes always report a load of 1; with no nodes the load is 0
+        double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
+        assertEquals((double)expectedXceiverAvg,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+      
+      // final sanity check
+      assertEquals(0, namesystem.getNumLiveDataNodes());
+      assertEquals(0, namesystem.getNumDatanodesInService());
+      assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
+      assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  private void triggerHeartbeats(List<DataNode> datanodes)
+      throws IOException, InterruptedException {
+    for (DataNode dn : datanodes) {
+      DataNodeTestUtils.triggerHeartbeat(dn);
+    }
+    Thread.sleep(100);
+  }
 }

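All of the expected-load bookkeeping in this test follows one formula: each live datanode contributes 1 for its xceiver server thread, and each replica of an open write adds 2 (write xceiver plus packet responder). Restated with the test's own constants:

    int nodes = 8, fileCount = 5, fileRepl = 3;
    int baseline = nodes;            // one xceiver server per live DN
    int perOpenFile = 2 * fileRepl;  // writer + packet responder per replica
    int expectedTotalLoad = baseline + fileCount * perOpenFile;  // 8 + 5*6 = 38
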
+ 85 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java

@@ -92,7 +92,7 @@ public class TestWebHdfsDataLocality {
 
           //The chosen datanode must be the same as the client address
           final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-              namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
+              namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
           Assert.assertEquals(ipAddr, chosen.getIpAddr());
         }
       }
@@ -117,23 +117,104 @@ public class TestWebHdfsDataLocality {
 
       { //test GETFILECHECKSUM
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
         Assert.assertEquals(expected, chosen);
       }
   
       { //test OPEN
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
         Assert.assertEquals(expected, chosen);
       }
 
       { //test APPEND
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
         Assert.assertEquals(expected, chosen);
       }
     } finally {
       cluster.shutdown();
     }
   }
+  
+  @Test
+  public void testExcludeDataNodes() throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
+    final String[] hosts = {"DataNode1", "DataNode2", "DataNode3", "DataNode4", "DataNode5", "DataNode6"};
+    final int nDataNodes = hosts.length;
+    LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks)
+        + ", hosts=" + Arrays.asList(hosts));
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .hosts(hosts).numDataNodes(nDataNodes).racks(racks).build();
+    
+    try {
+      cluster.waitActive();
+
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final NameNode namenode = cluster.getNameNode();
+      final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
+          ).getDatanodeManager();
+      LOG.info("dm=" + dm);
+  
+      final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+      final String f = "/foo";
+      
+      //create a file with three replica.
+      final Path p = new Path(f);
+      final FSDataOutputStream out = dfs.create(p, (short)3);
+      out.write(1);
+      out.close(); 
+      
+      //get replica location.
+      final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
+          namenode, f, 0, 1);
+      final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
+      Assert.assertEquals(1, lb.size());
+      final DatanodeInfo[] locations = lb.get(0).getLocations();
+      Assert.assertEquals(3, locations.length);
+
+      // For GETFILECHECKSUM, OPEN and APPEND,
+      // the chosen datanode must be different from the excluded nodes.
+
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < 2; i++) {
+        sb.append(locations[i].getXferAddr());
+        { // test GETFILECHECKSUM
+          final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+              namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
+              sb.toString());
+          for (int j = 0; j <= i; j++) {
+            Assert.assertNotEquals(locations[j].getHostName(),
+                chosen.getHostName());
+          }
+        }
+
+        { // test OPEN
+          final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+              namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
+          for (int j = 0; j <= i; j++) {
+            Assert.assertNotEquals(locations[j].getHostName(),
+                chosen.getHostName());
+          }
+        }
+  
+        { // test APPEND
+          final DatanodeInfo chosen = NamenodeWebHdfsMethods
+              .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
+                  blocksize, sb.toString());
+          for (int j = 0; j <= i; j++) {
+            Assert.assertNotEquals(locations[j].getHostName(),
+                chosen.getHostName());
+          }
+        }
+        
+        sb.append(",");
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java

@@ -151,8 +151,10 @@ public class TestRefreshUserMappings {
     final String [] GROUP_NAMES2 = new String [] {"gr3" , "gr4"};
     
     //keys in conf
-    String userKeyGroups = DefaultImpersonationProvider.getProxySuperuserGroupConfKey(SUPER_USER);
-    String userKeyHosts = DefaultImpersonationProvider.getProxySuperuserIpConfKey (SUPER_USER);
+    String userKeyGroups = DefaultImpersonationProvider.getTestProvider().
+        getProxySuperuserGroupConfKey(SUPER_USER);
+    String userKeyHosts = DefaultImpersonationProvider.getTestProvider().
+        getProxySuperuserIpConfKey(SUPER_USER);
     
     config.set(userKeyGroups, "gr3,gr4,gr5"); // superuser can proxy for this group
     config.set(userKeyHosts,"127.0.0.1");

+ 8 - 0
hadoop-yarn-project/CHANGES.txt

@@ -49,6 +49,11 @@ Release 2.6.0 - UNRELEASED
     YARN-1341. Recover NMTokens upon nodemanager restart. (Jason Lowe via 
     junping_du)
 
+    YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. (xgong)
+
+    YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo
+    via Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -69,6 +74,9 @@ Release 2.6.0 - UNRELEASED
     after RM recovery but before scheduler learns about apps and app-attempts.
     (Jian He via vinodkv)
 
+    YARN-2244. FairScheduler missing handling of containers for unknown 
+    application attempts. (Anubhav Dhoot via kasha)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java

@@ -44,6 +44,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
   public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
 
   private ApplicationAttemptId applicationAttemptId;
+  private int keyId = Integer.MIN_VALUE;
 
   public AMRMTokenIdentifier() {
   }
@@ -53,6 +54,13 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
     this.applicationAttemptId = appAttemptId;
   }
 
+  public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId,
+      int masterKeyId) {
+    this();
+    this.applicationAttemptId = appAttemptId;
+    this.keyId = masterKeyId;
+  }
+
   @Private
   public ApplicationAttemptId getApplicationAttemptId() {
     return this.applicationAttemptId;
@@ -64,6 +72,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
     out.writeLong(appId.getClusterTimestamp());
     out.writeInt(appId.getId());
     out.writeInt(this.applicationAttemptId.getAttemptId());
+    out.writeInt(this.keyId);
   }
 
   @Override
@@ -75,6 +84,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
         ApplicationId.newInstance(clusterTimeStamp, appId);
     this.applicationAttemptId =
         ApplicationAttemptId.newInstance(applicationId, attemptId);
+    this.keyId = in.readInt();
   }
 
   @Override
@@ -92,6 +102,10 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
         .toString());
   }
 
+  public int getKeyId() {
+    return this.keyId;
+  }
+
   // TODO: Needed?
   @InterfaceAudience.Private
   public static class Renewer extends Token.TrivialRenewer {

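Because write() and readFields() now both carry the key id, it round-trips with the rest of the identifier. A self-contained sketch (application ids and key id invented):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

    public class TokenRoundTripSketch {
      public static void main(String[] args) throws Exception {
        ApplicationAttemptId attempt = ApplicationAttemptId.newInstance(
            ApplicationId.newInstance(123456L, 1), 1);
        AMRMTokenIdentifier id = new AMRMTokenIdentifier(attempt, 42);
        DataOutputBuffer out = new DataOutputBuffer();
        id.write(out);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        AMRMTokenIdentifier copy = new AMRMTokenIdentifier();
        copy.readFields(in);
        System.out.println(copy.getKeyId()); // 42
      }
    }
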
+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       }
 
       // create AMRMToken
-      AMRMTokenIdentifier id =
-          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
       appAttempt.amrmToken =
-          new Token<AMRMTokenIdentifier>(id,
-            appAttempt.rmContext.getAMRMTokenSecretManager());
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
 
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -123,6 +123,23 @@ public abstract class AbstractYarnScheduler
     return maximumAllocation;
   }
 
+  protected void containerLaunchedOnNode(ContainerId containerId,
+                                         SchedulerNode node) {
+    // Get the application attempt for the launched container
+    SchedulerApplicationAttempt application =
+        getCurrentAttemptForContainer(containerId);
+    if (application == null) {
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication<T> app =
         applications.get(applicationAttemptId.getApplicationId());

+ 0 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -866,21 +865,6 @@ public class CapacityScheduler extends
   
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Override
   public void handle(SchedulerEvent event) {
     switch(event.getType()) {

+ 0 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -928,22 +928,6 @@ public class FairScheduler extends
     }
   }
 
-  /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   /**
    * Process a heartbeat update from a node.
    */

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java

@@ -65,6 +65,7 @@ public class FairSharePolicy extends SchedulingPolicy {
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
     private static final long serialVersionUID = 5564969375856699313L;
+    private static final Resource ONE = Resources.createResource(1);
 
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends SchedulingPolicy {
           s1.getResourceUsage(), minShare1);
       boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
           s2.getResourceUsage(), minShare2);
-      Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() /
           s1.getWeights().getWeight(ResourceType.MEMORY);
       useToWeightRatio2 = s2.getResourceUsage().getMemory() /

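The FairSharePolicy change is an allocation fix: compare() previously built a fresh one-unit Resource on every call, and a comparator can run O(n log n) times per scheduling pass. Hoisting it into the static final ONE is safe only because nothing ever mutates it, and it keeps serving as a floor so a zero minShare cannot divide by zero. The pattern in isolation, with illustrative names:

    // Shared, effectively immutable sentinel: hot compare() paths allocate
    // nothing, and max(minShare, ONE) guards the division against a zero share.
    private static final Resource ONE = Resources.createResource(1);

    double minShareRatio = (double) usage.getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare, ONE).getMemory();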
+ 0 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -831,23 +830,6 @@ public class FifoScheduler extends
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Lock(FifoScheduler.class)
   private synchronized void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {

+ 178 - 54
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java

@@ -19,22 +19,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +55,66 @@ public class AMRMTokenSecretManager extends
   private static final Log LOG = LogFactory
     .getLog(AMRMTokenSecretManager.class);
 
-  private SecretKey masterKey;
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
   private final Timer timer;
   private final long rollingInterval;
+  private final long activationDelay;
 
-  private final Map<ApplicationAttemptId, byte[]> passwords =
-      new HashMap<ApplicationAttemptId, byte[]>();
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMTokenSecretManager}
    */
   public AMRMTokenSecretManager(Configuration conf) {
-    rollMasterKey();
     this.timer = new Timer();
     this.rollingInterval =
         conf
           .getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
             YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 2 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
   }
 
   public void start() {
-    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+      rollingInterval);
   }
 
   public void stop() {
     this.timer.cancel();
   }
 
-  public synchronized void applicationMasterFinished(
-      ApplicationAttemptId appAttemptId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application finished, removing password for " + appAttemptId);
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for " + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.remove(appAttemptId);
   }
 
   private class MasterKeyRoller extends TimerTask {
@@ -93,49 +125,89 @@ public class AMRMTokenSecretManager extends
   }
 
   @Private
-  public synchronized void setMasterKey(SecretKey masterKey) {
-    this.masterKey = masterKey;
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized SecretKey getMasterKey() {
-    return this.masterKey;
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Private
-  synchronized void rollMasterKey() {
-    LOG.info("Rolling master-key for amrm-tokens");
-    this.masterKey = generateSecret();
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  /**
-   * Create a password for a given {@link AMRMTokenIdentifier}. Used to
-   * send to the AppicationAttempt which can give it back during authentication.
-   */
-  @Override
-  public synchronized byte[] createPassword(
-      AMRMTokenIdentifier identifier) {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating password for " + applicationAttemptId);
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+            .getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+        identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not null, return nextMasterKey;
+  // otherwise return currentMasterKey.
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
     }
-    byte[] password = createPassword(identifier.getBytes(), masterKey);
-    this.passwords.put(applicationAttemptId, password);
-    return password;
   }
 
   /**
    * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
    */
-  public synchronized void
-      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
-    AMRMTokenIdentifier identifier = token.decodeIdentifier();
-    if (LOG.isDebugEnabled()) {
+  public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    this.writeLock.lock();
+    try {
+      AMRMTokenIdentifier identifier = token.decodeIdentifier();
       LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+      appAttemptSet.add(identifier.getApplicationAttemptId());
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.put(identifier.getApplicationAttemptId(),
-      token.getPassword());
   }
 
   /**
@@ -143,19 +215,35 @@ public class AMRMTokenSecretManager extends
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */
   @Override
-  public synchronized byte[] retrievePassword(
-      AMRMTokenIdentifier identifier) throws InvalidToken {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken("Password not found for ApplicationAttempt "
+            + applicationAttemptId);
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+        .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+            .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Given AMRMToken for application : "
+          + applicationAttemptId.toString()
+          + " seems to have been generated illegally.");
+    } finally {
+      this.readLock.unlock();
     }
-    byte[] password = this.passwords.get(applicationAttemptId);
-    if (password == null) {
-      throw new InvalidToken("Password not found for ApplicationAttempt "
-          + applicationAttemptId);
-    }
-    return password;
   }
 
   /**
@@ -167,4 +255,40 @@ public class AMRMTokenSecretManager extends
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrnetMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+        .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

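Taken together, the AMRMTokenSecretManager rewrite replaces the per-attempt password map with stateless password derivation under a two-key rollover: MasterKeyRoller creates nextMasterKey, and NextKeyActivator promotes it only after activationDelay (1.5 x the AM expiry interval, so every live AM heartbeats at least once in between); during that window retrievePassword() accepts identifiers signed with either key id. A minimal sketch of the lifecycle using only methods visible in this diff (conf and appAttemptId are illustrative placeholders):

    Configuration conf = new Configuration();
    // The constructor rejects a rolling interval that is not comfortably
    // larger than the AM expiry interval (IllegalArgumentException).
    AMRMTokenSecretManager secretManager = new AMRMTokenSecretManager(conf);
    secretManager.start();  // creates currentMasterKey, schedules MasterKeyRoller

    // Signed with getMasterKey(): nextMasterKey if a roll is pending,
    // otherwise currentMasterKey, so new tokens always use the newest key.
    Token<AMRMTokenIdentifier> token =
        secretManager.createAndGetAMRMToken(appAttemptId);

    // When MasterKeyRoller fires, nextMasterKey is created and NextKeyActivator
    // runs activationDelay later; until then retrievePassword() validates
    // tokens carrying either key id, so in-flight AMs keep authenticating.
    secretManager.stop();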
+ 75 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java

@@ -232,20 +232,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    List<ContainerId> contsToClean = resp.getContainersToCleanup();
-    int cleanedConts = contsToClean.size();
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
-      dispatcher.await();
-      contsToClean = resp.getContainersToCleanup();
-      cleanedConts += contsToClean.size();
-    }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
-    Assert.assertEquals(1, cleanedConts);
+    waitForContainerCleanup(dispatcher, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -258,26 +245,36 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    contsToClean = resp.getContainersToCleanup();
-    cleanedConts = contsToClean.size();
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
+    waitForContainerCleanup(dispatcher, nm1, resp);
+
+    rm.stop();
+  }
+
+  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+      NodeHeartbeatResponse resp) throws Exception {
+    int waitCount = 0, cleanedConts = 0;
+    List<ContainerId> contsToClean;
+    do {
       dispatcher.await();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
+      if (cleanedConts >= 1) {
+        break;
+      }
+      Thread.sleep(100);
+      resp = nm.nodeHeartbeat(true);
+    } while(waitCount++ < 200);
+
+    if (contsToClean.isEmpty()) {
+      LOG.error("Failed to get any containers to cleanup");
+    } else {
+      LOG.info("Got cleanup for " + contsToClean.get(0));
     }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
     Assert.assertEquals(1, cleanedConts);
-
-    rm.stop();
   }
-  
+
   private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
       throws Exception {
     while (true) {
@@ -400,6 +397,58 @@ public class TestApplicationCleanup {
     rm2.stop();
   }
 
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+      Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // start new RM
+    final DrainDispatcher dispatcher2 = new DrainDispatcher();
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher2;
+      }
+    };
+    rm2.start();
+
+    // nm1 register to rm2, and do a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Add unknown container for application unknown to scheduler
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+        .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+    waitForContainerCleanup(dispatcher2, nm1, response);
+
+    rm1.stop();
+    rm2.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

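The two duplicated heartbeat-and-poll loops collapse into the protected waitForContainerCleanup helper, which the new RM-restart test also reuses; the helper keeps heartbeating until the scheduler's cleanup list turns up or 200 attempts pass. The same retry shape as a generic helper, sketched with Java 8 functional interfaces for brevity (not part of this commit):

    import java.util.concurrent.Callable;
    import java.util.function.Predicate;

    final class Polling {
      // Re-runs probe until done accepts its result or maxAttempts is
      // exhausted; returns the last result so the caller can still assert on it.
      static <T> T pollUntil(Callable<T> probe, Predicate<T> done,
          int maxAttempts, long sleepMs) throws Exception {
        T result = probe.call();
        for (int i = 0; i < maxAttempts && !done.test(result); i++) {
          Thread.sleep(sleepMs);
          result = probe.call();
        }
        return result;
      }
    }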
+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -1250,10 +1250,11 @@ public class TestRMRestart {
             .getEncoded());
 
     // assert AMRMTokenSecretManager also knows about the AMRMToken password
-    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
-    Assert.assertArrayEquals(amrmToken.getPassword(),
-      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
-        amrmToken.decodeIdentifier()));
+    // TODO: fix this on YARN-2211
+//    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
+//    Assert.assertArrayEquals(amrmToken.getPassword(),
+//      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
+//        amrmToken.decodeIdentifier()));
     rm1.stop();
     rm2.stop();
   }

+ 8 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Map;
 import javax.crypto.SecretKey;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -175,8 +176,11 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     TestDispatcher dispatcher = new TestDispatcher();
     store.setRMDispatcher(dispatcher);
 
-    AMRMTokenSecretManager appTokenMgr =
-        new AMRMTokenSecretManager(conf);
+    AMRMTokenSecretManager appTokenMgr = spy(
+        new AMRMTokenSecretManager(conf));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
     ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
         new ClientToAMTokenSecretManagerInRM();
 
@@ -455,10 +459,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
   private Token<AMRMTokenIdentifier> generateAMRMToken(
       ApplicationAttemptId attemptId,
       AMRMTokenSecretManager appTokenMgr) {
-    AMRMTokenIdentifier appTokenId =
-        new AMRMTokenIdentifier(attemptId);
     Token<AMRMTokenIdentifier> appToken =
-        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+        appTokenMgr.createAndGetAMRMToken(attemptId);
     appToken.setService(new Text("appToken service"));
     return appToken;
   }

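Because tokens are now minted by the secret manager itself, the test spies the real AMRMTokenSecretManager and pins getMasterKey() to a pre-created key instead of calling start(). One Mockito caveat with this pattern: on a spy, when(spy.getMasterKey()) executes the real method once while the stub is being recorded; the doReturn form avoids that, as in this equivalent sketch:

    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.spy;

    AMRMTokenSecretManager appTokenMgr = spy(new AMRMTokenSecretManager(conf));
    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
    // Stubs getMasterKey() without invoking the real implementation first.
    doReturn(masterKeyData).when(appTokenMgr).getMasterKey();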
+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -224,6 +225,8 @@ public class TestRMAppAttemptTransitions {
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
     writer = mock(RMApplicationHistoryWriter.class);
+    MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey();
+    when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData);
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,

+ 68 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

@@ -23,13 +23,12 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;
 
-import javax.crypto.SecretKey;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -41,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -65,6 +67,8 @@ public class TestAMRMTokens {
 
   private final Configuration conf;
   private static final int maxWaitAttempts = 50;
+  private static final int rolling_interval_sec = 13;
+  private static final long am_expire_ms = 4000;
 
   @Parameters
   public static Collection<Object[]> configs() {
@@ -201,15 +205,22 @@ public class TestAMRMTokens {
   @Test
   public void testMasterKeyRollOver() throws Exception {
 
+    conf.setLong(
+      YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+      rolling_interval_sec);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
     MyContainerManager containerManager = new MyContainerManager();
     final MockRMWithAMS rm =
         new MockRMWithAMS(conf, containerManager);
     rm.start();
-
+    long startTime = System.currentTimeMillis();
     final Configuration conf = rm.getConfig();
     final YarnRPC rpc = YarnRPC.create(conf);
     ApplicationMasterProtocol rmClient = null;
-
+    AMRMTokenSecretManager appTokenSecretManager =
+        rm.getRMContext().getAMRMTokenSecretManager();
+    MasterKeyData oldKey = appTokenSecretManager.getMasterKey();
+    Assert.assertNotNull(oldKey);
     try {
       MockNM nm1 = rm.registerNode("localhost:1234", 5120);
 
@@ -218,7 +229,7 @@ public class TestAMRMTokens {
       nm1.nodeHeartbeat(true);
 
       int waitCount = 0;
-      while (containerManager.containerTokens == null && waitCount++ < 20) {
+      while (containerManager.containerTokens == null && waitCount++ < maxWaitAttempts) {
         LOG.info("Waiting for AM Launch to happen..");
         Thread.sleep(1000);
       }
@@ -250,21 +261,65 @@ public class TestAMRMTokens {
       Assert.assertTrue(
           rmClient.allocate(allocateRequest).getAMCommand() == null);
 
-      // Simulate a master-key-roll-over
-      AMRMTokenSecretManager appTokenSecretManager =
-          rm.getRMContext().getAMRMTokenSecretManager();
-      SecretKey oldKey = appTokenSecretManager.getMasterKey();
-      appTokenSecretManager.rollMasterKey();
-      SecretKey newKey = appTokenSecretManager.getMasterKey();
+      // Wait long enough for the roll-over to happen.
+      // In the meantime, the old AMRMToken should continue to work.
+      while (System.currentTimeMillis() - startTime < rolling_interval_sec * 1000) {
+        rmClient.allocate(allocateRequest);
+        Thread.sleep(500);
+      }
+
+      MasterKeyData newKey = appTokenSecretManager.getMasterKey();
+      Assert.assertNotNull(newKey);
       Assert.assertFalse("Master key should have changed!",
         oldKey.equals(newKey));
 
+      // Another allocate call with old AMRMToken. Should continue to work.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      rmClient = createRMClient(rm, conf, rpc, currentUser);
+      Assert
+        .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      waitCount = 0;
+      while(waitCount++ <= maxWaitAttempts) {
+        if (appTokenSecretManager.getCurrnetMasterKeyData() != oldKey) {
+          break;
+        }
+        try {
+          rmClient.allocate(allocateRequest);
+        } catch (Exception ex) {
+          break;
+        }
+        Thread.sleep(200);
+      }
+      // the nextMasterKey should now be active, replacing the currentMasterKey
+      Assert.assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getMasterKey().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getNextMasterKeyData() == null);
+
+      // Create a new Token
+      Token<AMRMTokenIdentifier> newToken =
+          appTokenSecretManager.createAndGetAMRMToken(applicationAttemptId);
+      SecurityUtil.setTokenService(newToken, rmBindAddress);
+      currentUser.addToken(newToken);
       // Another allocate call. Should continue to work.
       rpc.stopProxy(rmClient, conf); // To avoid using cached client
       rmClient = createRMClient(rm, conf, rpc, currentUser);
       allocateRequest = Records.newRecord(AllocateRequest.class);
-      Assert.assertTrue(
-          rmClient.allocate(allocateRequest).getAMCommand() == null);
+      Assert
+        .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      // Using the old AMRMToken should no longer work.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      try {
+        currentUser.addToken(amRMToken);
+        rmClient = createRMClient(rm, conf, rpc, currentUser);
+        allocateRequest = Records.newRecord(AllocateRequest.class);
+        Assert
+          .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+        Assert.fail("The old Token should not work");
+      } catch (Exception ex) {
+        // expect exception
+      }
     } finally {
       rm.stop();
       if (rmClient != null) {