Browse source code

Merge r1609845 through r1612431 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612432 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 11 years ago
parent
commit
3de6c61f86
43 changed files with 2281 additions and 186 deletions
  1. 13 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 35 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
  3. 157 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
  4. 8 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java
  5. 317 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java
  6. 33 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  7. 7 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java
  8. 46 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java
  9. 4 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/LdapGroupsMapping.java
  10. 2 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedUnixGroupsMapping.java
  11. 51 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Timer.java
  12. 47 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  13. 2 0
      hadoop-common-project/hadoop-common/src/site/apt/NativeLibraries.apt.vm
  14. 190 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java
  15. 19 9
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestStat.java
  16. 56 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java
  17. 52 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java
  18. 15 0
      hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
  19. 149 0
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java
  20. 96 1
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java
  21. 4 1
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java
  22. 20 1
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java
  23. 30 4
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
  24. 83 0
      hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
  25. 100 11
      hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
  26. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  27. 12 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  28. 6 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
  29. 38 24
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
  30. 6 0
      hadoop-mapreduce-project/CHANGES.txt
  31. 195 73
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  32. 24 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
  33. 37 9
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
  34. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
  35. 55 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
  36. 86 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
  37. 5 0
      hadoop-yarn-project/CHANGES.txt
  38. 76 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
  39. 80 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/NMDBSchemaVersion.java
  40. 81 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/impl/pb/NMDBSchemaVersionPBImpl.java
  41. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
  42. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
  43. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

+ 13 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -186,6 +186,9 @@ Trunk (Unreleased)
     HADOOP-10750. KMSKeyProviderCache should be in hadoop-common. 
     (asuresh via tucu)
 
+    HADOOP-10720. KMS: Implement generateEncryptedKey and decryptEncryptedKey
+    in the REST API. (asuresh via tucu)
+
   BUG FIXES
 
     HADOOP-9451. Fault single-layer config if node group topology is enabled.
@@ -438,6 +441,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10817. ProxyUsers configuration should support configurable 
     prefixes. (tucu)
 
+    HADOOP-10755. Support negative caching of user-group mapping.
+    (Lei Xu via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -459,6 +465,13 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10591.  Compression codecs must used pooled direct buffers or
     deallocate direct buffers when stream is closed (cmccabe)
 
+    HADOOP-10857.  Native Libraries Guide doen't mention a dependency on
+    openssl-development package (ozawa via cmccabe)
+
+    HADOOP-10866. RawLocalFileSystem fails to read symlink targets via the stat
+    command when the format of the stat command uses non-curly quotes (yzhang
+    via cmccabe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 35 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java

@@ -27,17 +27,19 @@ import javax.crypto.spec.IvParameterSpec;
 import javax.crypto.spec.SecretKeySpec;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * A KeyProvider with Cryptographic Extensions specifically for generating
  * Encrypted Keys as well as decrypting them
  * 
  */
+@InterfaceAudience.Private
 public class KeyProviderCryptoExtension extends
     KeyProviderExtension<KeyProviderCryptoExtension.CryptoExtension> {
 
-  protected static final String EEK = "EEK";
-  protected static final String EK = "EK";
+  public static final String EEK = "EEK";
+  public static final String EK = "EK";
 
   /**
    * This is a holder class whose instance contains the keyVersionName, iv
@@ -81,6 +83,14 @@ public class KeyProviderCryptoExtension extends
    */
   public interface CryptoExtension extends KeyProviderExtension.Extension {
 
+    /**
+     * Calls to this method allow the underlying KeyProvider to warm up any
+     * implementation specific caches used to store the Encrypted Keys.
+     * @param keyNames Array of Key Names
+     */
+    public void warmUpEncryptedKeys(String... keyNames)
+        throws IOException;
+
     /**
      * Generates a key material and encrypts it using the given key version name
      * and initialization vector. The generated key material is of the same
@@ -180,13 +190,35 @@ public class KeyProviderCryptoExtension extends
       return new KeyVersion(keyVer.getName(), EK, ek);
     }
 
+    @Override
+    public void warmUpEncryptedKeys(String... keyNames)
+        throws IOException {
+      // NO-OP since the default version does not cache any keys
+    }
+
   }
 
-  private KeyProviderCryptoExtension(KeyProvider keyProvider,
+  /**
+   * This constructor is to be used by sub classes that provide
+   * delegating/proxying functionality to the {@link KeyProviderCryptoExtension}
+   * @param keyProvider
+   * @param extension
+   */
+  protected KeyProviderCryptoExtension(KeyProvider keyProvider,
       CryptoExtension extension) {
     super(keyProvider, extension);
   }
 
+  /**
+   * Notifies the Underlying CryptoExtension implementation to warm up any
+   * implementation specific caches for the specified KeyVersions
+   * @param keyNames Arrays of key Names
+   */
+  public void warmUpEncryptedKeys(String... keyNames)
+      throws IOException {
+    getExtension().warmUpEncryptedKeys(keyNames);
+  }
+
   /**
    * Generates a key material and encrypts it using the given key version name
    * and initialization vector. The generated key material is of the same

+ 157 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java

@@ -21,7 +21,9 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
@@ -33,6 +35,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import javax.net.ssl.HttpsURLConnection;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -40,6 +43,7 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.lang.reflect.Constructor;
 import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -50,14 +54,22 @@ import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
+
+import com.google.common.base.Preconditions;
 
 /**
  * KMS client <code>KeyProvider</code> implementation.
  */
 @InterfaceAudience.Private
-public class KMSClientProvider extends KeyProvider {
+public class KMSClientProvider extends KeyProvider implements CryptoExtension {
 
   public static final String SCHEME_NAME = "kms";
 
@@ -78,6 +90,73 @@ public class KMSClientProvider extends KeyProvider {
   public static final String TIMEOUT_ATTR = CONFIG_PREFIX + "timeout";
   public static final int DEFAULT_TIMEOUT = 60;
 
+  private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
+
+  private class EncryptedQueueRefiller implements
+    ValueQueue.QueueRefiller<EncryptedKeyVersion> {
+
+    @Override
+    public void fillQueueForKey(String keyName,
+        Queue<EncryptedKeyVersion> keyQueue, int numEKVs) throws IOException {
+      checkNotNull(keyName, "keyName");
+      Map<String, String> params = new HashMap<String, String>();
+      params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_GENERATE);
+      params.put(KMSRESTConstants.EEK_NUM_KEYS, "" + numEKVs);
+      URL url = createURL(KMSRESTConstants.KEY_RESOURCE, keyName,
+          KMSRESTConstants.EEK_SUB_RESOURCE, params);
+      HttpURLConnection conn = createConnection(url, HTTP_GET);
+      conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME);
+      List response = call(conn, null,
+          HttpURLConnection.HTTP_OK, List.class);
+      List<EncryptedKeyVersion> ekvs =
+          parseJSONEncKeyVersion(keyName, response);
+      keyQueue.addAll(ekvs);
+    }
+  }
+
+  public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
+    public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
+        byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
+      super(keyName, keyVersionName, iv, new KMSKeyVersion(null, 
+          encryptedVersionName, keyMaterial));
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static List<EncryptedKeyVersion>
+      parseJSONEncKeyVersion(String keyName, List valueList) {
+    List<EncryptedKeyVersion> ekvs = new LinkedList<EncryptedKeyVersion>();
+    if (!valueList.isEmpty()) {
+      for (Object values : valueList) {
+        Map valueMap = (Map) values;
+
+        String versionName = checkNotNull(
+                (String) valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD),
+                KMSRESTConstants.VERSION_NAME_FIELD);
+
+        byte[] iv = Base64.decodeBase64(checkNotNull(
+                (String) valueMap.get(KMSRESTConstants.IV_FIELD),
+                KMSRESTConstants.IV_FIELD));
+
+        Map encValueMap = checkNotNull((Map)
+                valueMap.get(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD),
+                KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD);
+
+        String encVersionName = checkNotNull((String)
+                encValueMap.get(KMSRESTConstants.VERSION_NAME_FIELD),
+                KMSRESTConstants.VERSION_NAME_FIELD);
+
+        byte[] encKeyMaterial = Base64.decodeBase64(checkNotNull((String)
+                encValueMap.get(KMSRESTConstants.MATERIAL_FIELD),
+                KMSRESTConstants.MATERIAL_FIELD));
+
+        ekvs.add(new KMSEncryptedKeyVersion(keyName, versionName, iv,
+            encVersionName, encKeyMaterial));
+      }
+    }
+    return ekvs;
+  }
+
   private static KeyVersion parseJSONKeyVersion(Map valueMap) {
     KeyVersion keyVersion = null;
     if (!valueMap.isEmpty()) {
@@ -208,6 +287,28 @@ public class KMSClientProvider extends KeyProvider {
     }
     int timeout = conf.getInt(TIMEOUT_ATTR, DEFAULT_TIMEOUT);
     configurator = new TimeoutConnConfigurator(timeout, sslFactory);
+    encKeyVersionQueue =
+        new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
+            conf.getInt(
+                CommonConfigurationKeysPublic.KMS_CLIENT_ENC_KEY_CACHE_SIZE,
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_SIZE_DEFAULT),
+            conf.getFloat(
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK,
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK_DEFAULT),
+            conf.getInt(
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_MS,
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_DEFAULT),
+            conf.getInt(
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS,
+                CommonConfigurationKeysPublic.
+                    KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
+            new EncryptedQueueRefiller());
   }
 
   private String createServiceURL(URL url) throws IOException {
@@ -527,6 +628,51 @@ public class KMSClientProvider extends KeyProvider {
     }
   }
 
+  @Override
+  public EncryptedKeyVersion generateEncryptedKey(
+      String encryptionKeyName) throws IOException, GeneralSecurityException {
+    try {
+      return encKeyVersionQueue.getNext(encryptionKeyName);
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof SocketTimeoutException) {
+        throw (SocketTimeoutException)e.getCause();
+      }
+      throw new IOException(e);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public KeyVersion decryptEncryptedKey(
+      EncryptedKeyVersion encryptedKeyVersion) throws IOException,
+                                                      GeneralSecurityException {
+    checkNotNull(encryptedKeyVersion.getKeyVersionName(), "versionName");
+    checkNotNull(encryptedKeyVersion.getIv(), "iv");
+    Preconditions.checkArgument(encryptedKeyVersion.getEncryptedKey()
+        .getVersionName().equals(KeyProviderCryptoExtension.EEK),
+        "encryptedKey version name must be '%s', is '%s'",
+        KeyProviderCryptoExtension.EEK, encryptedKeyVersion.getEncryptedKey()
+            .getVersionName());
+    checkNotNull(encryptedKeyVersion.getEncryptedKey(), "encryptedKey");
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_DECRYPT);
+    Map<String, Object> jsonPayload = new HashMap<String, Object>();
+    jsonPayload.put(KMSRESTConstants.NAME_FIELD,
+        encryptedKeyVersion.getKeyName());
+    jsonPayload.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64String(
+        encryptedKeyVersion.getIv()));
+    jsonPayload.put(KMSRESTConstants.MATERIAL_FIELD, Base64.encodeBase64String(
+            encryptedKeyVersion.getEncryptedKey().getMaterial()));
+    URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE,
+        encryptedKeyVersion.getKeyVersionName(),
+        KMSRESTConstants.EEK_SUB_RESOURCE, params);
+    HttpURLConnection conn = createConnection(url, HTTP_POST);
+    conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME);
+    Map response =
+        call(conn, jsonPayload, HttpURLConnection.HTTP_OK, Map.class);
+    return parseJSONKeyVersion(response);
+  }
+
   @Override
   public List<KeyVersion> getKeyVersions(String name) throws IOException {
     checkNotEmpty(name, "name");
@@ -570,4 +716,14 @@ public class KMSClientProvider extends KeyProvider {
     // the server should not keep in memory state on behalf of clients either.
   }
 
+  @Override
+  public void warmUpEncryptedKeys(String... keyNames)
+      throws IOException {
+    try {
+      encKeyVersionQueue.initializeQueuesForKeys(keyNames);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
 }
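
Taken together, the two new methods give the client a full round trip: encrypted keys are generated from the per-key cache queue and later sent back for decryption. A rough sketch of that flow, assuming the KMSClientProvider URI/Configuration constructor; the endpoint and key name are placeholders for illustration:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
    import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
    import org.apache.hadoop.crypto.key.kms.KMSClientProvider;

    public class KMSRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder endpoint; a real deployment points this at its own KMS.
        KMSClientProvider kms =
            new KMSClientProvider(new URI("kms://http@localhost:16000/kms"), conf);

        // Optionally pre-fill the client-side ValueQueue for the key.
        kms.warmUpEncryptedKeys("key1");

        // Served from the cache queue; a miss triggers the eek_op=generate
        // REST call implemented by EncryptedQueueRefiller above.
        EncryptedKeyVersion eek = kms.generateEncryptedKey("key1");

        // Issues the eek_op=decrypt REST call and returns the decrypted
        // key material as a KeyVersion.
        KeyVersion kv = kms.decryptEncryptedKey(eek);
      }
    }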

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java

@@ -34,10 +34,16 @@ public class KMSRESTConstants {
   public static final String KEY_VERSION_RESOURCE = "keyversion";
   public static final String METADATA_SUB_RESOURCE = "_metadata";
   public static final String VERSIONS_SUB_RESOURCE = "_versions";
+  public static final String EEK_SUB_RESOURCE = "_eek";
   public static final String CURRENT_VERSION_SUB_RESOURCE = "_currentversion";
 
   public static final String KEY_OP = "key";
+  public static final String EEK_OP = "eek_op";
+  public static final String EEK_GENERATE = "generate";
+  public static final String EEK_DECRYPT = "decrypt";
+  public static final String EEK_NUM_KEYS = "num_keys";
 
+  public static final String IV_FIELD = "iv";
   public static final String NAME_FIELD = "name";
   public static final String CIPHER_FIELD = "cipher";
   public static final String LENGTH_FIELD = "length";
@@ -47,6 +53,8 @@ public class KMSRESTConstants {
   public static final String VERSIONS_FIELD = "versions";
   public static final String MATERIAL_FIELD = "material";
   public static final String VERSION_NAME_FIELD = "versionName";
+  public static final String ENCRYPTED_KEY_VERSION_FIELD =
+      "encryptedKeyVersion";
 
   public static final String ERROR_EXCEPTION_JSON = "exception";
   public static final String ERROR_MESSAGE_JSON = "message";

+ 317 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java

@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A Utility class that maintains a Queue of entries for a given key. It tries
+ * to ensure that there are always at least <code>numValues</code> entries
+ * available for the client to consume for a particular key.
+ * It also uses an underlying Cache to evict queues for keys that have not been
+ * accessed for a configurable period of time.
+ * Implementing classes are required to implement the
+ * <code>QueueRefiller</code> interface that exposes a method to refill the
+ * queue when it is empty.
+ */
+@InterfaceAudience.Private
+public class ValueQueue <E> {
+
+  /**
+   * QueueRefiller interface a client must implement to use this class
+   */
+  public interface QueueRefiller <E> {
+    /**
+     * Method that has to be implemented by implementing classes to fill the
+     * Queue.
+     * @param keyName Key name
+     * @param keyQueue Queue that needs to be filled
+     * @param numValues number of Values to be added to the queue.
+     * @throws IOException
+     */
+    public void fillQueueForKey(String keyName,
+        Queue<E> keyQueue, int numValues) throws IOException;
+  }
+
+  private static final String REFILL_THREAD =
+      ValueQueue.class.getName() + "_thread";
+
+  private final LoadingCache<String, LinkedBlockingQueue<E>> keyQueues;
+  private final ThreadPoolExecutor executor;
+  private final UniqueKeyBlockingQueue queue = new UniqueKeyBlockingQueue();
+  private final QueueRefiller<E> refiller;
+  private final SyncGenerationPolicy policy;
+
+  private final int numValues;
+  private final float lowWatermark;
+
+  /**
+   * A <code>Runnable</code> which takes a string name.
+   */
+  private abstract static class NamedRunnable implements Runnable {
+    final String name;
+    private NamedRunnable(String keyName) {
+      this.name = keyName;
+    }
+  }
+
+  /**
+   * This is the backing blocking queue used in conjunction with the
+   * <code>ThreadPoolExecutor</code> used by the <code>ValueQueue</code>. This
+   * Queue accepts a task only if the task is not currently in the process
+   * of being run by a thread which is implied by the presence of the key
+   * in the <code>keysInProgress</code> set.
+   *
+   * NOTE: Only methods that are explicitly called by the
+   * <code>ThreadPoolExecutor</code> need to be over-ridden.
+   */
+  private static class UniqueKeyBlockingQueue extends
+      LinkedBlockingQueue<Runnable> {
+
+    private static final long serialVersionUID = -2152747693695890371L;
+    private HashSet<String> keysInProgress = new HashSet<String>();
+
+    @Override
+    public synchronized void put(Runnable e) throws InterruptedException {
+      if (keysInProgress.add(((NamedRunnable)e).name)) {
+        super.put(e);
+      }
+    }
+
+    @Override
+    public Runnable take() throws InterruptedException {
+      Runnable k = super.take();
+      if (k != null) {
+        keysInProgress.remove(((NamedRunnable)k).name);
+      }
+      return k;
+    }
+
+    @Override
+    public Runnable poll(long timeout, TimeUnit unit)
+        throws InterruptedException {
+      Runnable k = super.poll(timeout, unit);
+      if (k != null) {
+        keysInProgress.remove(((NamedRunnable)k).name);
+      }
+      return k;
+    }
+
+  }
+
+  /**
+   * Policy to decide how many values to return to client when client asks for
+   * "n" values and Queue is empty.
+   * This decides how many values to return when client calls "getAtMost"
+   */
+  public static enum SyncGenerationPolicy {
+    ATLEAST_ONE, // Return at least 1 value
+    LOW_WATERMARK, // Return min(n, lowWatermark * numValues) values
+    ALL // Return n values
+  }
+
+  /**
+   * Constructor takes the following tunable configuration parameters
+   * @param numValues The number of values cached in the Queue for a
+   *    particular key.
+   * @param lowWatermark The ratio of (number of current entries/numValues)
+   *    below which the <code>fillQueueForKey()</code> function will be
+   *    invoked to fill the Queue.
+   * @param expiry Expiry time after which the Key and associated Queue are
+   *    evicted from the cache.
+   * @param numFillerThreads Number of threads to use for the filler thread
+   * @param policy The SyncGenerationPolicy to use when client
+   *    calls "getAtMost"
+   * @param refiller implementation of the QueueRefiller
+   */
+  public ValueQueue(final int numValues, final float lowWatermark,
+      long expiry, int numFillerThreads, SyncGenerationPolicy policy,
+      final QueueRefiller<E> refiller) {
+    Preconditions.checkArgument(numValues > 0, "\"numValues\" must be > 0");
+    Preconditions.checkArgument(((lowWatermark > 0)&&(lowWatermark <= 1)),
+        "\"lowWatermark\" must be > 0 and <= 1");
+    Preconditions.checkArgument(expiry > 0, "\"expiry\" must be > 0");
+    Preconditions.checkArgument(numFillerThreads > 0,
+        "\"numFillerThreads\" must be > 0");
+    Preconditions.checkNotNull(policy, "\"policy\" must not be null");
+    this.refiller = refiller;
+    this.policy = policy;
+    this.numValues = numValues;
+    this.lowWatermark = lowWatermark;
+    keyQueues = CacheBuilder.newBuilder()
+            .expireAfterAccess(expiry, TimeUnit.MILLISECONDS)
+            .build(new CacheLoader<String, LinkedBlockingQueue<E>>() {
+                  @Override
+                  public LinkedBlockingQueue<E> load(String keyName)
+                      throws Exception {
+                    LinkedBlockingQueue<E> keyQueue =
+                        new LinkedBlockingQueue<E>();
+                    refiller.fillQueueForKey(keyName, keyQueue,
+                        (int)(lowWatermark * numValues));
+                    return keyQueue;
+                  }
+                });
+
+    executor =
+        new ThreadPoolExecutor(numFillerThreads, numFillerThreads, 0L,
+            TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(REFILL_THREAD).build());
+    // To ensure all requests are first queued, make coreThreads = maxThreads
+    // and pre-start all the Core Threads.
+    executor.prestartAllCoreThreads();
+  }
+
+  public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
+      int numFillerThreads, QueueRefiller<E> fetcher) {
+    this(numValues, lowWaterMark, expiry, numFillerThreads,
+        SyncGenerationPolicy.ALL, fetcher);
+  }
+
+  /**
+   * Initializes the Value Queues for the provided keys by calling the
+   * fill Method with "numInitValues" values
+   * @param keyNames Array of key Names
+   * @throws ExecutionException
+   */
+  public void initializeQueuesForKeys(String... keyNames)
+      throws ExecutionException {
+    for (String keyName : keyNames) {
+      keyQueues.get(keyName);
+    }
+  }
+
+  /**
+   * This removes the value currently at the head of the Queue for the
+   * provided key. Will immediately fire the Queue filler function if key
+   * does not exist.
+   * If Queue exists but all values are drained, It will ask the generator
+   * function to add 1 value to Queue and then drain it.
+   * @param keyName String key name
+   * @return E the next value in the Queue
+   * @throws IOException
+   * @throws ExecutionException
+   */
+  public E getNext(String keyName)
+      throws IOException, ExecutionException {
+    return getAtMost(keyName, 1).get(0);
+  }
+
+  /**
+   * This removes the "num" values currently at the head of the Queue for the
+   * provided key. Will immediately fire the Queue filler function if key
+   * does not exist
+   * How many values are actually returned is governed by the
+   * <code>SyncGenerationPolicy</code> specified by the user.
+   * @param keyName String key name
+   * @param num Minimum number of values to return.
+   * @return List<E> values returned
+   * @throws IOException
+   * @throws ExecutionException
+   */
+  public List<E> getAtMost(String keyName, int num) throws IOException,
+      ExecutionException {
+    LinkedBlockingQueue<E> keyQueue = keyQueues.get(keyName);
+    // Using poll to avoid race condition..
+    LinkedList<E> ekvs = new LinkedList<E>();
+    try {
+      for (int i = 0; i < num; i++) {
+        E val = keyQueue.poll();
+        // If queue is empty now, Based on the provided SyncGenerationPolicy,
+        // figure out how many new values need to be generated synchronously
+        if (val == null) {
+          // Synchronous call to get remaining values
+          int numToFill = 0;
+          switch (policy) {
+          case ATLEAST_ONE:
+            numToFill = (ekvs.size() < 1) ? 1 : 0;
+            break;
+          case LOW_WATERMARK:
+            numToFill =
+                Math.min(num, (int) (lowWatermark * numValues)) - ekvs.size();
+            break;
+          case ALL:
+            numToFill = num - ekvs.size();
+            break;
+          }
+          // Synchronous fill if not enough values found
+          if (numToFill > 0) {
+            refiller.fillQueueForKey(keyName, ekvs, numToFill);
+          }
+          // Asynch task to fill > lowWatermark
+          if (i <= (int) (lowWatermark * numValues)) {
+            submitRefillTask(keyName, keyQueue);
+          }
+          return ekvs;
+        }
+        ekvs.add(val);
+      }
+    } catch (Exception e) {
+      throw new IOException("Exception while contacting value generator ", e);
+    }
+    return ekvs;
+  }
+
+  private void submitRefillTask(final String keyName,
+      final Queue<E> keyQueue) throws InterruptedException {
+    // The submit/execute method of the ThreadPoolExecutor is bypassed and
+    // the Runnable is directly put in the backing BlockingQueue so that we
+    // can control exactly how the runnable is inserted into the queue.
+    queue.put(
+        new NamedRunnable(keyName) {
+          @Override
+          public void run() {
+            int cacheSize = numValues;
+            int threshold = (int) (lowWatermark * (float) cacheSize);
+            // Need to ensure that only one refill task per key is executed
+            try {
+              if (keyQueue.size() < threshold) {
+                refiller.fillQueueForKey(name, keyQueue,
+                    cacheSize - keyQueue.size());
+              }
+            } catch (final Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+        );
+  }
+
+  /**
+   * Cleanly shutdown
+   */
+  public void shutdown() {
+    executor.shutdownNow();
+  }
+
+}
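
Outside of the KMS client, ValueQueue can be exercised on its own with any QueueRefiller. The sketch below uses a toy refiller and a made-up key name purely for illustration; the constructor arguments mirror the ones documented above, and the unit test added later in this change does essentially the same thing:

    import java.io.IOException;
    import java.util.Queue;

    import org.apache.hadoop.crypto.key.kms.ValueQueue;
    import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;
    import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;

    public class ValueQueueSketch {
      public static void main(String[] args) throws Exception {
        QueueRefiller<String> refiller = new QueueRefiller<String>() {
          @Override
          public void fillQueueForKey(String keyName, Queue<String> keyQueue,
              int numValues) throws IOException {
            // A real refiller would fetch numValues entries from a remote service.
            for (int i = 0; i < numValues; i++) {
              keyQueue.add(keyName + "-value-" + i);
            }
          }
        };
        // 10 values per key, refill below 30%, 60s expiry, 1 filler thread,
        // and generate everything requested on a synchronous miss.
        ValueQueue<String> vq = new ValueQueue<String>(
            10, 0.3f, 60000, 1, SyncGenerationPolicy.ALL, refiller);
        vq.initializeQueuesForKeys("k1"); // warm the queue for "k1"
        String value = vq.getNext("k1");  // pop one value; may schedule a refill
        System.out.println(value);
        vq.shutdown();                    // stop the background filler threads
      }
    }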

+ 33 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -250,6 +250,12 @@ public class CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT =
     300;
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String  HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS =
+    "hadoop.security.groups.negative-cache.secs";
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final long HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT =
+    30;
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS =
     "hadoop.security.groups.cache.warn.after.ms";
   public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT =
@@ -285,5 +291,32 @@ public class CommonConfigurationKeysPublic {
   /** Class to override Impersonation provider */
   public static final String  HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS =
     "hadoop.security.impersonation.provider.class";
+
+  //  <!--- KMSClientProvider configurations -->
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String KMS_CLIENT_ENC_KEY_CACHE_SIZE =
+      "hadoop.security.kms.client.encrypted.key.cache.size";
+  /** Default value for KMS_CLIENT_ENC_KEY_CACHE_SIZE */
+  public static final int KMS_CLIENT_ENC_KEY_CACHE_SIZE_DEFAULT = 500;
+
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK =
+      "hadoop.security.kms.client.encrypted.key.cache.low-watermark";
+  /** Default value for KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK */
+  public static final float KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK_DEFAULT =
+      0.3f;
+
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS =
+      "hadoop.security.kms.client.encrypted.key.cache.num.refill.threads";
+  /** Default value for KMS_CLIENT_ENC_KEY_NUM_REFILL_THREADS */
+  public static final int KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT =
+      2;
+
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_MS =
+      "hadoop.security.kms.client.encrypted.key.cache.expiry";
+  /** Default value for KMS_CLIENT_ENC_KEY_CACHE_EXPIRY (12 hrs)*/
+  public static final int KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_DEFAULT = 43200000;
 }
 

+ 7 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java

@@ -128,6 +128,8 @@ public class Stat extends Shell {
           " link " + original);
     }
     // 6,symbolic link,6,1373584236,1373584236,lrwxrwxrwx,andrew,andrew,`link' -> `target'
+    // OR
+    // 6,symbolic link,6,1373584236,1373584236,lrwxrwxrwx,andrew,andrew,'link' -> 'target'
     StringTokenizer tokens = new StringTokenizer(line, ",");
     try {
       long length = Long.parseLong(tokens.nextToken());
@@ -147,18 +149,17 @@ public class Stat extends Shell {
       String group = tokens.nextToken();
       String symStr = tokens.nextToken();
       // 'notalink'
-      // 'link' -> `target'
+      // `link' -> `target' OR 'link' -> 'target'
       // '' -> ''
       Path symlink = null;
-      StringTokenizer symTokens = new StringTokenizer(symStr, "`");
-      symTokens.nextToken();
+      String parts[] = symStr.split(" -> ");      
       try {
-        String target = symTokens.nextToken();
-        target = target.substring(0, target.length()-1);
+        String target = parts[1];
+        target = target.substring(1, target.length()-1);
         if (!target.isEmpty()) {
           symlink = new Path(target);
         }
-      } catch (NoSuchElementException e) {
+      } catch (ArrayIndexOutOfBoundsException e) {
         // null if not a symlink
       }
       // Set stat
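
The fix replaces the backquote-based tokenizing with a split on " -> " followed by stripping one quote character from each end, so both GNU coreutils quoting styles parse the same way. A standalone sketch of just that extraction logic, reproduced outside Stat for illustration:

    public class SymlinkFieldSketch {
      // Mirrors the new parsing: split on " -> " and strip one leading and one
      // trailing quote character from the target, whichever quote style is used.
      static String parseTarget(String symStr) {
        String[] parts = symStr.split(" -> ");
        try {
          String target = parts[1];
          return target.substring(1, target.length() - 1);
        } catch (ArrayIndexOutOfBoundsException e) {
          return null; // not a symlink
        }
      }

      public static void main(String[] args) {
        System.out.println(parseTarget("`link' -> `target'")); // target
        System.out.println(parseTarget("'link' -> 'target'")); // target
        System.out.println(parseTarget("'notalink'"));         // null
      }
    }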

+ 46 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,24 +58,35 @@ public class Groups {
   private final Map<String, List<String>> staticUserToGroupsMap = 
       new HashMap<String, List<String>>();
   private final long cacheTimeout;
+  private final long negativeCacheTimeout;
   private final long warningDeltaMs;
+  private final Timer timer;
 
   public Groups(Configuration conf) {
+    this(conf, new Timer());
+  }
+
+  public Groups(Configuration conf, Timer timer) {
     impl = 
       ReflectionUtils.newInstance(
           conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, 
                         ShellBasedUnixGroupsMapping.class, 
                         GroupMappingServiceProvider.class), 
           conf);
-    
+
     cacheTimeout = 
       conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 
           CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
+    negativeCacheTimeout =
+      conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS,
+          CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT) * 1000;
     warningDeltaMs =
       conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
         CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
     parseStaticMapping(conf);
 
+    this.timer = timer;
+
     if(LOG.isDebugEnabled())
       LOG.debug("Group mapping impl=" + impl.getClass().getName() + 
           "; cacheTimeout=" + cacheTimeout + "; warningDeltaMs=" +
@@ -111,7 +122,29 @@ public class Groups {
       staticUserToGroupsMap.put(user, groups);
     }
   }
+
+  /**
+   * Determine whether the CachedGroups is expired.
+   * @param groups cached groups for one user.
+   * @return true if groups is expired from useToGroupsMap.
+   */
+  private boolean hasExpired(CachedGroups groups, long startMs) {
+    if (groups == null) {
+      return true;
+    }
+    long timeout = cacheTimeout;
+    if (isNegativeCacheEnabled() && groups.getGroups().isEmpty()) {
+      // This CachedGroups is in the negative cache, thus it should expire
+      // sooner.
+      timeout = negativeCacheTimeout;
+    }
+    return groups.getTimestamp() + timeout <= startMs;
+  }
   
+  private boolean isNegativeCacheEnabled() {
+    return negativeCacheTimeout > 0;
+  }
+
   /**
    * Get the group memberships of a given user.
    * @param user User's name
@@ -126,18 +159,22 @@ public class Groups {
     }
     // Return cached value if available
     CachedGroups groups = userToGroupsMap.get(user);
-    long startMs = Time.monotonicNow();
-    // if cache has a value and it hasn't expired
-    if (groups != null && (groups.getTimestamp() + cacheTimeout > startMs)) {
+    long startMs = timer.monotonicNow();
+    if (!hasExpired(groups, startMs)) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Returning cached groups for '" + user + "'");
       }
+      if (groups.getGroups().isEmpty()) {
+        // Even with enabling negative cache, getGroups() has the same behavior
+        // that throws IOException if the groups for the user is empty.
+        throw new IOException("No groups found for user " + user);
+      }
       return groups.getGroups();
     }
 
     // Create and cache user's groups
     List<String> groupList = impl.getGroups(user);
-    long endMs = Time.monotonicNow();
+    long endMs = timer.monotonicNow();
     long deltaMs = endMs - startMs ;
     UserGroupInformation.metrics.addGetGroups(deltaMs);
     if (deltaMs > warningDeltaMs) {
@@ -146,6 +183,9 @@ public class Groups {
     }
     groups = new CachedGroups(groupList, endMs);
     if (groups.getGroups().isEmpty()) {
+      if (isNegativeCacheEnabled()) {
+        userToGroupsMap.put(user, groups);
+      }
       throw new IOException("No groups found for user " + user);
     }
     userToGroupsMap.put(user, groups);
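
The negative cache is controlled entirely by the new configuration key, so callers only need to tune it or switch it off. A short sketch of the configuration side; the user name is hypothetical and only illustrates the behavior described above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.security.Groups;

    public class NegativeCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep "no groups found" results for 60 seconds instead of the 30s default.
        conf.setLong(
            CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 60);
        // A value of 0 (or any negative value) disables negative caching entirely.

        Groups groups = new Groups(conf);
        // The first lookup hits the backend; if no groups are found, the empty
        // result is cached and later lookups within 60s throw
        // IOException("No groups found for user ...") without hitting the backend.
        groups.getGroups("nosuchuser");
      }
    }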

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/LdapGroupsMapping.java

@@ -201,7 +201,8 @@ public class LdapGroupsMapping
     } catch (CommunicationException e) {
       LOG.warn("Connection is closed, will try to reconnect");
     } catch (NamingException e) {
-      LOG.warn("Exception trying to get groups for user " + user, e);
+      LOG.warn("Exception trying to get groups for user " + user + ": "
+          + e.getMessage());
       return emptyResults;
     }
 
@@ -215,7 +216,8 @@ public class LdapGroupsMapping
       } catch (CommunicationException e) {
         LOG.warn("Connection being closed, reconnecting failed, retryCount = " + retryCount);
       } catch (NamingException e) {
-        LOG.warn("Exception trying to get groups for user " + user, e);
+        LOG.warn("Exception trying to get groups for user " + user + ":"
+            + e.getMessage());
         return emptyResults;
       }
     }

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedUnixGroupsMapping.java

@@ -84,7 +84,8 @@ public class ShellBasedUnixGroupsMapping
       result = Shell.execCommand(Shell.getGroupsForUserCommand(user));
     } catch (ExitCodeException e) {
       // if we didn't get the group - just return empty list;
-      LOG.warn("got exception trying to get groups for user " + user, e);
+      LOG.warn("got exception trying to get groups for user " + user + ": "
+          + e.getMessage());
       return new LinkedList<String>();
     }
     

+ 51 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Timer.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Utility methods for getting the time and computing intervals.
+ *
+ * It has the same behavior as {@link Time}, with the exception that its
+ * functions can be overridden for dependency injection purposes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Timer {
+  /**
+   * Current system time.  Do not use this to calculate a duration or interval
+   * to sleep, because it will be broken by settimeofday.  Instead, use
+   * monotonicNow.
+   * @return current time in msec.
+   */
+  public long now() {
+    return Time.now();
+  }
+
+  /**
+   * Current time from some arbitrary time base in the past, counting in
+   * milliseconds, and not affected by settimeofday or similar system clock
+   * changes.  This is appropriate to use when computing how much longer to
+   * wait for an interval to expire.
+   * @return a monotonic clock that counts in milliseconds.
+   */
+  public long monotonicNow() { return Time.monotonicNow(); }
+}
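
The only reason Timer exists as an instance class is so tests can substitute a controllable clock. A minimal sketch of such a fake with an illustrative class name (the FakeTimer added under test/ later in this change follows the same idea):

    import org.apache.hadoop.util.Timer;

    // Time only advances when the test says so.
    public class ManualTimer extends Timer {
      private long nowMs = 0;

      @Override
      public long now() { return nowMs; }

      @Override
      public long monotonicNow() { return nowMs; }

      public void advance(long deltaMs) { nowMs += deltaMs; }
    }

A test can then pass such a timer to the new Groups(Configuration, Timer) constructor and advance it past the negative-cache timeout without sleeping.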

+ 47 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -197,6 +197,20 @@ for ldap providers in the same way as above does.
   </description>
 </property>
 
+<property>
+  <name>hadoop.security.groups.negative-cache.secs</name>
+  <value>30</value>
+  <description>
+    Expiration time for entries in the negative user-to-group mapping
+    caching, in seconds. This is useful when invalid users are retrying
+    frequently. It is suggested to set a small value for this expiration, since
+    a transient error in group lookup could temporarily lock out a legitimate
+    user.
+
+    Set this to zero or a negative value to disable negative user-to-group caching.
+  </description>
+</property>
+
 <property>
   <name>hadoop.security.groups.cache.warn.after.ms</name>
   <value>5000</value>
@@ -1455,4 +1469,37 @@ for ldap providers in the same way as above does.
   <value>true</value>
   <description>Don't cache 'har' filesystem instances.</description>
 </property>
+
+<!--- KMSClientProvider configurations -->
+<property>
+  <name>hadoop.security.kms.client.encrypted.key.cache.size</name>
+  <value>500</value>
+  <description>
+    Size of the EncryptedKeyVersion cache Queue for each key
+  </description>
+</property>
+<property>
+  <name>hadoop.security.kms.client.encrypted.key.cache.low-watermark</name>
+  <value>0.3f</value>
+  <description>
+    If size of the EncryptedKeyVersion cache Queue falls below the
+    low watermark, this cache queue will be scheduled for a refill
+  </description>
+</property>
+<property>
+  <name>hadoop.security.kms.client.encrypted.key.cache.num.refill.threads</name>
+  <value>2</value>
+  <description>
+    Number of threads to use for refilling depleted EncryptedKeyVersion
+    cache Queues
+  </description>
+</property>
+<property>
+  <name>hadoop.security.kms.client.encrypted.key.cache.expiry</name>
+  <value>43200000</value>
+  <description>
+    Cache expiry time for a Key, after which the cache Queue for this
+    key will be dropped. Default = 12hrs
+  </description>
+</property>
 </configuration>
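
These four properties feed directly into the ValueQueue that KMSClientProvider builds in its constructor; they can also be overridden programmatically. A tiny sketch using the constants added to CommonConfigurationKeysPublic above, with illustrative values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

    public class KMSCacheTuningSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cache 1000 encrypted key versions per key instead of the default 500,
        // and refill once the queue drops below 20% of that size.
        conf.setInt(
            CommonConfigurationKeysPublic.KMS_CLIENT_ENC_KEY_CACHE_SIZE, 1000);
        conf.setFloat(
            CommonConfigurationKeysPublic.KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK,
            0.2f);
        // Any KMSClientProvider created with this Configuration picks the
        // values up when it constructs its ValueQueue.
      }
    }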

+ 2 - 0
hadoop-common-project/hadoop-common/src/site/apt/NativeLibraries.apt.vm

@@ -116,6 +116,8 @@ Native Libraries Guide
 
      * zlib-development package (stable version >= 1.2.0)
 
+     * openssl-development package (e.g. libssl-dev)
+
    Once you installed the prerequisite packages use the standard hadoop
    pom.xml file and pass along the native flag to build the native hadoop 
    library:

+ 190 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java

@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.crypto.key.kms.ValueQueue;
+import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;
+import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestValueQueue {
+
+  private static class FillInfo {
+    final int num;
+    final String key;
+    FillInfo(int num, String key) {
+      this.num = num;
+      this.key = key;
+    }
+  }
+
+  private static class MockFiller implements QueueRefiller<String> {
+    final LinkedBlockingQueue<FillInfo> fillCalls =
+        new LinkedBlockingQueue<FillInfo>();
+    @Override
+    public void fillQueueForKey(String keyName, Queue<String> keyQueue,
+        int numValues) throws IOException {
+      fillCalls.add(new FillInfo(numValues, keyName));
+      for(int i = 0; i < numValues; i++) {
+        keyQueue.add("test");
+      }
+    }
+    public FillInfo getTop() throws InterruptedException {
+      return fillCalls.poll(500, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Verifies that Queue is initially filled to "numInitValues"
+   */
+  @Test
+  public void testInitFill() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.1f, 300, 1,
+            SyncGenerationPolicy.ALL, filler);
+    Assert.assertEquals("test", vq.getNext("k1"));
+    Assert.assertEquals(1, filler.getTop().num);
+    vq.shutdown();
+  }
+
+  /**
+   * Verifies that Queue is initialized (Warmed-up) for provided keys
+   */
+  @Test
+  public void testWarmUp() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.5f, 300, 1,
+            SyncGenerationPolicy.ALL, filler);
+    vq.initializeQueuesForKeys("k1", "k2", "k3");
+    FillInfo[] fillInfos =
+      {filler.getTop(), filler.getTop(), filler.getTop()};
+    Assert.assertEquals(5, fillInfos[0].num);
+    Assert.assertEquals(5, fillInfos[1].num);
+    Assert.assertEquals(5, fillInfos[2].num);
+    Assert.assertEquals(Sets.newHashSet("k1", "k2", "k3"),
+        Sets.newHashSet(fillInfos[0].key,
+            fillInfos[1].key,
+            fillInfos[2].key));
+    vq.shutdown();
+  }
+
+  /**
+   * Verifies that the refill task is executed after "checkInterval" if
+   * num values below "lowWatermark"
+   */
+  @Test
+  public void testRefill() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.1f, 300, 1,
+            SyncGenerationPolicy.ALL, filler);
+    Assert.assertEquals("test", vq.getNext("k1"));
+    Assert.assertEquals(1, filler.getTop().num);
+    // Trigger refill
+    vq.getNext("k1");
+    Assert.assertEquals(1, filler.getTop().num);
+    Assert.assertEquals(10, filler.getTop().num);
+    vq.shutdown();
+  }
+
+  /**
+   * Verifies that no refill happens after "checkInterval" if
+   * num values above "lowWatermark"
+   */
+  @Test
+  public void testNoRefill() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.5f, 300, 1,
+            SyncGenerationPolicy.ALL, filler);
+    Assert.assertEquals("test", vq.getNext("k1"));
+    Assert.assertEquals(5, filler.getTop().num);
+    Assert.assertEquals(null, filler.getTop());
+    vq.shutdown();
+  }
+
+  /**
+   * Verify getAtMost when SyncGeneration Policy = ALL
+   */
+  @Test
+  public void testgetAtMostPolicyALL() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.1f, 300, 1,
+            SyncGenerationPolicy.ALL, filler);
+    Assert.assertEquals("test", vq.getNext("k1"));
+    Assert.assertEquals(1, filler.getTop().num);
+    // Drain completely
+    Assert.assertEquals(10, vq.getAtMost("k1", 10).size());
+    // Synchronous call
+    Assert.assertEquals(10, filler.getTop().num);
+    // Ask for more... return all
+    Assert.assertEquals(19, vq.getAtMost("k1", 19).size());
+    // Synchronous call (No Async call since num > lowWatermark)
+    Assert.assertEquals(19, filler.getTop().num);
+    vq.shutdown();
+  }
+
+  /**
+   * Verify getAtMost when SyncGeneration Policy = ATLEAST_ONE
+   */
+  @Test
+  public void testgetAtMostPolicyATLEAST_ONE() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.3f, 300, 1,
+            SyncGenerationPolicy.ATLEAST_ONE, filler);
+    Assert.assertEquals("test", vq.getNext("k1"));
+    Assert.assertEquals(3, filler.getTop().num);
+    // Drain completely
+    Assert.assertEquals(2, vq.getAtMost("k1", 10).size());
+    // Asynch Refill call
+    Assert.assertEquals(10, filler.getTop().num);
+    vq.shutdown();
+  }
+
+  /**
+   * Verify getAtMost when SyncGeneration Policy = LOW_WATERMARK
+   */
+  @Test
+  public void testgetAtMostPolicyLOW_WATERMARK() throws Exception {
+    MockFiller filler = new MockFiller();
+    ValueQueue<String> vq =
+        new ValueQueue<String>(10, 0.3f, 300, 1,
+            SyncGenerationPolicy.LOW_WATERMARK, filler);
+    Assert.assertEquals("test", vq.getNext("k1"));
+    Assert.assertEquals(3, filler.getTop().num);
+    // Drain completely
+    Assert.assertEquals(3, vq.getAtMost("k1", 10).size());
+    // Synchronous call
+    Assert.assertEquals(1, filler.getTop().num);
+    // Asynch Refill call
+    Assert.assertEquals(10, filler.getTop().num);
+    vq.shutdown();
+  }
+}

+ 19 - 9
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestStat.java

@@ -45,15 +45,15 @@ public class TestStat extends FileSystemTestHelper {
     final String doesNotExist;
     final String directory;
     final String file;
-    final String symlink;
+    final String[] symlinks;
     final String stickydir;
 
     StatOutput(String doesNotExist, String directory, String file,
-        String symlink, String stickydir) {
+        String[] symlinks, String stickydir) {
       this.doesNotExist = doesNotExist;
       this.directory = directory;
       this.file = file;
-      this.symlink = symlink;
+      this.symlinks = symlinks;
       this.stickydir = stickydir;
     }
 
@@ -78,10 +78,12 @@ public class TestStat extends FileSystemTestHelper {
       status = stat.getFileStatusForTesting();
       assertTrue(status.isFile());
 
-      br = new BufferedReader(new StringReader(symlink));
-      stat.parseExecResult(br);
-      status = stat.getFileStatusForTesting();
-      assertTrue(status.isSymlink());
+      for (String symlink : symlinks) {
+        br = new BufferedReader(new StringReader(symlink));
+        stat.parseExecResult(br);
+        status = stat.getFileStatusForTesting();
+        assertTrue(status.isSymlink());
+      }
 
       br = new BufferedReader(new StringReader(stickydir));
       stat.parseExecResult(br);
@@ -93,22 +95,30 @@ public class TestStat extends FileSystemTestHelper {
 
   @Test(timeout=10000)
   public void testStatLinux() throws Exception {
+    String[] symlinks = new String[] {
+        "6,symbolic link,1373584236,1373584236,777,andrew,andrew,`link' -> `target'",
+        "6,symbolic link,1373584236,1373584236,777,andrew,andrew,'link' -> 'target'"
+    };
     StatOutput linux = new StatOutput(
         "stat: cannot stat `watermelon': No such file or directory",
         "4096,directory,1373584236,1373586485,755,andrew,root,`.'",
         "0,regular empty file,1373584228,1373584228,644,andrew,andrew,`target'",
-        "6,symbolic link,1373584236,1373584236,777,andrew,andrew,`link' -> `target'",
+        symlinks,
         "4096,directory,1374622334,1375124212,1755,andrew,andrew,`stickydir'");
     linux.test();
   }
 
   @Test(timeout=10000)
   public void testStatFreeBSD() throws Exception {
+    String[] symlinks = new String[] {
+        "6,Symbolic Link,1373508941,1373508941,120755,awang,awang,`link' -> `target'"
+    };
+    
     StatOutput freebsd = new StatOutput(
         "stat: symtest/link: stat: No such file or directory",
         "512,Directory,1373583695,1373583669,40755,awang,awang,`link' -> `'",
         "0,Regular File,1373508937,1373508937,100644,awang,awang,`link' -> `'",
-        "6,Symbolic Link,1373508941,1373508941,120755,awang,awang,`link' -> `target'",
+        symlinks,
         "512,Directory,1375139537,1375139537,41755,awang,awang,`link' -> `'");
     freebsd.test();
   }

+ 56 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java

@@ -26,8 +26,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.FakeTimer;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -94,6 +97,9 @@ public class TestGroupsCaching {
 
   @Test
   public void testGroupsCaching() throws Exception {
+    // Disable negative cache.
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
     Groups groups = new Groups(conf);
     groups.cacheGroupsAdd(Arrays.asList(myGroups));
     groups.refresh();
@@ -163,4 +169,54 @@ public class TestGroupsCaching {
         FakeunPrivilegedGroupMapping.invoked);
 
   }
+
+  @Test
+  public void testNegativeGroupCaching() throws Exception {
+    final String user = "negcache";
+    final String failMessage = "Did not throw IOException: ";
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 2);
+    FakeTimer timer = new FakeTimer();
+    Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.addToBlackList(user);
+
+    // In the first attempt, the user will be put in the negative cache.
+    try {
+      groups.getGroups(user);
+      fail(failMessage + "Failed to obtain groups from FakeGroupMapping.");
+    } catch (IOException e) {
+      // An exception is expected the first time, but the user is then put
+      // into the negative cache.
+      GenericTestUtils.assertExceptionContains("No groups found for user", e);
+    }
+
+    // The second time, the user is in the negative cache.
+    try {
+      groups.getGroups(user);
+      fail(failMessage + "The user is in the negative cache.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("No groups found for user", e);
+    }
+
+    // Brings back the backend user-group mapping service.
+    FakeGroupMapping.clearBlackList();
+
+    // The lookup should still hit the negative cache and fail.
+    try {
+      groups.getGroups(user);
+      fail(failMessage + "The user is still in the negative cache, even " +
+          "FakeGroupMapping has resumed.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("No groups found for user", e);
+    }
+
+    // Let the elements in the negative cache expire.
+    timer.advance(4 * 1000);
+
+    // The negative cache entry for the user has expired, so a fresh copy of
+    // the user's groups is fetched.
+    assertEquals(Arrays.asList(myGroups), groups.getGroups(user));
+  }
 }

+ 52 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * FakeTimer can be used for test purposes to control the return values
+ * from {@link Timer}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FakeTimer extends Timer {
+  private long nowMillis;
+
+  /** Constructs a FakeTimer with a non-zero value */
+  public FakeTimer() {
+    nowMillis = 1000;  // Initialize with a non-trivial value.
+  }
+
+  @Override
+  public long now() {
+    return nowMillis;
+  }
+
+  @Override
+  public long monotonicNow() {
+    return nowMillis;
+  }
+
+  /** Advances the timer by the given number of milliseconds. */
+  public void advance(long advMillis) {
+    nowMillis += advMillis;
+  }
+}

+ 15 - 0
hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml

@@ -79,4 +79,19 @@
     </description>
   </property>
 
+  <property>
+    <name>hadoop.kms.acl.GENERATE_EEK</name>
+    <value>*</value>
+    <description>
+      ACL for generateEncryptedKey CryptoExtension operations
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.kms.acl.DECRYPT_EEK</name>
+    <value>*</value>
+    <description>
+      ACL for decryptEncryptedKey CryptoExtension operations
+    </description>
+  </property>
 </configuration>
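
As an aside to the defaults above: a value of * opens these operations to everyone. A hypothetical tightened variant, using the usual AccessControlList syntax of comma-separated users, a space, then comma-separated groups, could look like this (the user and group names here are invented for illustration):

  <property>
    <name>hadoop.kms.acl.DECRYPT_EEK</name>
    <value>hdfs keyadmin</value>
    <description>
      Hypothetical example: only the hdfs user and members of the keyadmin
      group may decrypt EEKs.
    </description>
  </property>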

+ 149 - 0
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key.kms.server;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.kms.ValueQueue;
+import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;
+
+/**
+ * A {@link KeyProviderCryptoExtension} that pre-generates and caches encrypted 
+ * keys.
+ */
+@InterfaceAudience.Private
+public class EagerKeyGeneratorKeyProviderCryptoExtension 
+    extends KeyProviderCryptoExtension {
+
+  private static final String KEY_CACHE_PREFIX =
+      "hadoop.security.kms.encrypted.key.cache.";
+
+  public static final String KMS_KEY_CACHE_SIZE =
+      KEY_CACHE_PREFIX + "size";
+  public static final int KMS_KEY_CACHE_SIZE_DEFAULT = 100;
+
+  public static final String KMS_KEY_CACHE_LOW_WATERMARK =
+      KEY_CACHE_PREFIX + "low.watermark";
+  public static final float KMS_KEY_CACHE_LOW_WATERMARK_DEFAULT = 0.30f;
+
+  public static final String KMS_KEY_CACHE_EXPIRY_MS =
+      KEY_CACHE_PREFIX + "expiry";
+  public static final int KMS_KEY_CACHE_EXPIRY_DEFAULT = 43200000;
+
+  public static final String KMS_KEY_CACHE_NUM_REFILL_THREADS =
+      KEY_CACHE_PREFIX + "num.fill.threads";
+  public static final int KMS_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT = 2;
+
+
+  private static class CryptoExtension 
+      implements KeyProviderCryptoExtension.CryptoExtension {
+
+    private class EncryptedQueueRefiller implements
+        ValueQueue.QueueRefiller<EncryptedKeyVersion> {
+
+      @Override
+      public void fillQueueForKey(String keyName,
+          Queue<EncryptedKeyVersion> keyQueue, int numKeys) throws IOException {
+        List<EncryptedKeyVersion> retEdeks =
+            new LinkedList<EncryptedKeyVersion>();
+        for (int i = 0; i < numKeys; i++) {
+          try {
+            retEdeks.add(keyProviderCryptoExtension.generateEncryptedKey(
+                keyName));
+          } catch (GeneralSecurityException e) {
+            throw new IOException(e);
+          }
+        }
+        keyQueue.addAll(retEdeks);
+      }
+    }
+
+    private KeyProviderCryptoExtension keyProviderCryptoExtension;
+    private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
+
+    public CryptoExtension(Configuration conf, 
+        KeyProviderCryptoExtension keyProviderCryptoExtension) {
+      this.keyProviderCryptoExtension = keyProviderCryptoExtension;
+      encKeyVersionQueue =
+          new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
+              conf.getInt(KMS_KEY_CACHE_SIZE,
+                  KMS_KEY_CACHE_SIZE_DEFAULT),
+              conf.getFloat(KMS_KEY_CACHE_LOW_WATERMARK,
+                  KMS_KEY_CACHE_LOW_WATERMARK_DEFAULT),
+              conf.getInt(KMS_KEY_CACHE_EXPIRY_MS,
+                  KMS_KEY_CACHE_EXPIRY_DEFAULT),
+              conf.getInt(KMS_KEY_CACHE_NUM_REFILL_THREADS,
+                  KMS_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
+              SyncGenerationPolicy.LOW_WATERMARK, new EncryptedQueueRefiller()
+          );
+    }
+
+    @Override
+    public void warmUpEncryptedKeys(String... keyNames) throws
+                                                        IOException {
+      try {
+        encKeyVersionQueue.initializeQueuesForKeys(keyNames);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
+        throws IOException, GeneralSecurityException {
+      try {
+        return encKeyVersionQueue.getNext(encryptionKeyName);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public KeyVersion
+    decryptEncryptedKey(EncryptedKeyVersion encryptedKeyVersion)
+        throws IOException, GeneralSecurityException {
+      return keyProviderCryptoExtension.decryptEncryptedKey(
+          encryptedKeyVersion);
+    }
+  }
+
+  /**
+   * This class is a proxy for a <code>KeyProviderCryptoExtension</code> that
+   * decorates the underlying <code>CryptoExtension</code> with one that eagerly
+   * caches pre-generated Encrypted Keys using a <code>ValueQueue</code>
+   * 
+   * @param conf Configuration object to load parameters from
+   * @param keyProviderCryptoExtension <code>KeyProviderCryptoExtension</code>
+   * to delegate calls to.
+   */
+  public EagerKeyGeneratorKeyProviderCryptoExtension(Configuration conf,
+      KeyProviderCryptoExtension keyProviderCryptoExtension) {
+    super(keyProviderCryptoExtension, 
+        new CryptoExtension(conf, keyProviderCryptoExtension));
+  }
+
+}
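
An illustrative sketch, separate from the change itself, of how this extension is meant to be wired up, mirroring what KMSWebApp does later in this change set. The cache-size key comes from the constants defined above; the provider configuration and the key name "k1" are assumptions made only for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.crypto.key.kms.server.EagerKeyGeneratorKeyProviderCryptoExtension;

public class EagerEekWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration kmsConf = new Configuration();
    // Pre-generate up to 200 EDEKs per key instead of the default 100.
    kmsConf.setInt(
        EagerKeyGeneratorKeyProviderCryptoExtension.KMS_KEY_CACHE_SIZE, 200);

    // Use whatever backing provider is configured (assumption:
    // hadoop.security.key.provider.path points at a real key store).
    KeyProvider keyProvider = KeyProviderFactory.getProviders(kmsConf).get(0);
    KeyProviderCryptoExtension ext =
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(keyProvider);
    // Decorate it so generateEncryptedKey() is served from the pre-filled
    // ValueQueue instead of hitting the underlying provider every time.
    ext = new EagerKeyGeneratorKeyProviderCryptoExtension(kmsConf, ext);

    EncryptedKeyVersion edek = ext.generateEncryptedKey("k1");
    System.out.println("EDEK for key version " + edek.getKeyVersionName());
  }
}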

+ 96 - 1
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.crypto.key.kms.server;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -29,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -39,10 +42,14 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.SecurityContext;
+
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.Principal;
 import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -61,8 +68,10 @@ public class KMS {
   private static final String GET_CURRENT_KEY = "GET_CURRENT_KEY";
   private static final String GET_KEY_VERSIONS = "GET_KEY_VERSIONS";
   private static final String GET_METADATA = "GET_METADATA";
+  private static final String GENERATE_EEK = "GENERATE_EEK";
+  private static final String DECRYPT_EEK = "DECRYPT_EEK";
 
-  private KeyProvider provider;
+  private KeyProviderCryptoExtension provider;
 
   public KMS() throws Exception {
     provider = KMSWebApp.getKeyProvider();
@@ -289,6 +298,92 @@ public class KMS {
     return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @GET
+  @Path(KMSRESTConstants.KEY_RESOURCE + "/{name:.*}/" +
+      KMSRESTConstants.EEK_SUB_RESOURCE)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response generateEncryptedKeys(
+          @Context SecurityContext securityContext,
+          @PathParam("name") String name,
+          @QueryParam(KMSRESTConstants.EEK_OP) String edekOp,
+          @DefaultValue("1")
+          @QueryParam(KMSRESTConstants.EEK_NUM_KEYS) int numKeys)
+          throws Exception {
+    Principal user = getPrincipal(securityContext);
+    KMSClientProvider.checkNotEmpty(name, "name");
+    KMSClientProvider.checkNotNull(edekOp, "eekOp");
+
+    Object retJSON;
+    if (edekOp.equals(KMSRESTConstants.EEK_GENERATE)) {
+      assertAccess(KMSACLs.Type.GENERATE_EEK, user, GENERATE_EEK, name);
+
+      List<EncryptedKeyVersion> retEdeks =
+          new LinkedList<EncryptedKeyVersion>();
+      try {
+        for (int i = 0; i < numKeys; i ++) {
+          retEdeks.add(provider.generateEncryptedKey(name));
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      KMSAudit.ok(user, GENERATE_EEK, name, "");
+      retJSON = new ArrayList();
+      for (EncryptedKeyVersion edek : retEdeks) {
+        ((ArrayList)retJSON).add(KMSServerJSONUtils.toJSON(edek));
+      }
+    } else {
+      throw new IllegalArgumentException("Wrong " + KMSRESTConstants.EEK_OP +
+          " value, it must be " + KMSRESTConstants.EEK_GENERATE + " or " +
+          KMSRESTConstants.EEK_DECRYPT);
+    }
+    KMSWebApp.getGenerateEEKCallsMeter().mark();
+    return Response.ok().type(MediaType.APPLICATION_JSON).entity(retJSON)
+        .build();
+  }
+
+  @SuppressWarnings("rawtypes")
+  @POST
+  @Path(KMSRESTConstants.KEY_VERSION_RESOURCE + "/{versionName:.*}/" +
+      KMSRESTConstants.EEK_SUB_RESOURCE)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response decryptEncryptedKey(@Context SecurityContext securityContext,
+      @PathParam("versionName") String versionName,
+      @QueryParam(KMSRESTConstants.EEK_OP) String eekOp,
+      Map jsonPayload)
+      throws Exception {
+    Principal user = getPrincipal(securityContext);
+    KMSClientProvider.checkNotEmpty(versionName, "versionName");
+    KMSClientProvider.checkNotNull(eekOp, "eekOp");
+
+    String keyName = (String) jsonPayload.get(KMSRESTConstants.NAME_FIELD);
+    String ivStr = (String) jsonPayload.get(KMSRESTConstants.IV_FIELD);
+    String encMaterialStr = 
+        (String) jsonPayload.get(KMSRESTConstants.MATERIAL_FIELD);
+    Object retJSON;
+    if (eekOp.equals(KMSRESTConstants.EEK_DECRYPT)) {
+      assertAccess(KMSACLs.Type.DECRYPT_EEK, user, DECRYPT_EEK, versionName);
+      KMSClientProvider.checkNotNull(ivStr, KMSRESTConstants.IV_FIELD);
+      byte[] iv = Base64.decodeBase64(ivStr);
+      KMSClientProvider.checkNotNull(encMaterialStr,
+          KMSRESTConstants.MATERIAL_FIELD);
+      byte[] encMaterial = Base64.decodeBase64(encMaterialStr);
+      KeyProvider.KeyVersion retKeyVersion =
+          provider.decryptEncryptedKey(
+              new KMSClientProvider.KMSEncryptedKeyVersion(keyName, versionName,
+                  iv, KeyProviderCryptoExtension.EEK, encMaterial));
+      retJSON = KMSServerJSONUtils.toJSON(retKeyVersion);
+      KMSAudit.ok(user, DECRYPT_EEK, versionName, "");
+    } else {
+      throw new IllegalArgumentException("Wrong " + KMSRESTConstants.EEK_OP +
+          " value, it must be " + KMSRESTConstants.EEK_GENERATE + " or " +
+          KMSRESTConstants.EEK_DECRYPT);
+    }
+    KMSWebApp.getDecryptEEKCallsMeter().mark();
+    return Response.ok().type(MediaType.APPLICATION_JSON).entity(retJSON)
+        .build();
+  }
+
   @GET
   @Path(KMSRESTConstants.KEY_RESOURCE + "/{name:.*}/" +
       KMSRESTConstants.VERSIONS_SUB_RESOURCE)

+ 4 - 1
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.crypto.key.kms.server;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -34,12 +35,14 @@ import java.util.concurrent.TimeUnit;
  * hot-reloading them if the <code>kms-acls.xml</code> file where the ACLs
  * are defined has been updated.
  */
+@InterfaceAudience.Private
 public class KMSACLs implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(KMSACLs.class);
 
 
   public enum Type {
-    CREATE, DELETE, ROLLOVER, GET, GET_KEYS, GET_METADATA, SET_KEY_MATERIAL;
+    CREATE, DELETE, ROLLOVER, GET, GET_KEYS, GET_METADATA,
+    SET_KEY_MATERIAL, GENERATE_EEK, DECRYPT_EEK;
 
     public String getConfigKey() {
       return KMSConfiguration.CONFIG_PREFIX + "acl." + this.toString();

+ 20 - 1
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java

@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.crypto.key.kms.server;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
 
 import java.util.ArrayList;
@@ -39,7 +41,9 @@ public class KMSServerJSONUtils {
           keyVersion.getName());
       json.put(KMSRESTConstants.VERSION_NAME_FIELD,
           keyVersion.getVersionName());
-      json.put(KMSRESTConstants.MATERIAL_FIELD, keyVersion.getMaterial());
+      json.put(KMSRESTConstants.MATERIAL_FIELD,
+          Base64.encodeBase64URLSafeString(
+              keyVersion.getMaterial()));
     }
     return json;
   }
@@ -55,6 +59,21 @@ public class KMSServerJSONUtils {
     return json;
   }
 
+  @SuppressWarnings("unchecked")
+  public static Map toJSON(EncryptedKeyVersion encryptedKeyVersion) {
+    Map json = new LinkedHashMap();
+    if (encryptedKeyVersion != null) {
+      json.put(KMSRESTConstants.VERSION_NAME_FIELD,
+          encryptedKeyVersion.getKeyVersionName());
+      json.put(KMSRESTConstants.IV_FIELD,
+          Base64.encodeBase64URLSafeString(
+              encryptedKeyVersion.getIv()));
+      json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD,
+          toJSON(encryptedKeyVersion.getEncryptedKey()));
+    }
+    return json;
+  }
+
   @SuppressWarnings("unchecked")
   public static Map toJSON(String keyName, KeyProvider.Metadata meta) {
     Map json = new LinkedHashMap();

+ 30 - 4
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java

@@ -20,10 +20,12 @@ package org.apache.hadoop.crypto.key.kms.server;
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.CachingKeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -35,6 +37,7 @@ import org.slf4j.bridge.SLF4JBridgeHandler;
 
 import javax.servlet.ServletContextEvent;
 import javax.servlet.ServletContextListener;
+
 import java.io.File;
 import java.net.URL;
 import java.util.List;
@@ -55,6 +58,10 @@ public class KMSWebApp implements ServletContextListener {
       "unauthorized.calls.meter";
   private static final String UNAUTHENTICATED_CALLS_METER = METRICS_PREFIX +
       "unauthenticated.calls.meter";
+  private static final String GENERATE_EEK_METER = METRICS_PREFIX +
+      "generate_eek.calls.meter";
+  private static final String DECRYPT_EEK_METER = METRICS_PREFIX +
+      "decrypt_eek.calls.meter";
 
   private static Logger LOG;
   private static MetricRegistry metricRegistry;
@@ -66,8 +73,10 @@ public class KMSWebApp implements ServletContextListener {
   private static Meter keyCallsMeter;
   private static Meter unauthorizedCallsMeter;
   private static Meter unauthenticatedCallsMeter;
+  private static Meter decryptEEKCallsMeter;
+  private static Meter generateEEKCallsMeter;
   private static Meter invalidCallsMeter;
-  private static KeyProvider keyProvider;
+  private static KeyProviderCryptoExtension keyProviderCryptoExtension;
 
   static {
     SLF4JBridgeHandler.removeHandlersForRootLogger();
@@ -122,6 +131,10 @@ public class KMSWebApp implements ServletContextListener {
       metricRegistry = new MetricRegistry();
       jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
       jmxReporter.start();
+      generateEEKCallsMeter = metricRegistry.register(GENERATE_EEK_METER,
+          new Meter());
+      decryptEEKCallsMeter = metricRegistry.register(DECRYPT_EEK_METER,
+          new Meter());
       adminCallsMeter = metricRegistry.register(ADMIN_CALLS_METER, new Meter());
       keyCallsMeter = metricRegistry.register(KEY_CALLS_METER, new Meter());
       invalidCallsMeter = metricRegistry.register(INVALID_CALLS_METER,
@@ -150,7 +163,7 @@ public class KMSWebApp implements ServletContextListener {
             "the first provider",
             kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
       }
-      keyProvider = providers.get(0);
+      KeyProvider keyProvider = providers.get(0);
       if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
           KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
         long keyTimeOutMillis =
@@ -162,6 +175,11 @@ public class KMSWebApp implements ServletContextListener {
         keyProvider = new CachingKeyProvider(keyProvider, keyTimeOutMillis,
             currKeyTimeOutMillis);
       }
+      keyProviderCryptoExtension = KeyProviderCryptoExtension.
+          createKeyProviderCryptoExtension(keyProvider);
+      keyProviderCryptoExtension = 
+          new EagerKeyGeneratorKeyProviderCryptoExtension(kmsConf, 
+              keyProviderCryptoExtension);
 
       LOG.info("KMS Started");
     } catch (Throwable ex) {
@@ -208,6 +226,14 @@ public class KMSWebApp implements ServletContextListener {
     return invalidCallsMeter;
   }
 
+  public static Meter getGenerateEEKCallsMeter() {
+    return generateEEKCallsMeter;
+  }
+
+  public static Meter getDecryptEEKCallsMeter() {
+    return decryptEEKCallsMeter;
+  }
+
   public static Meter getUnauthorizedCallsMeter() {
     return unauthorizedCallsMeter;
   }
@@ -216,7 +242,7 @@ public class KMSWebApp implements ServletContextListener {
     return unauthenticatedCallsMeter;
   }
 
-  public static KeyProvider getKeyProvider() {
-    return keyProvider;
+  public static KeyProviderCryptoExtension getKeyProvider() {
+    return keyProviderCryptoExtension;
   }
 }

+ 83 - 0
hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm

@@ -279,6 +279,25 @@ $ keytool -genkey -alias tomcat -keyalg RSA
         to provide the key material when creating or rolling a key.
     </description>
   </property>
+
+  <property>
+    <name>hadoop.kms.acl.GENERATE_EEK</name>
+    <value>*</value>
+    <description>
+      ACL for generateEncryptedKey
+      CryptoExtension operations
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.kms.acl.DECRYPT_EEK</name>
+    <value>*</value>
+    <description>
+      ACL for decryptEncryptedKey
+      CryptoExtension operations
+    </description>
+  </property>
+</configuration>
 +---+
 
 ** KMS HTTP REST API
@@ -396,6 +415,70 @@ Content-Type: application/json
 }
 +---+
 
+
+*** Generate Encrypted Key for Current KeyVersion
+
+  <REQUEST:>
+
++---+
+GET http://HOST:PORT/kms/v1/key/<key-name>/_eek?eek_op=generate&num_keys=<number-of-keys-to-generate>
++---+
+
+  <RESPONSE:>
+
++---+
+200 OK
+Content-Type: application/json
+[
+  {
+    "versionName"         : "encryptionVersionName",
+    "iv"                  : "<iv>",          //base64
+    "encryptedKeyVersion" : {
+        "versionName"       : "EEK",
+        "material"          : "<material>",    //base64
+    }
+  },
+  {
+    "versionName"         : "encryptionVersionName",
+    "iv"                  : "<iv>",          //base64
+    "encryptedKeyVersion" : {
+        "versionName"       : "EEK",
+        "material"          : "<material>",    //base64
+    }
+  },
+  ...
+]
++---+
+
+*** Decrypt Encrypted Key
+
+  <REQUEST:>
+
++---+
+POST http://HOST:PORT/kms/v1/keyversion/<version-name>/_eek?eek_op=decrypt
+Content-Type: application/json
+
+{
+  "name"        : "<key-name>",
+  "iv"          : "<iv>",          //base64
+  "material"    : "<material>",    //base64
+}
+
++---+
+
+  <RESPONSE:>
+
++---+
+200 OK
+Content-Type: application/json
+
+{
+  "name"        : "EK",
+  "material"    : "<material>",    //base64
+}
++---+
+
+
 *** Get Key Version
 
   <REQUEST:>

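As an aside to the REST documentation above, the same two operations are normally driven through the Java KMS client rather than raw HTTP. A minimal sketch, assuming a KMS is reachable at the given URI and a key named "k1" already exists; the endpoint URI and key name are assumptions, and the pattern follows the TestKMS changes further below.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;

public class KmsEekClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed KMS endpoint; adjust host and port to the actual deployment.
    KeyProvider kp = new KMSClientProvider(
        new URI("kms://http@localhost:16000/kms"), conf);
    KeyProviderCryptoExtension kpExt =
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);

    // Conceptually a GET .../key/k1/_eek?eek_op=generate&num_keys=1
    // (the client may batch and cache EDEKs on its side).
    EncryptedKeyVersion eek = kpExt.generateEncryptedKey("k1");

    // Conceptually a POST .../keyversion/<version-name>/_eek?eek_op=decrypt.
    KeyProvider.KeyVersion ek = kpExt.decryptEncryptedKey(eek);
    System.out.println("Decrypted key length: " + ek.getMaterial().length);
  }
}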
+ 100 - 11
hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java

@@ -19,6 +19,9 @@ package org.apache.hadoop.crypto.key.kms.server;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -36,6 +39,7 @@ import javax.security.auth.Subject;
 import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.LoginContext;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -267,7 +271,7 @@ public class TestKMS {
     }
   }
 
-  private void doAs(String user, final PrivilegedExceptionAction<Void> action)
+  private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
       throws Exception {
     Set<Principal> principals = new HashSet<Principal>();
     principals.add(new KerberosPrincipal(user));
@@ -280,7 +284,7 @@ public class TestKMS {
     try {
       loginContext.login();
       subject = loginContext.getSubject();
-      Subject.doAs(subject, action);
+      return Subject.doAs(subject, action);
     } finally {
       loginContext.logout();
     }
@@ -474,6 +478,32 @@ public class TestKMS {
         Assert.assertNotNull(kms1[0].getCreated());
         Assert.assertTrue(started.before(kms1[0].getCreated()));
 
+        // test generate and decryption of EEK
+        KeyProvider.KeyVersion kv = kp.getCurrentKey("k1");
+        KeyProviderCryptoExtension kpExt =
+            KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
+
+        EncryptedKeyVersion ek1 = kpExt.generateEncryptedKey(kv.getName());
+        Assert.assertEquals(KeyProviderCryptoExtension.EEK,
+            ek1.getEncryptedKey().getVersionName());
+        Assert.assertNotNull(ek1.getEncryptedKey().getMaterial());
+        Assert.assertEquals(kv.getMaterial().length,
+            ek1.getEncryptedKey().getMaterial().length);
+        KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
+        Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
+        KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
+        Assert.assertArrayEquals(k1.getMaterial(), k1a.getMaterial());
+        Assert.assertEquals(kv.getMaterial().length, k1.getMaterial().length);
+
+        EncryptedKeyVersion ek2 = kpExt.generateEncryptedKey(kv.getName());
+        KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
+        boolean isEq = true;
+        for (int i = 0; isEq && i < ek2.getEncryptedKey().getMaterial().length;
+            i++) {
+          isEq = k2.getMaterial()[i] == k1.getMaterial()[i];
+        }
+        Assert.assertFalse(isEq);
+
         // deleteKey()
         kp.deleteKey("k1");
 
@@ -565,7 +595,7 @@ public class TestKMS {
       @Override
       public Void call() throws Exception {
         final Configuration conf = new Configuration();
-        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 64);
+        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
         URI uri = createKMSUri(getKMSUrl());
         final KeyProvider kp = new KMSClientProvider(uri, conf);
 
@@ -582,7 +612,7 @@ public class TestKMS {
               Assert.fail(ex.toString());
             }
             try {
-              kp.createKey("k", new byte[8], new KeyProvider.Options(conf));
+              kp.createKey("k", new byte[16], new KeyProvider.Options(conf));
               Assert.fail();
             } catch (AuthorizationException ex) {
               //NOP
@@ -598,7 +628,7 @@ public class TestKMS {
               Assert.fail(ex.toString());
             }
             try {
-              kp.rollNewVersion("k", new byte[8]);
+              kp.rollNewVersion("k", new byte[16]);
               Assert.fail();
             } catch (AuthorizationException ex) {
               //NOP
@@ -690,7 +720,7 @@ public class TestKMS {
           @Override
           public Void run() throws Exception {
             try {
-              KeyProvider.KeyVersion kv = kp.createKey("k1", new byte[8],
+              KeyProvider.KeyVersion kv = kp.createKey("k1", new byte[16],
                   new KeyProvider.Options(conf));
               Assert.assertNull(kv.getMaterial());
             } catch (Exception ex) {
@@ -717,7 +747,8 @@ public class TestKMS {
           @Override
           public Void run() throws Exception {
             try {
-              KeyProvider.KeyVersion kv = kp.rollNewVersion("k1", new byte[8]);
+              KeyProvider.KeyVersion kv =
+                  kp.rollNewVersion("k1", new byte[16]);
               Assert.assertNull(kv.getMaterial());
             } catch (Exception ex) {
               Assert.fail(ex.toString());
@@ -726,12 +757,46 @@ public class TestKMS {
           }
         });
 
-        doAs("GET", new PrivilegedExceptionAction<Void>() {
+        final KeyVersion currKv =
+            doAs("GET", new PrivilegedExceptionAction<KeyVersion>() {
           @Override
-          public Void run() throws Exception {
+          public KeyVersion run() throws Exception {
             try {
               kp.getKeyVersion("k1@0");
-              kp.getCurrentKey("k1");
+              KeyVersion kv = kp.getCurrentKey("k1");
+              return kv;
+            } catch (Exception ex) {
+              Assert.fail(ex.toString());
+            }
+            return null;
+          }
+        });
+
+        final EncryptedKeyVersion encKv =
+            doAs("GENERATE_EEK",
+                new PrivilegedExceptionAction<EncryptedKeyVersion>() {
+          @Override
+          public EncryptedKeyVersion run() throws Exception {
+            try {
+              KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension.
+                      createKeyProviderCryptoExtension(kp);
+              EncryptedKeyVersion ek1 =
+                  kpCE.generateEncryptedKey(currKv.getName());
+              return ek1;
+            } catch (Exception ex) {
+              Assert.fail(ex.toString());
+            }
+            return null;
+          }
+        });
+
+        doAs("DECRYPT_EEK", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension.
+                      createKeyProviderCryptoExtension(kp);
+              kpCE.decryptEncryptedKey(encKv);
             } catch (Exception ex) {
               Assert.fail(ex.toString());
             }
@@ -817,7 +882,7 @@ public class TestKMS {
       @Override
       public Void call() throws Exception {
         final Configuration conf = new Configuration();
-        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 64);
+        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
         URI uri = createKMSUri(getKMSUrl());
         final KeyProvider kp = new KMSClientProvider(uri, conf);
 
@@ -889,6 +954,30 @@ public class TestKMS {
       Assert.assertTrue("Caught unexpected exception" + e.toString(), false);
     }
 
+    caughtTimeout = false;
+    try {
+      KeyProvider kp = new KMSClientProvider(uri, conf);
+      KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp)
+          .generateEncryptedKey("a");
+    } catch (SocketTimeoutException e) {
+      caughtTimeout = true;
+    } catch (IOException e) {
+      Assert.assertTrue("Caught unexpected exception" + e.toString(), false);
+    }
+
+    caughtTimeout = false;
+    try {
+      KeyProvider kp = new KMSClientProvider(uri, conf);
+      KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp)
+          .decryptEncryptedKey(
+              new KMSClientProvider.KMSEncryptedKeyVersion("a",
+                  "a", new byte[] {1, 2}, "EEK", new byte[] {1, 2}));
+    } catch (SocketTimeoutException e) {
+      caughtTimeout = true;
+    } catch (IOException e) {
+      Assert.assertTrue("Caught unexpected exception" + e.toString(), false);
+    }
+
     Assert.assertTrue(caughtTimeout);
 
     sock.close();

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -606,6 +606,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second
     instead of millisecond. (Juan Yu via wang)
 
+    HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes
+    correctly.  (szetszwo) 
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

+ 12 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -148,14 +148,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
       boolean avoidStaleNodes = stats != null
           && stats.isAvoidingStaleDataNodesForWrite();
-      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+      for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
         DatanodeDescriptor favoredNode = favoredNodes.get(i);
         // Choose a single node which is local to favoredNode.
         // 'results' is updated within chooseLocalNode
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize, 
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageTypes.get(0));
+            results, avoidStaleNodes, storageTypes.get(0), false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode); 
@@ -277,7 +277,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     try {
       if (numOfResults == 0) {
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0))
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -351,12 +351,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                              int maxNodesPerRack,
                                              List<DatanodeStorageInfo> results,
                                              boolean avoidStaleNodes,
-                                             StorageType storageType)
+                                             StorageType storageType,
+                                             boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
-    if (localMachine == null)
+    if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageType);
+    }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
@@ -369,7 +371,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           }
         }
       } 
-    }      
+    }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes, storageType);

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java

@@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      StorageType storageType, boolean fallbackToLocalRack
+      ) throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
@@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     if (chosenStorage != null) {
       return chosenStorage;
     }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
         blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);

+ 38 - 24
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java

@@ -18,32 +18,41 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-import java.util.ArrayList;
-import java.util.Random;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.junit.Test;
+import org.apache.log4j.Level;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 
 public class TestFavoredNodesEndToEnd {
+  {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)).getLogger().setLevel(Level.ALL);
+  }
+
   private static MiniDFSCluster cluster;
   private static Configuration conf;
   private final static int NUM_DATA_NODES = 10;
@@ -79,7 +88,7 @@ public class TestFavoredNodesEndToEnd {
       InetSocketAddress datanode[] = getDatanodes(rand);
       Path p = new Path("/filename"+i);
       FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
-          4096, (short)3, (long)4096, null, datanode);
+          4096, (short)3, 4096L, null, datanode);
       out.write(SOME_BYTES);
       out.close();
       BlockLocation[] locations = getBlockLocations(p);
@@ -98,14 +107,13 @@ public class TestFavoredNodesEndToEnd {
     //get some other nodes. In other words, the write to hdfs should not fail
     //and if we do getBlockLocations on the file, we should see one blklocation
     //and three hosts for that
-    Random rand = new Random(System.currentTimeMillis());
     InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
     for (int i = 0; i < 3; i++) {
       arbitraryAddrs[i] = getArbitraryLocalHostAddr();
     }
     Path p = new Path("/filename-foo-bar");
     FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
-        4096, (short)3, (long)4096, null, arbitraryAddrs);
+        4096, (short)3, 4096L, null, arbitraryAddrs);
     out.write(SOME_BYTES);
     out.close();
     getBlockLocations(p);
@@ -113,35 +121,41 @@ public class TestFavoredNodesEndToEnd {
 
   @Test(timeout=180000)
   public void testWhenSomeNodesAreNotGood() throws Exception {
+    // 4 favored nodes
+    final InetSocketAddress addrs[] = new InetSocketAddress[4];
+    final String[] hosts = new String[addrs.length];
+    for (int i = 0; i < addrs.length; i++) {
+      addrs[i] = datanodes.get(i).getXferAddress();
+      hosts[i] = addrs[i].getAddress().getHostAddress() + ":" + addrs[i].getPort();
+    }
+
     //make some datanode not "good" so that even if the client prefers it,
     //the namenode would not give it as a replica to write to
     DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
            .getDatanodeManager().getDatanodeByXferAddr(
-               datanodes.get(0).getXferAddress().getAddress().getHostAddress(), 
-               datanodes.get(0).getXferAddress().getPort());
+               addrs[0].getAddress().getHostAddress(), addrs[0].getPort());
     //set the decommission status to true so that 
     //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
     d.setDecommissioned();
-    InetSocketAddress addrs[] = new InetSocketAddress[3];
-    for (int i = 0; i < 3; i++) {
-      addrs[i] = datanodes.get(i).getXferAddress();
-    }
     Path p = new Path("/filename-foo-bar-baz");
+    final short replication = (short)3;
     FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
-        4096, (short)3, (long)4096, null, addrs);
+        4096, replication, 4096L, null, addrs);
     out.write(SOME_BYTES);
     out.close();
     //reset the state
     d.stopDecommission();
+
     BlockLocation[] locations = getBlockLocations(p);
+    Assert.assertEquals(replication, locations[0].getNames().length);
     //also make sure that the datanode[0] is not in the list of hosts
-    String datanode0 = 
-        datanodes.get(0).getXferAddress().getAddress().getHostAddress()
-        + ":" + datanodes.get(0).getXferAddress().getPort();
-    for (int i = 0; i < 3; i++) {
-      if (locations[0].getNames()[i].equals(datanode0)) {
-        fail(datanode0 + " not supposed to be a replica for the block");
-      }
+    for (int i = 0; i < replication; i++) {
+      final String loc = locations[0].getNames()[i];
+      int j = 0;
+      for(; j < hosts.length && !loc.equals(hosts[j]); j++);
+      Assert.assertTrue("j=" + j, j > 0);
+      Assert.assertTrue("loc=" + loc + " not in host list "
+          + Arrays.asList(hosts) + ", j=" + j, j < hosts.length);
     }
   }
 

+ 6 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -169,6 +169,12 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
     attempt is the last retry. (Wangda Tan via zjshen)
 
+    MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
+    enabled if custom output format/committer is used (Sangjin Lee via jlowe)
+
+    MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories
+    in its results (Jason Dere via jlowe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 195 - 73
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
@@ -200,6 +199,7 @@ public class MRAppMaster extends CompositeService {
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
+  private ClassLoader jobClassLoader;
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
@@ -250,6 +250,9 @@ public class MRAppMaster extends CompositeService {
 
   @Override
   protected void serviceInit(final Configuration conf) throws Exception {
+    // create the job classloader if enabled
+    createJobClassLoader(conf);
+
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     initJobCredentialsAndUGI(conf);
@@ -387,8 +390,13 @@ public class MRAppMaster extends CompositeService {
       addIfService(committerEventHandler);
 
       //policy handling preemption requests from RM
-      preemptionPolicy = createPreemptionPolicy(conf);
-      preemptionPolicy.init(context);
+      callWithJobClassLoader(conf, new Action<Void>() {
+        public Void call(Configuration conf) {
+          preemptionPolicy = createPreemptionPolicy(conf);
+          preemptionPolicy.init(context);
+          return null;
+        }
+      });
 
       //service to handle requests to TaskUmbilicalProtocol
       taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@@ -453,33 +461,37 @@ public class MRAppMaster extends CompositeService {
   }
 
   private OutputCommitter createOutputCommitter(Configuration conf) {
-    OutputCommitter committer = null;
-
-    LOG.info("OutputCommitter set in config "
-        + conf.get("mapred.output.committer.class"));
-
-    if (newApiCommitter) {
-      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
-          .newTaskId(jobId, 0, TaskType.MAP);
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
-          .newTaskAttemptId(taskID, 0);
-      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
-          TypeConverter.fromYarn(attemptID));
-      OutputFormat outputFormat;
-      try {
-        outputFormat = ReflectionUtils.newInstance(taskContext
-            .getOutputFormatClass(), conf);
-        committer = outputFormat.getOutputCommitter(taskContext);
-      } catch (Exception e) {
-        throw new YarnRuntimeException(e);
+    return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+      public OutputCommitter call(Configuration conf) {
+        OutputCommitter committer = null;
+
+        LOG.info("OutputCommitter set in config "
+            + conf.get("mapred.output.committer.class"));
+
+        if (newApiCommitter) {
+          org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+              MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+              MRBuilderUtils.newTaskAttemptId(taskID, 0);
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+              TypeConverter.fromYarn(attemptID));
+          OutputFormat outputFormat;
+          try {
+            outputFormat = ReflectionUtils.newInstance(taskContext
+                .getOutputFormatClass(), conf);
+            committer = outputFormat.getOutputCommitter(taskContext);
+          } catch (Exception e) {
+            throw new YarnRuntimeException(e);
+          }
+        } else {
+          committer = ReflectionUtils.newInstance(conf.getClass(
+              "mapred.output.committer.class", FileOutputCommitter.class,
+              org.apache.hadoop.mapred.OutputCommitter.class), conf);
+        }
+        LOG.info("OutputCommitter is " + committer.getClass().getName());
+        return committer;
       }
-    } else {
-      committer = ReflectionUtils.newInstance(conf.getClass(
-          "mapred.output.committer.class", FileOutputCommitter.class,
-          org.apache.hadoop.mapred.OutputCommitter.class), conf);
-    }
-    LOG.info("OutputCommitter is " + committer.getClass().getName());
-    return committer;
+    });
   }
 
   protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
@@ -667,38 +679,42 @@ public class MRAppMaster extends CompositeService {
     return new StagingDirCleaningService();
   }
 
-  protected Speculator createSpeculator(Configuration conf, AppContext context) {
-    Class<? extends Speculator> speculatorClass;
-
-    try {
-      speculatorClass
-          // "yarn.mapreduce.job.speculator.class"
-          = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
-                          DefaultSpeculator.class,
-                          Speculator.class);
-      Constructor<? extends Speculator> speculatorConstructor
-          = speculatorClass.getConstructor
-               (Configuration.class, AppContext.class);
-      Speculator result = speculatorConstructor.newInstance(conf, context);
-
-      return result;
-    } catch (InstantiationException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (InvocationTargetException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (NoSuchMethodException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    }
+  protected Speculator createSpeculator(Configuration conf,
+      final AppContext context) {
+    return callWithJobClassLoader(conf, new Action<Speculator>() {
+      public Speculator call(Configuration conf) {
+        Class<? extends Speculator> speculatorClass;
+        try {
+          speculatorClass
+              // "yarn.mapreduce.job.speculator.class"
+              = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+                              DefaultSpeculator.class,
+                              Speculator.class);
+          Constructor<? extends Speculator> speculatorConstructor
+              = speculatorClass.getConstructor
+                   (Configuration.class, AppContext.class);
+          Speculator result = speculatorConstructor.newInstance(conf, context);
+
+          return result;
+        } catch (InstantiationException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (IllegalAccessException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (InvocationTargetException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (NoSuchMethodException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        }
+      }
+    });
   }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
@@ -712,7 +728,7 @@ public class MRAppMaster extends CompositeService {
   protected EventHandler<CommitterEvent> createCommitterEventHandler(
       AppContext context, OutputCommitter committer) {
     return new CommitterEventHandler(context, committer,
-        getRMHeartbeatHandler());
+        getRMHeartbeatHandler(), jobClassLoader);
   }
 
   protected ContainerAllocator createContainerAllocator(
@@ -1083,8 +1099,8 @@ public class MRAppMaster extends CompositeService {
     //start all the components
     super.serviceStart();
 
-    // set job classloader if configured
-    MRApps.setJobClassLoader(getConfig());
+    // finally set the job classloader
+    MRApps.setClassLoader(jobClassLoader, getConfig());
 
     if (initFailed) {
       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
@@ -1101,19 +1117,24 @@ public class MRAppMaster extends CompositeService {
     TaskLog.syncLogsShutdown(logSyncer);
   }
 
-  private boolean isRecoverySupported(OutputCommitter committer2)
-      throws IOException {
+  private boolean isRecoverySupported() throws IOException {
     boolean isSupported = false;
-    JobContext _jobContext;
+    Configuration conf = getConfig();
     if (committer != null) {
+      final JobContext _jobContext;
       if (newApiCommitter) {
          _jobContext = new JobContextImpl(
-            getConfig(), TypeConverter.fromYarn(getJobId()));
+            conf, TypeConverter.fromYarn(getJobId()));
       } else {
           _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-                new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
+                new JobConf(conf), TypeConverter.fromYarn(getJobId()));
       }
-      isSupported = committer.isRecoverySupported(_jobContext);
+      isSupported = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isRecoverySupported(_jobContext);
+            }
+      });
     }
     return isSupported;
   }
@@ -1127,7 +1148,7 @@ public class MRAppMaster extends CompositeService {
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
 
-    boolean recoverySupportedByCommitter = isRecoverySupported(committer);
+    boolean recoverySupportedByCommitter = isRecoverySupported();
 
     // If a shuffle secret was not provided by the job client then this app
     // attempt will generate one.  However that disables recovery if there
@@ -1312,7 +1333,7 @@ public class MRAppMaster extends CompositeService {
       this.conf = config;
     }
     @Override
-    public void handle(SpeculatorEvent event) {
+    public void handle(final SpeculatorEvent event) {
       if (disabled) {
         return;
       }
@@ -1339,7 +1360,12 @@ public class MRAppMaster extends CompositeService {
       if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
         || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
         // Speculator IS enabled, direct the event to there.
-        speculator.handle(event);
+        callWithJobClassLoader(conf, new Action<Void>() {
+          public Void call(Configuration conf) {
+            speculator.handle(event);
+            return null;
+          }
+        });
       }
     }
 
@@ -1499,6 +1525,102 @@ public class MRAppMaster extends CompositeService {
     });
   }
 
+  /**
+   * Creates a job classloader based on the configuration if the job classloader
+   * is enabled. It is a no-op if the job classloader is not enabled.
+   */
+  private void createJobClassLoader(Configuration conf) throws IOException {
+    jobClassLoader = MRApps.createJobClassLoader(conf);
+  }
+
+  /**
+   * Executes the given action with the job classloader set as the configuration
+   * classloader as well as the thread context class loader if the job
+   * classloader is enabled. After the call, the original classloader is
+   * restored.
+   *
+   * If the job classloader is enabled and the code needs to load user-supplied
+   * classes via configuration or thread context classloader, this method should
+   * be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   */
+  <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * Executes the given action that can throw a checked exception with the job
+   * classloader set as the configuration classloader as well as the thread
+   * context class loader if the job classloader is enabled. After the call, the
+   * original classloader is restored.
+   *
+   * If the job classloader is enabled and the code needs to load user-supplied
+   * classes via configuration or thread context classloader, this method should
+   * be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   * @throws IOException if the underlying action throws an IOException
+   * @throws YarnRuntimeException if the underlying action throws an exception
+   * other than an IOException
+   */
+  <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+      throws IOException {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } catch (IOException e) {
+      throw e;
+    } catch (YarnRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // wrap it with a YarnRuntimeException
+      throw new YarnRuntimeException(e);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * An action that is executed with the job classloader set and then restored.
+   */
+  private static interface Action<T> {
+    T call(Configuration conf);
+  }
+
+  private static interface ExceptionAction<T> {
+    T call(Configuration conf) throws Exception;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();

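For reference, the swap-and-restore idea behind callWithJobClassLoader can be shown as a minimal standalone sketch; the class and method names below (ClassLoaderScope, runWithClassLoader) are illustrative and not part of the patch.

import java.util.concurrent.Callable;

public class ClassLoaderScope {
  /**
   * Runs the given action with the supplied classloader installed as the
   * thread context classloader and restores the previous one afterwards.
   * Mirrors the swap/restore pattern of callWithJobClassLoader, minus the
   * Hadoop Configuration plumbing.
   */
  public static <T> T runWithClassLoader(ClassLoader cl, Callable<T> action)
      throws Exception {
    Thread t = Thread.currentThread();
    ClassLoader previous = t.getContextClassLoader();
    boolean swap = cl != null && previous != cl;
    if (swap) {
      t.setContextClassLoader(cl);
    }
    try {
      return action.call();
    } finally {
      if (swap) {
        // always restore the original classloader, even on failure
        t.setContextClassLoader(previous);
      }
    }
  }
}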
+ 24 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java

@@ -68,6 +68,7 @@ public class CommitterEventHandler extends AbstractService
   private BlockingQueue<CommitterEvent> eventQueue =
       new LinkedBlockingQueue<CommitterEvent>();
   private final AtomicBoolean stopped;
+  private final ClassLoader jobClassLoader;
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
   private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler extends AbstractService
 
   public CommitterEventHandler(AppContext context, OutputCommitter committer,
       RMHeartbeatHandler rmHeartbeatHandler) {
+    this(context, committer, rmHeartbeatHandler, null);
+  }
+  
+  public CommitterEventHandler(AppContext context, OutputCommitter committer,
+      RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
     super("CommitterEventHandler");
     this.context = context;
     this.committer = committer;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
     this.stopped = new AtomicBoolean(false);
+    this.jobClassLoader = jobClassLoader;
   }
 
   @Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler extends AbstractService
 
   @Override
   protected void serviceStart() throws Exception {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("CommitterEvent Processor #%d")
-      .build();
+    ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+        .setNameFormat("CommitterEvent Processor #%d");
+    if (jobClassLoader != null) {
+      // if the job classloader is enabled, we need to use the job classloader
+      // as the thread context classloader (TCCL) of these threads in case the
+      // committer needs to load another class via TCCL
+      ThreadFactory backingTf = new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread thread = new Thread(r);
+          thread.setContextClassLoader(jobClassLoader);
+          return thread;
+        }
+      };
+      tfBuilder.setThreadFactory(backingTf);
+    }
+    ThreadFactory tf = tfBuilder.build();
     launcherPool = new ThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {

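The thread-context-classloader setup in serviceStart() above can be sketched on its own; this is an illustrative snippet assuming Guava's ThreadFactoryBuilder, not the committed code. Note that the builder still applies the name format to threads produced by the backing factory.

import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ClassLoaderThreadFactory {
  /** Builds a ThreadFactory whose threads use the given context classloader. */
  public static ThreadFactory create(final ClassLoader jobClassLoader) {
    ThreadFactory backing = new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        // committer code running on these threads may load classes via TCCL
        thread.setContextClassLoader(jobClassLoader);
        return thread;
      }
    };
    return new ThreadFactoryBuilder()
        .setNameFormat("CommitterEvent Processor #%d")
        .setThreadFactory(backing)
        .build();
  }
}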
+ 37 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -327,8 +327,8 @@ public class MRApps extends Apps {
   }
 
   /**
-   * Sets a {@link ApplicationClassLoader} on the given configuration and as
-   * the context classloader, if
+   * Creates and sets an {@link ApplicationClassLoader} on the given
+   * configuration and as the thread context classloader, if
    * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
    * the APP_CLASSPATH environment variable is set.
    * @param conf
@@ -336,24 +336,52 @@ public class MRApps extends Apps {
    */
   public static void setJobClassLoader(Configuration conf)
       throws IOException {
+    setClassLoader(createJobClassLoader(conf), conf);
+  }
+
+  /**
+   * Creates an {@link ApplicationClassLoader} if
+   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
+   * the APP_CLASSPATH environment variable is set.
+   * @param conf
+   * @return the created job classloader, or null if the job classloader is not
+   * enabled or the APP_CLASSPATH environment variable is not set
+   * @throws IOException
+   */
+  public static ClassLoader createJobClassLoader(Configuration conf)
+      throws IOException {
+    ClassLoader jobClassLoader = null;
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
       String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
       if (appClasspath == null) {
-        LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
+        LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
       } else {
-        LOG.info("Using job classloader");
+        LOG.info("Creating job classloader");
         if (LOG.isDebugEnabled()) {
           LOG.debug("APP_CLASSPATH=" + appClasspath);
         }
         String[] systemClasses = getSystemClasses(conf);
-        ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
+        jobClassLoader = createJobClassLoader(appClasspath,
             systemClasses);
-        if (jobClassLoader != null) {
-          conf.setClassLoader(jobClassLoader);
-          Thread.currentThread().setContextClassLoader(jobClassLoader);
-        }
       }
     }
+    return jobClassLoader;
+  }
+
+  /**
+   * Sets the provided classloader on the given configuration and as the thread
+   * context classloader if the classloader is not null.
+   * @param classLoader
+   * @param conf
+   */
+  public static void setClassLoader(ClassLoader classLoader,
+      Configuration conf) {
+    if (classLoader != null) {
+      LOG.info("Setting classloader " + classLoader.getClass().getName() +
+          " on the configuration and as the thread context classloader");
+      conf.setClassLoader(classLoader);
+      Thread.currentThread().setContextClassLoader(classLoader);
+    }
   }
 
   @VisibleForTesting

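A hypothetical caller of the new create/set split might look like the sketch below; the ordering is illustrative only, the point being that the classloader can be created early and handed to other components before it is applied to the configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.util.MRApps;

public class JobClassLoaderSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // returns null if the job classloader is disabled or APP_CLASSPATH is
    // not set in the environment
    ClassLoader jobClassLoader = MRApps.createJobClassLoader(conf);
    // ... construct services that keep a reference to jobClassLoader ...
    // apply it to the configuration and the thread context only when user
    // classes are about to be loaded; a no-op when the loader is null
    MRApps.setClassLoader(jobClassLoader, conf);
  }
}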
+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java

@@ -579,7 +579,7 @@ public abstract class CombineFileInputFormat<K, V>
         blocks = new OneBlockInfo[0];
       } else {
 
-        if(locations.length == 0) {
+        if(locations.length == 0 && !stat.isDirectory()) {
           locations = new BlockLocation[] { new BlockLocation() };
         }
 

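A rough sketch of what the added directory check accomplishes (simplified, not the actual CombineFileInputFormat code path): an empty regular file still receives a placeholder BlockLocation so it yields one zero-length chunk, while a directory, which also reports no block locations, is left with none and contributes nothing to the splits.

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;

public class LocationGuard {
  // hypothetical helper mirroring the guarded assignment above
  static BlockLocation[] guard(BlockLocation[] locations, FileStatus stat) {
    if (locations.length == 0 && !stat.isDirectory()) {
      // empty file: fake a single location so it still produces a split entry
      return new BlockLocation[] { new BlockLocation() };
    }
    // directory (or file with real locations): leave untouched
    return locations;
  }
}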
+ 55 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

@@ -1274,6 +1274,61 @@ public class TestCombineFileInputFormat extends TestCase {
     fileSys.delete(file.getParent(), true);
   }
 
+  /**
+   * Test that directories do not get included as part of getSplits()
+   */
+  @Test
+  public void testGetSplitsWithDirectory() throws Exception {
+    MiniDFSCluster dfs = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+          .build();
+      dfs.waitActive();
+
+      FileSystem fileSys = dfs.getFileSystem();
+
+      // Set up the following directory structure:
+      // /dir1/: directory
+      // /dir1/file1: regular file
+      // /dir1/dir2/: directory
+      Path dir1 = new Path("/dir1");
+      Path file = new Path("/dir1/file1");
+      Path dir2 = new Path("/dir1/dir2");
+      if (!fileSys.mkdirs(dir1)) {
+        throw new IOException("Mkdirs failed to create " + dir1.toString());
+      }
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(new byte[0]);
+      out.close();
+      if (!fileSys.mkdirs(dir2)) {
+        throw new IOException("Mkdirs failed to create " + dir2.toString());
+      }
+
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, "/dir1");
+      List<InputSplit> splits = inFormat.getSplits(job);
+
+      // directories should be omitted from getSplits() - we should only see file1 and not dir2
+      assertEquals(1, splits.size());
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(file.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(0, fileSplit.getLength(0));
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
   /**
    * Test when input files are from non-default file systems
    */

+ 86 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -33,8 +33,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
   @Test(timeout = 300000)
   public void testJobClassloader() throws IOException, InterruptedException,
       ClassNotFoundException {
-    LOG.info("\n\n\nStarting testJobClassloader().");
+    testJobClassloader(false);
+  }
+
+  @Test(timeout = 300000)
+  public void testJobClassloaderWithCustomClasses() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    testJobClassloader(true);
+  }
+
+  private void testJobClassloader(boolean useCustomClasses) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testJobClassloader()"
+        + " useCustomClasses=" + useCustomClasses);
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
     sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+    if (useCustomClasses) {
+      // to test the AM loading user classes such as the output format class,
+      // we exclude them from the system classes; the exclusions must be
+      // prepended, since the first match wins
+      String systemClasses =
+          sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+      // exclude the custom classes from system classes
+      systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+          CustomSpeculator.class.getName() + "," +
+          systemClasses;
+      sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+          systemClasses);
+    }
     sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
     sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(SleepJob.class);
     job.setMaxMapAttempts(1); // speed up failures
+    if (useCustomClasses) {
+      // set custom output format class and speculator class
+      job.setOutputFormatClass(CustomOutputFormat.class);
+      final Configuration jobConf = job.getConfiguration();
+      jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+          Speculator.class);
+      // speculation needs to be enabled for the speculator to be loaded
+      jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+    }
     job.submit();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
         succeeded);
   }
 
+  public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+    public CustomOutputFormat() {
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that, when instantiated in the context of the MRAppMaster, the
+     * class was loaded by the job classloader; if it was not, throws an
+     * exception to fail the job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
+  public static class CustomSpeculator extends DefaultSpeculator {
+    public CustomSpeculator(Configuration conf, AppContext context) {
+      super(conf, context);
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that, when instantiated in the context of the MRAppMaster, the
+     * class was loaded by the job classloader; if it was not, throws an
+     * exception to fail the job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
   protected void verifySleepJobCounters(Job job) throws InterruptedException,
       IOException {
     Counters counters = job.getCounters();

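The system-classes exclusion syntax exercised by the test can be wrapped in a small helper; the helper below is hypothetical and only illustrates how '-' entries are prepended so that the first (negative) match wins.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SystemClassesExclusion {
  static void excludeFromSystemClasses(Configuration conf, String... classes) {
    String systemClasses =
        conf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
    StringBuilder sb = new StringBuilder();
    for (String cls : classes) {
      // a leading '-' marks a negative match; prepend so it wins
      sb.append('-').append(cls).append(',');
    }
    if (systemClasses != null) {
      sb.append(systemClasses);
    } else if (sb.length() > 0) {
      sb.setLength(sb.length() - 1); // drop the trailing comma
    }
    conf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
        sb.toString());
  }
}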
+ 5 - 0
hadoop-yarn-project/CHANGES.txt

@@ -54,6 +54,8 @@ Release 2.6.0 - UNRELEASED
     YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo
     via Sandy Ryza)
 
+    YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -77,6 +79,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2244. FairScheduler missing handling of containers for unknown 
     application attempts. (Anubhav Dhoot via kasha)
 
+    YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement
+    (Leitao Guo via jlowe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 76 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -42,8 +42,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -54,14 +57,18 @@ import org.iq80.leveldb.Logger;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   public static final Log LOG =
       LogFactory.getLog(NMLeveldbStateStoreService.class);
 
   private static final String DB_NAME = "yarn-nm-state";
-  private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
-  private static final String DB_SCHEMA_VERSION = "1.0";
+  private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
+  
+  private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion
+      .newInstance(1, 0);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -475,22 +482,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     options.logger(new LeveldbLogger());
     LOG.info("Using state database at " + storeRoot + " for recovery");
     File dbfile = new File(storeRoot.toString());
-    byte[] schemaVersionData = null;
     try {
       db = JniDBFactory.factory.open(dbfile, options);
-      try {
-        schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY));
-      } catch (DBException e) {
-        throw new IOException(e.getMessage(), e);
-      }
     } catch (NativeDB.DBException e) {
       if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
         LOG.info("Creating state database at " + dbfile);
         options.createIfMissing(true);
         try {
           db = JniDBFactory.factory.open(dbfile, options);
-          schemaVersionData = bytes(DB_SCHEMA_VERSION);
-          db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData);
+          // store version
+          storeVersion();
         } catch (DBException dbErr) {
           throw new IOException(dbErr.getMessage(), dbErr);
         }
@@ -498,16 +499,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         throw e;
       }
     }
-    if (schemaVersionData != null) {
-      String schemaVersion = asString(schemaVersionData);
-      // only support exact schema matches for now
-      if (!DB_SCHEMA_VERSION.equals(schemaVersion)) {
-        throw new IOException("Incompatible state database schema, found "
-            + schemaVersion + " expected " + DB_SCHEMA_VERSION);
-      }
-    } else {
-      throw new IOException("State database schema version not found");
-    }
+    checkVersion();
   }
 
   private Path createStorageDir(Configuration conf) throws IOException {
@@ -532,4 +524,68 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       LOG.info(message);
     }
   }
+
+
+  NMDBSchemaVersion loadVersion() throws IOException {
+    byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
+    // if the version was not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return NMDBSchemaVersion.newInstance(1, 0);
+    }
+    NMDBSchemaVersion version =
+        new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+    return version;
+  }
+
+  private void storeVersion() throws IOException {
+    dbStoreVersion(CURRENT_VERSION_INFO);
+  }
+  
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(NMDBSchemaVersion state) throws IOException {
+    dbStoreVersion(state);
+  }
+  
+  private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
+    String key = DB_SCHEMA_VERSION_KEY;
+    byte[] data = 
+        ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  NMDBSchemaVersion getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+  
+  /**
+   * 1) Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2 ... 1.25, 2.0, etc.
+   * 2) Any incompatible change to the state-store is a major upgrade, and any
+   *    compatible change is a minor upgrade.
+   * 3) On a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) On a major upgrade, say 1.2 to 2.0:
+   *    throw an exception and direct the user to a separate upgrade tool to
+   *    upgrade the NM state or to remove the incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    NMDBSchemaVersion loadedVersion = loadVersion();
+    LOG.info("Loaded NM state version info " + loadedVersion);
+    if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing NM state version info " + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new IOException(
+        "Incompatible version for NM state: expecting NM state version " 
+            + getCurrentVersion() + ", but loading version " + loadedVersion);
+    }
+  }
+  
 }

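The compatibility rule enforced by checkVersion() can be summarized in a standalone sketch, with a plain value class standing in for NMDBSchemaVersion (names here are illustrative).

import java.io.IOException;

public class SchemaVersionCheck {
  static final class Version {
    final int major, minor;
    Version(int major, int minor) { this.major = major; this.minor = minor; }
    boolean compatibleWith(Version other) { return major == other.major; }
    @Override public String toString() { return major + "." + minor; }
  }

  static void check(Version loaded, Version current) throws IOException {
    if (loaded.major == current.major && loaded.minor == current.minor) {
      return; // exact match: nothing to do
    }
    if (loaded.compatibleWith(current)) {
      // minor difference: overwrite the stored version and proceed
      System.out.println("Storing NM state version " + current);
    } else {
      // major difference: refuse to start on incompatible state
      throw new IOException("Incompatible version for NM state: expecting "
          + current + ", but loading " + loaded);
    }
  }
}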
+ 80 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/NMDBSchemaVersion.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.recovery.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The version information of DB Schema for NM.
+ */
+@Private
+@Unstable
+public abstract class NMDBSchemaVersion {
+
+  public static NMDBSchemaVersion newInstance(int majorVersion, int minorVersion) {
+    NMDBSchemaVersion version = Records.newRecord(NMDBSchemaVersion.class);
+    version.setMajorVersion(majorVersion);
+    version.setMinorVersion(minorVersion);
+    return version;
+  }
+
+  public abstract int getMajorVersion();
+
+  public abstract void setMajorVersion(int majorVersion);
+
+  public abstract int getMinorVersion();
+
+  public abstract void setMinorVersion(int minorVersion);
+
+  public String toString() {
+    return getMajorVersion() + "." + getMinorVersion();
+  }
+
+  public boolean isCompatibleTo(NMDBSchemaVersion version) {
+    return getMajorVersion() == version.getMajorVersion();
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + getMajorVersion();
+    result = prime * result + getMinorVersion();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMDBSchemaVersion other = (NMDBSchemaVersion) obj;
+    if (this.getMajorVersion() == other.getMajorVersion()
+        && this.getMinorVersion() == other.getMinorVersion()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+}

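A small usage sketch of the new record (illustrative only): versions that share a major number are compatible regardless of minor number.

import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;

public class VersionCompatibilityDemo {
  public static void main(String[] args) {
    NMDBSchemaVersion v10 = NMDBSchemaVersion.newInstance(1, 0);
    NMDBSchemaVersion v12 = NMDBSchemaVersion.newInstance(1, 2);
    NMDBSchemaVersion v20 = NMDBSchemaVersion.newInstance(2, 0);
    System.out.println(v10.isCompatibleTo(v12)); // true: same major version
    System.out.println(v10.isCompatibleTo(v20)); // false: major version differs
  }
}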
+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/impl/pb/NMDBSchemaVersionPBImpl.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProtoOrBuilder;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+
+@Private
+@Evolving
+public class NMDBSchemaVersionPBImpl extends NMDBSchemaVersion {
+
+  NMDBSchemaVersionProto proto = NMDBSchemaVersionProto.getDefaultInstance();
+  NMDBSchemaVersionProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public NMDBSchemaVersionPBImpl() {
+    builder = NMDBSchemaVersionProto.newBuilder();
+  }
+
+  public NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public NMDBSchemaVersionProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = NMDBSchemaVersionProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+  
+  @Override
+  public int getMajorVersion() {
+    NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMajorVersion();
+  }
+
+  @Override
+  public void setMajorVersion(int majorVersion) {
+    maybeInitBuilder();
+    builder.setMajorVersion(majorVersion);
+  }
+
+  @Override
+  public int getMinorVersion() {
+    NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMinorVersion();
+  }
+
+  @Override
+  public void setMinorVersion(int minorVersion) {
+    maybeInitBuilder();
+    builder.setMinorVersion(minorVersion);
+  }
+
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java

@@ -72,7 +72,7 @@ public class NodePage extends NMView {
           ._("Total Pmem allocated for Container",
               StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB))
           ._("Pmem enforcement enabled",
-              info.isVmemCheckEnabled())
+              info.isPmemCheckEnabled())
            ._("Total VCores allocated for Containers",
               String.valueOf(info.getTotalVCoresAllocated())) 
           ._("NodeHealthyStatus",

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto

@@ -38,3 +38,9 @@ message LocalizedResourceProto {
   optional string localPath = 2;
   optional int64 size = 3;
 }
+
+message NMDBSchemaVersionProto {
+  optional int32 majorVersion = 1;
+  optional int32 minorVersion = 2;
+}
+

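A sketch of how the new message round-trips through raw bytes, which is effectively what the state store writes under the schema-version key (illustrative, assuming the standard generated protobuf API).

import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;

public class VersionProtoRoundTrip {
  public static void main(String[] args) throws Exception {
    // serialize version 1.0 to bytes, as dbStoreVersion() does via the PBImpl
    byte[] data = NMDBSchemaVersionProto.newBuilder()
        .setMajorVersion(1)
        .setMinorVersion(0)
        .build()
        .toByteArray();
    // parse it back, as loadVersion() does
    NMDBSchemaVersionProto parsed = NMDBSchemaVersionProto.parseFrom(data);
    System.out.println(parsed.getMajorVersion() + "." + parsed.getMinorVersion());
  }
}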
+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -45,9 +46,11 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -102,6 +105,36 @@ public class TestNMLeveldbStateStoreService {
     assertTrue(stateStore.canRecover());
     verifyEmptyState();
   }
+  
+  @Test
+  public void testCheckVersion() throws IOException {
+    // default version
+    NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // compatible version
+    NMDBSchemaVersion compatibleVersion =
+        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    stateStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
+    restartStateStore();
+    // overwrite the compatible version
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // incompatible version
+    NMDBSchemaVersion incompatibleVersion =
+      NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    stateStore.storeVersion(incompatibleVersion);
+    try {
+      restartStateStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for NM state:"));
+    }
+  }
 
   @Test
   public void testStartResourceLocalization() throws IOException {