
HADOOP-14104. Client should always ask namenode for kms provider path. Contributed by Rushabh S Shah.

Andrew Wang, 8 years ago (commit ed8f738be5)
16 changed files with 386 additions and 40 deletions
  1. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (+2 -1)
  2. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java (+12 -1)
  3. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java (+3 -1)
  4. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java (+4 -1)
  5. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java (+9 -11)
  6. hadoop-common-project/hadoop-common/src/main/resources/core-default.xml (+2 -0)
  7. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (+70 -8)
  8. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (+11 -2)
  9. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java (+9 -8)
  10. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java (+3 -1)
  11. hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto (+1 -0)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+6 -2)
  13. hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md (+1 -0)
  14. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java (+205 -0)
  15. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java (+17 -4)
  16. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (+31 -0)

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -868,7 +868,8 @@ public abstract class FileSystem extends Configured implements Closeable {
        config.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
        false,
        FS_TRASH_INTERVAL_DEFAULT,
-        DataChecksum.Type.CRC32);
+        DataChecksum.Type.CRC32,
+        "");
   }

   /**

+ 12 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java

@@ -54,6 +54,7 @@ public class FsServerDefaults implements Writable {
   private boolean encryptDataTransfer;
   private long trashInterval;
   private DataChecksum.Type checksumType;
+  private String keyProviderUri;

   public FsServerDefaults() {
   }
@@ -61,7 +62,8 @@ public class FsServerDefaults implements Writable {
   public FsServerDefaults(long blockSize, int bytesPerChecksum,
       int writePacketSize, short replication, int fileBufferSize,
       boolean encryptDataTransfer, long trashInterval,
-      DataChecksum.Type checksumType) {
+      DataChecksum.Type checksumType,
+      String keyProviderUri) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
@@ -70,6 +72,7 @@ public class FsServerDefaults implements Writable {
     this.encryptDataTransfer = encryptDataTransfer;
     this.trashInterval = trashInterval;
     this.checksumType = checksumType;
+    this.keyProviderUri = keyProviderUri;
   }

   public long getBlockSize() {
@@ -104,6 +107,14 @@ public class FsServerDefaults implements Writable {
     return checksumType;
   }

+  /* null means old style namenode.
+   * "" (empty string) means namenode is upgraded but EZ is not supported.
+   * some string means that value is the key provider.
+   */
+  public String getKeyProviderUri() {
+    return keyProviderUri;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////

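The new getter deliberately distinguishes three cases. A minimal illustrative sketch (not part of the commit; the class, method, and variable names are invented) of how DFSClient-style code can interpret them:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.fs.FsServerDefaults;

    // Illustration only: interpreting the keyProviderUri field added to
    // FsServerDefaults by this change.
    class KeyProviderUriExample {
      static URI resolveProviderUri(FsServerDefaults defaults, Configuration conf) {
        String fromNamenode = defaults.getKeyProviderUri();
        if (fromNamenode != null) {
          // Upgraded namenode: an empty string means "no provider configured",
          // anything else is the provider URI the namenode itself uses.
          return fromNamenode.isEmpty() ? null : URI.create(fromNamenode);
        }
        // Old namenode that does not report the field: fall back to the
        // client's own hadoop.security.key.provider.path setting.
        String fromConf = conf.getTrimmed(
            CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
        return (fromConf == null || fromConf.isEmpty())
            ? null : URI.create(fromConf);
      }
    }

An empty string therefore disables the local-conf fallback, which is how DFSClient#getKeyProviderUri further down in this commit treats it.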
+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java

@@ -54,6 +54,7 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
   public static final long    FS_TRASH_INTERVAL_DEFAULT = 0;
   public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
       DataChecksum.Type.CRC32;
+  public static final String KEY_PROVIDER_URI_DEFAULT = "";

   protected static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
@@ -64,7 +65,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
         STREAM_BUFFER_SIZE_DEFAULT,
         ENCRYPT_DATA_TRANSFER_DEFAULT,
         FS_TRASH_INTERVAL_DEFAULT,
-        CHECKSUM_TYPE_DEFAULT);
+        CHECKSUM_TYPE_DEFAULT,
+        KEY_PROVIDER_URI_DEFAULT);
   }
 }

+ 4 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java

@@ -54,6 +54,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
   public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
   public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
       DataChecksum.Type.CRC32;
+  public static final String KEY_PROVIDER_URI_DEFAULT = "";
+
   public static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
         BLOCK_SIZE_DEFAULT,
@@ -63,7 +65,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
         STREAM_BUFFER_SIZE_DEFAULT,
         ENCRYPT_DATA_TRANSFER_DEFAULT,
         FS_TRASH_INTERVAL_DEFAULT,
-        CHECKSUM_TYPE_DEFAULT);
+        CHECKSUM_TYPE_DEFAULT,
+        KEY_PROVIDER_URI_DEFAULT);
   }
 }

+ 9 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java

@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;

 /**
  * Utils for KMS.
@@ -51,21 +50,20 @@ public final class KMSUtil {
   public static KeyProvider createKeyProvider(final Configuration conf,
       final String configKeyName) throws IOException {
     LOG.debug("Creating key provider with config key {}", configKeyName);
-    final String providerUriStr = conf.getTrimmed(configKeyName, "");
+    final String providerUriStr = conf.getTrimmed(configKeyName);
     // No provider set in conf
-    if (providerUriStr.isEmpty()) {
+    if (providerUriStr == null || providerUriStr.isEmpty()) {
       return null;
     }
-    final URI providerUri;
-    try {
-      providerUri = new URI(providerUriStr);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
+    return createKeyProviderFromUri(conf, URI.create(providerUriStr));
+  }
+
+  public static KeyProvider createKeyProviderFromUri(final Configuration conf,
+      final URI providerUri) throws IOException {
     KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
     if (keyProvider == null) {
-      throw new IOException("Could not instantiate KeyProvider from " +
-          configKeyName + " setting of '" + providerUriStr + "'");
+      throw new IOException("Could not instantiate KeyProvider for uri: " +
+          providerUri);
     }
     if (keyProvider.isTransient()) {
       throw new IOException("KeyProvider " + keyProvider.toString()

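For orientation, a hedged usage sketch of the new helper; the KMS address below is a placeholder, not something taken from the commit:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.key.KeyProvider;
    import org.apache.hadoop.util.KMSUtil;

    // Illustration only: createKeyProviderFromUri lets callers that already
    // hold a provider URI (for example one returned by the namenode) skip the
    // configuration lookup that createKeyProvider performs.
    class KMSUtilExample {
      static KeyProvider openProvider(Configuration conf) throws IOException {
        URI providerUri = URI.create("kms://http@kms.example.com:9600/kms");
        return KMSUtil.createKeyProviderFromUri(conf, providerUri);
      }
    }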
+ 2 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2260,6 +2260,8 @@
   <description>
     The KeyProvider to use when managing zone keys, and interacting with
     encryption keys when reading and writing to an encryption zone.
+    For hdfs clients, the provider path will be same as namenode's
+    provider path.
   </description>
 </property>


+ 70 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -172,6 +173,7 @@ import org.apache.hadoop.ipc.RpcNoSuchMethodException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
@@ -209,6 +211,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
   // 1 hour
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L;
+  private static final String DFS_KMS_PREFIX = "dfs-kms-";

   private final Configuration conf;
   private final Tracer tracer;
@@ -226,7 +229,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final SocketFactory socketFactory;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   private final FileSystem.Statistics stats;
-  private final String authority;
+  private final URI namenodeUri;
   private final Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
@@ -239,6 +242,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       new DFSHedgedReadMetrics();
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private final int smallBufferSize;
+  private URI keyProviderUri = null;

   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -309,7 +313,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,

     this.ugi = UserGroupInformation.getCurrentUser();

-    this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
+    this.namenodeUri = nameNodeUri;
     this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
         ThreadLocalRandom.current().nextInt()  + "_" +
         Thread.currentThread().getId();
@@ -461,7 +465,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    *  be returned until all output streams are closed.
    */
   public LeaseRenewer getLeaseRenewer() {
-    return LeaseRenewer.getInstance(authority, ugi, this);
+    return LeaseRenewer.getInstance(
+        namenodeUri != null ? namenodeUri.getAuthority() : "null", ugi, this);
   }

   /** Get a lease and start automatic renewal */
@@ -2993,8 +2998,66 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return HEDGED_READ_METRIC;
   }

-  public KeyProvider getKeyProvider() {
-    return clientContext.getKeyProviderCache().get(conf);
+  /**
+   * Returns a key to map namenode uri to key provider uri.
+   * Tasks will lookup this key to find key Provider.
+   */
+  public Text getKeyProviderMapKey() {
+    return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
+        +"://" + namenodeUri.getAuthority());
+  }
+
+  /**
+   * The key provider uri is searched in the following order.
+   * 1. If there is a mapping in Credential's secrets map for namenode uri.
+   * 2. From namenode getServerDefaults rpc.
+   * 3. Finally fallback to local conf.
+   * @return keyProviderUri if found from either of above 3 cases,
+   * null otherwise
+   * @throws IOException
+   */
+  URI getKeyProviderUri() throws IOException {
+    if (keyProviderUri != null) {
+      return keyProviderUri;
+    }
+
+    // Lookup the secret in credentials object for namenodeuri.
+    Credentials credentials = ugi.getCredentials();
+    byte[] keyProviderUriBytes = credentials.getSecretKey(getKeyProviderMapKey());
+    if(keyProviderUriBytes != null) {
+      keyProviderUri =
+          URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes));
+      return keyProviderUri;
+    }
+
+    // Query the namenode for the key provider uri.
+    FsServerDefaults serverDefaults = getServerDefaults();
+    if (serverDefaults.getKeyProviderUri() != null) {
+      if (!serverDefaults.getKeyProviderUri().isEmpty()) {
+        keyProviderUri = URI.create(serverDefaults.getKeyProviderUri());
+      }
+      return keyProviderUri;
+    }
+
+    // Last thing is to trust its own conf to be backwards compatible.
+    String keyProviderUriStr = conf.getTrimmed(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    if (keyProviderUriStr != null && !keyProviderUriStr.isEmpty()) {
+      keyProviderUri = URI.create(keyProviderUriStr);
+    }
+    return keyProviderUri;
+  }
+
+  public KeyProvider getKeyProvider() throws IOException {
+    return clientContext.getKeyProviderCache().get(conf, getKeyProviderUri());
+  }
+
+  /*
+   * Should be used only for testing.
+   */
+  @VisibleForTesting
+  public void setKeyProviderUri(URI providerUri) {
+    this.keyProviderUri = providerUri;
   }

   @VisibleForTesting
@@ -3004,11 +3067,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,

   /**
    * Probe for encryption enabled on this filesystem.
-   * See {@link DFSUtilClient#isHDFSEncryptionEnabled(Configuration)}
    * @return true if encryption is enabled
    */
-  public boolean isHDFSEncryptionEnabled() {
-    return DFSUtilClient.isHDFSEncryptionEnabled(this.conf);
+  public boolean isHDFSEncryptionEnabled() throws IOException{
+    return getKeyProviderUri() != null;
   }

   /**

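Because isHDFSEncryptionEnabled() and getKeyProvider() can now trigger a getServerDefaults RPC, both declare IOException. A minimal caller sketch (illustration only; it assumes an already-initialized DistributedFileSystem named fs):

    import java.io.IOException;
    import org.apache.hadoop.crypto.key.KeyProvider;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    // Illustration only: how calling code might cope with the new checked
    // exception on the encryption probes.
    class EncryptionProbeExample {
      static KeyProvider providerOrNull(DistributedFileSystem fs) {
        DFSClient client = fs.getClient();
        try {
          if (client.isHDFSEncryptionEnabled()) {
            // Resolved from credentials, the namenode, or local conf, in that order.
            return client.getKeyProvider();
          }
        } catch (IOException e) {
          // Could not determine the provider (for example the namenode RPC
          // failed); treat encryption as unavailable.
        }
        return null;
      }
    }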
+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -2479,12 +2479,15 @@ public class DistributedFileSystem extends FileSystem {
   public Token<?>[] addDelegationTokens(
       final String renewer, Credentials credentials) throws IOException {
     Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
-    if (dfs.isHDFSEncryptionEnabled()) {
+    URI keyProviderUri = dfs.getKeyProviderUri();
+    if (keyProviderUri != null) {
       KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
           KeyProviderDelegationTokenExtension.
               createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
       Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
           addDelegationTokens(renewer, credentials);
+      credentials.addSecretKey(dfs.getKeyProviderMapKey(),
+          DFSUtilClient.string2Bytes(keyProviderUri.toString()));
       if (tokens != null && kpTokens != null) {
         Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
         System.arraycopy(tokens, 0, all, 0, tokens.length);
@@ -2517,7 +2520,13 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public Path getTrashRoot(Path path) {
-    if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
+    try {
+      if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
+        return super.getTrashRoot(path);
+      }
+    } catch (IOException ioe) {
+      DFSClient.LOG.warn("Exception while checking whether encryption zone is "
+          + "supported", ioe);
       return super.getTrashRoot(path);
     }


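The secret added by addDelegationTokens is what lets a task running in another JVM recover the provider URI without a namenode RPC or any client-side configuration. A hedged sketch of that task-side lookup (the namenode scheme and authority are parameters here; the key format mirrors the DFS_KMS_PREFIX constant added to DFSClient):

    import java.io.IOException;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;

    // Illustration only: reading back the secret that DistributedFileSystem
    // stored under the "dfs-kms-" + scheme + "://" + authority key.
    class TaskSideLookupExample {
      static URI lookupProviderUri(String nnScheme, String nnAuthority)
          throws IOException {
        Text lookupKey = new Text("dfs-kms-" + nnScheme + "://" + nnAuthority);
        byte[] uriBytes = UserGroupInformation.getCurrentUser()
            .getCredentials().getSecretKey(lookupKey);
        if (uriBytes == null) {
          return null; // no mapping was shipped with the job credentials
        }
        // DFSUtilClient.string2Bytes/bytes2String use UTF-8, so decode the same way.
        return URI.create(new String(uriBytes, StandardCharsets.UTF_8));
      }
    }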
+ 9 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.util.KMSUtil;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
@@ -66,29 +67,29 @@ public class KeyProviderCache {
         .build();
   }

-  public KeyProvider get(final Configuration conf) {
-    URI kpURI = createKeyProviderURI(conf);
-    if (kpURI == null) {
+  public KeyProvider get(final Configuration conf,
+      final URI serverProviderUri) {
+    if (serverProviderUri == null) {
       return null;
     }
     try {
-      return cache.get(kpURI, new Callable<KeyProvider>() {
+      return cache.get(serverProviderUri, new Callable<KeyProvider>() {
         @Override
         public KeyProvider call() throws Exception {
-          return DFSUtilClient.createKeyProvider(conf);
+          return KMSUtil.createKeyProviderFromUri(conf, serverProviderUri);
         }
       });
     } catch (Exception e) {
-      LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause());
+      LOG.error("Could not create KeyProvider for DFSClient !!", e);
       return null;
     }
   }

   private URI createKeyProviderURI(Configuration conf) {
     final String providerUriStr = conf.getTrimmed(
-        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, "");
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
     // No provider set in conf
-    if (providerUriStr.isEmpty()) {
+    if (providerUriStr == null || providerUriStr.isEmpty()) {
       LOG.error("Could not find uri with key ["
           + CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH
           + "] to create a keyProvider !!");

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -1618,7 +1618,8 @@ public class PBHelperClient {
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
-        convert(fs.getChecksumType()));
+        convert(fs.getChecksumType()),
+        fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null);
   }

   public static List<CryptoProtocolVersionProto> convert(
@@ -1769,6 +1770,7 @@ public class PBHelperClient {
         .setEncryptDataTransfer(fs.getEncryptDataTransfer())
         .setTrashInterval(fs.getTrashInterval())
         .setChecksumType(convert(fs.getChecksumType()))
+        .setKeyProviderUri(fs.getKeyProviderUri())
         .build();
   }


+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -377,6 +377,7 @@ message FsServerDefaultsProto {
   optional bool encryptDataTransfer = 6 [default = false];
   optional uint64 trashInterval = 7 [default = 0];
   optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
+  optional string keyProviderUri = 9;
 }


+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -153,6 +153,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -764,8 +765,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
           conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
           conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
-          checksumType);
-      
+          checksumType,
+          conf.getTrimmed(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+          ""));
+
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);


+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md

@@ -97,6 +97,7 @@ Once a KMS has been set up and the NameNode and HDFS clients have been correctly
 #### hadoop.security.key.provider.path

 The KeyProvider to use when interacting with encryption keys used when reading and writing to an encryption zone.
+HDFS clients will use the provider path returned from Namenode via getServerDefaults. If namenode doesn't support returning key provider uri then client's conf will be used.

 ### <a name="Selecting_an_encryption_algorithm_and_codec"></a>Selecting an encryption algorithm and codec


+ 205 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileSystemTestWrapper;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -85,6 +86,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension.DelegationTokenExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
@@ -107,8 +109,21 @@ import static org.mockito.Matchers.anyShort;
 import static org.mockito.Mockito.withSettings;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -1688,4 +1703,194 @@ public class TestEncryptionZones {
       }
     }
   }
+
+  /** This test tests that client will first lookup secrets map
+   * for key provider uri from {@link Credentials} in
+   * {@link UserGroupInformation}
+   * @throws Exception
+   */
+  @Test
+  public void testProviderUriInCredentials() throws Exception {
+    String dummyKeyProvider = "dummy://foo:bar@test_provider1";
+    DFSClient client = cluster.getFileSystem().getClient();
+    Credentials credentials = new Credentials();
+    // Key provider uri should be in the secret map of credentials object with
+    // namenode uri as key
+    Text lookUpKey = client.getKeyProviderMapKey();
+    credentials.addSecretKey(lookUpKey,
+        DFSUtilClient.string2Bytes(dummyKeyProvider));
+    client.ugi.addCredentials(credentials);
+    client.setKeyProviderUri(null);
+    Assert.assertEquals("Client Key provider is different from provider in "
+        + "credentials map", dummyKeyProvider,
+        client.getKeyProviderUri().toString());
+  }
+
+
+ /**
+  * Testing the fallback behavior of keyProviderUri.
+  * This test tests first the key provider uri is used from conf
+  * and then used from serverDefaults.
+  * @throws IOException
+  */
+  @Test
+  public void testKeyProviderFallBackBehavior() throws IOException {
+    Configuration clusterConf = cluster.getConfiguration(0);
+    String dummyKeyProviderUri1 = "dummy://foo:bar@test_provider1";
+    // set the key provider uri in conf.
+    clusterConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        dummyKeyProviderUri1);
+    DFSClient mockClient = Mockito.spy(cluster.getFileSystem().getClient());
+    mockClient.setKeyProviderUri(null);
+    // Namenode returning null as keyProviderUri in FSServerDefaults.
+    FsServerDefaults serverDefaultsWithKeyProviderNull =
+        getTestServerDefaults(null);
+    Mockito.doReturn(serverDefaultsWithKeyProviderNull)
+        .when(mockClient).getServerDefaults();
+    Assert.assertEquals(
+        "Key provider uri from client doesn't match with uri from conf",
+        dummyKeyProviderUri1, mockClient.getKeyProviderUri().toString());
+    Mockito.verify(mockClient, Mockito.times(1)).getServerDefaults();
+
+    String dummyKeyProviderUri2 = "dummy://foo:bar@test_provider2";
+    mockClient.setKeyProviderUri(null);
+    FsServerDefaults serverDefaultsWithDummyKeyProvider =
+        getTestServerDefaults(dummyKeyProviderUri2);
+    // Namenode returning dummyKeyProvider2 in serverDefaults.
+    Mockito.doReturn(serverDefaultsWithDummyKeyProvider)
+    .when(mockClient).getServerDefaults();
+    Assert.assertEquals(
+        "Key provider uri from client doesn't match with uri from namenode",
+        dummyKeyProviderUri2, mockClient.getKeyProviderUri().toString());
+    Mockito.verify(mockClient, Mockito.times(2)).getServerDefaults();
+  }
+
+  /**
+   * This test makes sure the client gets the key provider uri from namenode
+   * instead of its own conf.
+   * This test assumes both the namenode and client are upgraded.
+   * @throws Exception
+   */
+  @Test
+  public void testDifferentKMSProviderOnUpgradedNamenode() throws Exception {
+    Configuration clusterConf = cluster.getConfiguration(0);
+    URI namenodeKeyProviderUri = URI.create(getKeyProviderURI());
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, cluster.getFileSystem().getClient()
+        .getKeyProviderUri());
+
+    // Unset the provider path in conf
+    clusterConf.unset(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    // Nullify the cached value for key provider uri on client
+    cluster.getFileSystem().getClient().setKeyProviderUri(null);
+    // Even after unsetting the local conf, the client key provider should be
+    // the same as namenode's provider.
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, cluster.getFileSystem().getClient()
+        .getKeyProviderUri());
+
+    // Set the provider path to some dummy scheme.
+    clusterConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        "dummy://foo:bar@test_provider1");
+    // Nullify the cached value for key provider uri on client
+    cluster.getFileSystem().getClient().setKeyProviderUri(null);
+    // Even after pointing the conf to some dummy provider, the client key
+    // provider should be the same as namenode's provider.
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, cluster.getFileSystem().getClient()
+        .getKeyProviderUri());
+  }
+
+  /**
+   * This test makes sure the client trusts its local conf
+   * This test assumes the client is upgraded but the namenode is not.
+   * @throws Exception
+   */
+  @Test
+  public void testDifferentKMSProviderOnUnUpgradedNamenode()
+      throws Exception {
+    Configuration clusterConf = cluster.getConfiguration(0);
+    URI namenodeKeyProviderUri = URI.create(getKeyProviderURI());
+    URI clientKeyProviderUri =
+        cluster.getFileSystem().getClient().getKeyProviderUri();
+    Assert.assertNotNull(clientKeyProviderUri);
+    // Since the client and the namenode share the same conf, they will have
+    // identical key provider.
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, clientKeyProviderUri);
+
+    String dummyKeyProviderUri = "dummy://foo:bar@test_provider";
+    // Unset the provider path in conf.
+    clusterConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        dummyKeyProviderUri);
+    FsServerDefaults spyServerDefaults = getTestServerDefaults(null);
+    // Creating a fake serverdefaults so that we can simulate namenode not
+    // being upgraded.
+    DFSClient spyClient = Mockito.spy(cluster.getFileSystem().getClient());
+    // Clear the cache value of keyProviderUri on client side.
+    spyClient.setKeyProviderUri(null);
+    Mockito.doReturn(spyServerDefaults).when(spyClient).getServerDefaults();
+
+    // Since FsServerDefaults#keyProviderUri is null, the client
+    // will fallback to local conf which is null.
+    clientKeyProviderUri = spyClient.getKeyProviderUri();
+    Assert.assertEquals("Client keyProvider should be " + dummyKeyProviderUri,
+        dummyKeyProviderUri, clientKeyProviderUri.toString());
+    Mockito.verify(spyClient, Mockito.times(1)).getServerDefaults();
+  }
+
+  // Given a provider uri return serverdefaults.
+  // provider uri == null means the namenode does not support returning
+  // provider uri in FSServerDefaults object.
+  private FsServerDefaults getTestServerDefaults(String providerPath) {
+    FsServerDefaults serverDefaults = new FsServerDefaults(
+        conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
+        conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
+        conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
+        (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+        conf.getBoolean(
+        DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
+        conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
+        DataChecksum.Type.valueOf(DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT),
+        providerPath);
+    return serverDefaults;
+  }
+
+  /**
+   * This test performs encrypted read/write and picks up the key provider uri
+   * from the credentials and not the conf.
+   * @throws Exception
+   */
+  @Test
+  public void testEncryptedReadWriteUsingDiffKeyProvider() throws Exception {
+    final HdfsAdmin dfsAdmin =
+        new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    Configuration clusterConf = cluster.getConfiguration(0);
+    clusterConf.unset(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    DFSClient client = cluster.getFileSystem().getClient();
+    Credentials credentials = new Credentials();
+    Text lookUpKey = client.getKeyProviderMapKey();
+    credentials.addSecretKey(lookUpKey,
+        DFSUtilClient.string2Bytes(getKeyProviderURI()));
+    client.ugi.addCredentials(credentials);
+    // Create a base file for comparison
+    final Path baseFile = new Path("/base");
+    final int len = 8192;
+    DFSTestUtil.createFile(fs, baseFile, len, (short) 1, 0xFEED);
+    // Create the first enc file
+    final Path zone = new Path("/zone");
+    fs.mkdirs(zone);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    final Path encFile1 = new Path(zone, "myfile");
+    DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
+    // Read them back in and compare byte-by-byte
+    verifyFilesEqual(fs, baseFile, encFile1, len);
+  }
 }

+ 17 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java

@@ -96,29 +96,42 @@ public class TestKeyProviderCache {
     Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://foo:bar@test_provider1");
-    KeyProvider keyProvider1 = kpCache.get(conf);
+    KeyProvider keyProvider1 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));
     Assert.assertNotNull("Returned Key Provider is null !!", keyProvider1);

     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://foo:bar@test_provider1");
-    KeyProvider keyProvider2 = kpCache.get(conf);
+    KeyProvider keyProvider2 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));

     Assert.assertTrue("Different KeyProviders returned !!",
         keyProvider1 == keyProvider2);

     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://test_provider3");
-    KeyProvider keyProvider3 = kpCache.get(conf);
+    KeyProvider keyProvider3 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));

     Assert.assertFalse("Same KeyProviders returned !!",
         keyProvider1 == keyProvider3);

     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://hello:there@test_provider1");
-    KeyProvider keyProvider4 = kpCache.get(conf);
+    KeyProvider keyProvider4 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));

     Assert.assertFalse("Same KeyProviders returned !!",
         keyProvider1 == keyProvider4);

   }
+
+  private URI getKeyProviderUriFromConf(Configuration conf) {
+    String providerUriStr = conf.get(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    if (providerUriStr == null || providerUriStr.isEmpty()) {
+      return null;
+    }
+    return URI.create(providerUriStr);
+  }
 }

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -396,6 +399,34 @@ public class TestPBHelper {
     assertEquals(cmd.toString(), cmd2.toString());
   }

+  /**
+   * Test case for old namenode where the namenode doesn't support returning
+   * keyProviderUri.
+   */
+  @Test
+  public void testFSServerDefaultsHelper() {
+    HdfsProtos.FsServerDefaultsProto.Builder b =
+        HdfsProtos.FsServerDefaultsProto
+        .newBuilder();
+    b.setBlockSize(DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+    b.setBytesPerChecksum(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+    b.setWritePacketSize(
+        HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    b.setReplication(DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    b.setFileBufferSize(DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
+    b.setEncryptDataTransfer(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    b.setTrashInterval(DFSConfigKeys.FS_TRASH_INTERVAL_DEFAULT);
+    b.setChecksumType(HdfsProtos.ChecksumTypeProto.valueOf(
+        DataChecksum.Type.valueOf(DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT).id));
+    HdfsProtos.FsServerDefaultsProto proto = b.build();
+
+    Assert.assertFalse("KeyProvider uri is not supported",
+        proto.hasKeyProviderUri());
+    FsServerDefaults fsServerDefaults = PBHelperClient.convert(proto);
+    Assert.assertNotNull("FsServerDefaults is null", fsServerDefaults);
+    Assert.assertNull("KeyProviderUri should be null",
+        fsServerDefaults.getKeyProviderUri());
+  }

   @Test
   public void testConvertText() {