Browse Source

HADOOP-14104. Client should always ask namenode for kms provider path. Contributed by Rushabh S Shah.

Andrew Wang 8 years ago
parent
commit
18432130a7
16 changed files with 386 additions and 40 deletions
  1. 2 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  2. 12 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
  3. 3 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
  4. 4 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
  5. 9 11
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
  6. 2 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  7. 70 8
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  8. 11 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  9. 9 8
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
  10. 3 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
  11. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
  12. 6 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  13. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md
  14. 205 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
  15. 17 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java
  16. 31 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -874,7 +874,8 @@ public abstract class FileSystem extends Configured implements Closeable {
         config.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
         false,
         FS_TRASH_INTERVAL_DEFAULT,
-        DataChecksum.Type.CRC32);
+        DataChecksum.Type.CRC32,
+        "");
   }
 
   /**

+ 12 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java

@@ -54,6 +54,7 @@ public class FsServerDefaults implements Writable {
   private boolean encryptDataTransfer;
   private long trashInterval;
   private DataChecksum.Type checksumType;
+  private String keyProviderUri;
 
   public FsServerDefaults() {
   }
@@ -61,7 +62,8 @@ public class FsServerDefaults implements Writable {
   public FsServerDefaults(long blockSize, int bytesPerChecksum,
       int writePacketSize, short replication, int fileBufferSize,
       boolean encryptDataTransfer, long trashInterval,
-      DataChecksum.Type checksumType) {
+      DataChecksum.Type checksumType,
+      String keyProviderUri) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
@@ -70,6 +72,7 @@ public class FsServerDefaults implements Writable {
     this.encryptDataTransfer = encryptDataTransfer;
     this.trashInterval = trashInterval;
     this.checksumType = checksumType;
+    this.keyProviderUri = keyProviderUri;
   }
 
   public long getBlockSize() {
@@ -104,6 +107,14 @@ public class FsServerDefaults implements Writable {
     return checksumType;
   }
 
+  /* null means old style namenode.
+   * "" (empty string) means namenode is upgraded but EZ is not supported.
+   * some string means that value is the key provider.
+   */
+  public String getKeyProviderUri() {
+    return keyProviderUri;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////

+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java

@@ -54,6 +54,7 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
   public static final long    FS_TRASH_INTERVAL_DEFAULT = 0;
   public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
       DataChecksum.Type.CRC32;
+  public static final String KEY_PROVIDER_URI_DEFAULT = "";
   
   protected static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
@@ -64,7 +65,8 @@ public class FtpConfigKeys extends CommonConfigurationKeys {
         STREAM_BUFFER_SIZE_DEFAULT,
         ENCRYPT_DATA_TRANSFER_DEFAULT,
         FS_TRASH_INTERVAL_DEFAULT,
-        CHECKSUM_TYPE_DEFAULT);
+        CHECKSUM_TYPE_DEFAULT,
+        KEY_PROVIDER_URI_DEFAULT);
   }
 }
   

+ 4 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java

@@ -54,6 +54,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
   public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
   public static final DataChecksum.Type CHECKSUM_TYPE_DEFAULT =
       DataChecksum.Type.CRC32;
+  public static final String KEY_PROVIDER_URI_DEFAULT = "";
+
   public static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
         BLOCK_SIZE_DEFAULT,
@@ -63,7 +65,8 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
         STREAM_BUFFER_SIZE_DEFAULT,
         ENCRYPT_DATA_TRANSFER_DEFAULT,
         FS_TRASH_INTERVAL_DEFAULT,
-        CHECKSUM_TYPE_DEFAULT);
+        CHECKSUM_TYPE_DEFAULT,
+        KEY_PROVIDER_URI_DEFAULT);
   }
 }
   

+ 9 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java

@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 
 /**
  * Utils for KMS.
@@ -51,21 +50,20 @@ public final class KMSUtil {
   public static KeyProvider createKeyProvider(final Configuration conf,
       final String configKeyName) throws IOException {
     LOG.debug("Creating key provider with config key {}", configKeyName);
-    final String providerUriStr = conf.getTrimmed(configKeyName, "");
+    final String providerUriStr = conf.getTrimmed(configKeyName);
     // No provider set in conf
-    if (providerUriStr.isEmpty()) {
+    if (providerUriStr == null || providerUriStr.isEmpty()) {
       return null;
     }
-    final URI providerUri;
-    try {
-      providerUri = new URI(providerUriStr);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
+    return createKeyProviderFromUri(conf, URI.create(providerUriStr));
+  }
+
+  public static KeyProvider createKeyProviderFromUri(final Configuration conf,
+      final URI providerUri) throws IOException {
     KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
     if (keyProvider == null) {
-      throw new IOException("Could not instantiate KeyProvider from " +
-          configKeyName + " setting of '" + providerUriStr + "'");
+      throw new IOException("Could not instantiate KeyProvider for uri: " +
+          providerUri);
     }
     if (keyProvider.isTransient()) {
       throw new IOException("KeyProvider " + keyProvider.toString()

+ 2 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2202,6 +2202,8 @@
   <description>
     The KeyProvider to use when managing zone keys, and interacting with
     encryption keys when reading and writing to an encryption zone.
+    For hdfs clients, the provider path will be same as namenode's
+    provider path.
   </description>
 </property>
 

+ 70 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -160,6 +161,7 @@ import org.apache.hadoop.ipc.RpcNoSuchMethodException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
@@ -197,6 +199,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
   // 1 hour
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L;
+  private static final String DFS_KMS_PREFIX = "dfs-kms-";
 
   private final Configuration conf;
   private final Tracer tracer;
@@ -214,7 +217,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final SocketFactory socketFactory;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   private final FileSystem.Statistics stats;
-  private final String authority;
+  private final URI namenodeUri;
   private final Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
@@ -228,6 +231,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int smallBufferSize;
+  private URI keyProviderUri = null;
 
   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -298,7 +302,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
     this.ugi = UserGroupInformation.getCurrentUser();
 
-    this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
+    this.namenodeUri = nameNodeUri;
     this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
         ThreadLocalRandom.current().nextInt()  + "_" +
         Thread.currentThread().getId();
@@ -454,7 +458,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    *  be returned until all output streams are closed.
    */
   public LeaseRenewer getLeaseRenewer() {
-    return LeaseRenewer.getInstance(authority, ugi, this);
+    return LeaseRenewer.getInstance(
+        namenodeUri != null ? namenodeUri.getAuthority() : "null", ugi, this);
   }
 
   /** Get a lease and start automatic renewal */
@@ -2851,8 +2856,66 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return HEDGED_READ_METRIC;
   }
 
-  public KeyProvider getKeyProvider() {
-    return clientContext.getKeyProviderCache().get(conf);
+  /**
+   * Returns a key to map namenode uri to key provider uri.
+   * Tasks will lookup this key to find key Provider.
+   */
+  public Text getKeyProviderMapKey() {
+    return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
+        +"://" + namenodeUri.getAuthority());
+  }
+
+  /**
+   * The key provider uri is searched in the following order.
+   * 1. If there is a mapping in Credential's secrets map for namenode uri.
+   * 2. From namenode getServerDefaults rpc.
+   * 3. Finally fallback to local conf.
+   * @return keyProviderUri if found from either of above 3 cases,
+   * null otherwise
+   * @throws IOException
+   */
+  URI getKeyProviderUri() throws IOException {
+    if (keyProviderUri != null) {
+      return keyProviderUri;
+    }
+
+    // Lookup the secret in credentials object for namenodeuri.
+    Credentials credentials = ugi.getCredentials();
+    byte[] keyProviderUriBytes = credentials.getSecretKey(getKeyProviderMapKey());
+    if(keyProviderUriBytes != null) {
+      keyProviderUri =
+          URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes));
+      return keyProviderUri;
+    }
+
+    // Query the namenode for the key provider uri.
+    FsServerDefaults serverDefaults = getServerDefaults();
+    if (serverDefaults.getKeyProviderUri() != null) {
+      if (!serverDefaults.getKeyProviderUri().isEmpty()) {
+        keyProviderUri = URI.create(serverDefaults.getKeyProviderUri());
+      }
+      return keyProviderUri;
+    }
+
+    // Last thing is to trust its own conf to be backwards compatible.
+    String keyProviderUriStr = conf.getTrimmed(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    if (keyProviderUriStr != null && !keyProviderUriStr.isEmpty()) {
+      keyProviderUri = URI.create(keyProviderUriStr);
+    }
+    return keyProviderUri;
+  }
+
+  public KeyProvider getKeyProvider() throws IOException {
+    return clientContext.getKeyProviderCache().get(conf, getKeyProviderUri());
+  }
+
+  /*
+   * Should be used only for testing.
+   */
+  @VisibleForTesting
+  public void setKeyProviderUri(URI providerUri) {
+    this.keyProviderUri = providerUri;
   }
 
   @VisibleForTesting
@@ -2862,11 +2925,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /**
    * Probe for encryption enabled on this filesystem.
-   * See {@link DFSUtilClient#isHDFSEncryptionEnabled(Configuration)}
    * @return true if encryption is enabled
    */
-  public boolean isHDFSEncryptionEnabled() {
-    return DFSUtilClient.isHDFSEncryptionEnabled(this.conf);
+  public boolean isHDFSEncryptionEnabled() throws IOException{
+    return getKeyProviderUri() != null;
   }
 
   /**

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -2409,12 +2409,15 @@ public class DistributedFileSystem extends FileSystem {
   public Token<?>[] addDelegationTokens(
       final String renewer, Credentials credentials) throws IOException {
     Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
-    if (dfs.isHDFSEncryptionEnabled()) {
+    URI keyProviderUri = dfs.getKeyProviderUri();
+    if (keyProviderUri != null) {
       KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
           KeyProviderDelegationTokenExtension.
               createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
       Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
           addDelegationTokens(renewer, credentials);
+      credentials.addSecretKey(dfs.getKeyProviderMapKey(),
+          DFSUtilClient.string2Bytes(keyProviderUri.toString()));
       if (tokens != null && kpTokens != null) {
         Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
         System.arraycopy(tokens, 0, all, 0, tokens.length);
@@ -2551,7 +2554,13 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public Path getTrashRoot(Path path) {
-    if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
+    try {
+      if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
+        return super.getTrashRoot(path);
+      }
+    } catch (IOException ioe) {
+      DFSClient.LOG.warn("Exception while checking whether encryption zone is "
+          + "supported", ioe);
       return super.getTrashRoot(path);
     }
 

+ 9 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.util.KMSUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
@@ -66,29 +67,29 @@ public class KeyProviderCache {
         .build();
   }
 
-  public KeyProvider get(final Configuration conf) {
-    URI kpURI = createKeyProviderURI(conf);
-    if (kpURI == null) {
+  public KeyProvider get(final Configuration conf,
+      final URI serverProviderUri) {
+    if (serverProviderUri == null) {
       return null;
     }
     try {
-      return cache.get(kpURI, new Callable<KeyProvider>() {
+      return cache.get(serverProviderUri, new Callable<KeyProvider>() {
         @Override
         public KeyProvider call() throws Exception {
-          return DFSUtilClient.createKeyProvider(conf);
+          return KMSUtil.createKeyProviderFromUri(conf, serverProviderUri);
         }
       });
     } catch (Exception e) {
-      LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause());
+      LOG.error("Could not create KeyProvider for DFSClient !!", e);
       return null;
     }
   }
 
   private URI createKeyProviderURI(Configuration conf) {
     final String providerUriStr = conf.getTrimmed(
-        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, "");
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
     // No provider set in conf
-    if (providerUriStr.isEmpty()) {
+    if (providerUriStr == null || providerUriStr.isEmpty()) {
       LOG.error("Could not find uri with key ["
           + CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH
           + "] to create a keyProvider !!");

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -1758,7 +1758,8 @@ public class PBHelperClient {
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
-        convert(fs.getChecksumType()));
+        convert(fs.getChecksumType()),
+        fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null);
   }
 
   public static List<CryptoProtocolVersionProto> convert(
@@ -1932,6 +1933,7 @@ public class PBHelperClient {
         .setEncryptDataTransfer(fs.getEncryptDataTransfer())
         .setTrashInterval(fs.getTrashInterval())
         .setChecksumType(convert(fs.getChecksumType()))
+        .setKeyProviderUri(fs.getKeyProviderUri())
         .build();
   }
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -420,6 +420,7 @@ message FsServerDefaultsProto {
   optional bool encryptDataTransfer = 6 [default = false];
   optional uint64 trashInterval = 7 [default = 0];
   optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
+  optional string keyProviderUri = 9;
 }
 
 

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -148,6 +148,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -778,8 +779,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
           conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
           conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
-          checksumType);
-      
+          checksumType,
+          conf.getTrimmed(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+          ""));
+
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/TransparentEncryption.md

@@ -97,6 +97,7 @@ Once a KMS has been set up and the NameNode and HDFS clients have been correctly
 #### hadoop.security.key.provider.path
 
 The KeyProvider to use when interacting with encryption keys used when reading and writing to an encryption zone.
+HDFS clients will use the provider path returned from Namenode via getServerDefaults. If namenode doesn't support returning key provider uri then client's conf will be used.
 
 ### <a name="Selecting_an_encryption_algorithm_and_codec"></a>Selecting an encryption algorithm and codec
 

+ 205 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileSystemTestWrapper;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension.DelegationTokenExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
@@ -105,8 +107,21 @@ import static org.mockito.Matchers.anyShort;
 import static org.mockito.Mockito.withSettings;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -1668,4 +1683,194 @@ public class TestEncryptionZones {
       }
     }
   }
+
+  /** This test tests that client will first lookup secrets map
+   * for key provider uri from {@link Credentials} in
+   * {@link UserGroupInformation}
+   * @throws Exception
+   */
+  @Test
+  public void testProviderUriInCredentials() throws Exception {
+    String dummyKeyProvider = "dummy://foo:bar@test_provider1";
+    DFSClient client = cluster.getFileSystem().getClient();
+    Credentials credentials = new Credentials();
+    // Key provider uri should be in the secret map of credentials object with
+    // namenode uri as key
+    Text lookUpKey = client.getKeyProviderMapKey();
+    credentials.addSecretKey(lookUpKey,
+        DFSUtilClient.string2Bytes(dummyKeyProvider));
+    client.ugi.addCredentials(credentials);
+    client.setKeyProviderUri(null);
+    Assert.assertEquals("Client Key provider is different from provider in "
+        + "credentials map", dummyKeyProvider,
+        client.getKeyProviderUri().toString());
+  }
+
+
+ /**
+  * Testing the fallback behavior of keyProviderUri.
+  * This test tests first the key provider uri is used from conf
+  * and then used from serverDefaults.
+  * @throws IOException
+  */
+  @Test
+  public void testKeyProviderFallBackBehavior() throws IOException {
+    Configuration clusterConf = cluster.getConfiguration(0);
+    String dummyKeyProviderUri1 = "dummy://foo:bar@test_provider1";
+    // set the key provider uri in conf.
+    clusterConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        dummyKeyProviderUri1);
+    DFSClient mockClient = Mockito.spy(cluster.getFileSystem().getClient());
+    mockClient.setKeyProviderUri(null);
+    // Namenode returning null as keyProviderUri in FSServerDefaults.
+    FsServerDefaults serverDefaultsWithKeyProviderNull =
+        getTestServerDefaults(null);
+    Mockito.doReturn(serverDefaultsWithKeyProviderNull)
+        .when(mockClient).getServerDefaults();
+    Assert.assertEquals(
+        "Key provider uri from client doesn't match with uri from conf",
+        dummyKeyProviderUri1, mockClient.getKeyProviderUri().toString());
+    Mockito.verify(mockClient, Mockito.times(1)).getServerDefaults();
+
+    String dummyKeyProviderUri2 = "dummy://foo:bar@test_provider2";
+    mockClient.setKeyProviderUri(null);
+    FsServerDefaults serverDefaultsWithDummyKeyProvider =
+        getTestServerDefaults(dummyKeyProviderUri2);
+    // Namenode returning dummyKeyProvider2 in serverDefaults.
+    Mockito.doReturn(serverDefaultsWithDummyKeyProvider)
+    .when(mockClient).getServerDefaults();
+    Assert.assertEquals(
+        "Key provider uri from client doesn't match with uri from namenode",
+        dummyKeyProviderUri2, mockClient.getKeyProviderUri().toString());
+    Mockito.verify(mockClient, Mockito.times(2)).getServerDefaults();
+  }
+
+  /**
+   * This test makes sure the client gets the key provider uri from namenode
+   * instead of its own conf.
+   * This test assumes both the namenode and client are upgraded.
+   * @throws Exception
+   */
+  @Test
+  public void testDifferentKMSProviderOnUpgradedNamenode() throws Exception {
+    Configuration clusterConf = cluster.getConfiguration(0);
+    URI namenodeKeyProviderUri = URI.create(getKeyProviderURI());
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, cluster.getFileSystem().getClient()
+        .getKeyProviderUri());
+
+    // Unset the provider path in conf
+    clusterConf.unset(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    // Nullify the cached value for key provider uri on client
+    cluster.getFileSystem().getClient().setKeyProviderUri(null);
+    // Even after unsetting the local conf, the client key provider should be
+    // the same as namenode's provider.
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, cluster.getFileSystem().getClient()
+        .getKeyProviderUri());
+
+    // Set the provider path to some dummy scheme.
+    clusterConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        "dummy://foo:bar@test_provider1");
+    // Nullify the cached value for key provider uri on client
+    cluster.getFileSystem().getClient().setKeyProviderUri(null);
+    // Even after pointing the conf to some dummy provider, the client key
+    // provider should be the same as namenode's provider.
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, cluster.getFileSystem().getClient()
+        .getKeyProviderUri());
+  }
+
+  /**
+   * This test makes sure the client trusts its local conf
+   * This test assumes the client is upgraded but the namenode is not.
+   * @throws Exception
+   */
+  @Test
+  public void testDifferentKMSProviderOnUnUpgradedNamenode()
+      throws Exception {
+    Configuration clusterConf = cluster.getConfiguration(0);
+    URI namenodeKeyProviderUri = URI.create(getKeyProviderURI());
+    URI clientKeyProviderUri =
+        cluster.getFileSystem().getClient().getKeyProviderUri();
+    Assert.assertNotNull(clientKeyProviderUri);
+    // Since the client and the namenode share the same conf, they will have
+    // identical key provider.
+    Assert.assertEquals("Key Provider for client and namenode are different",
+        namenodeKeyProviderUri, clientKeyProviderUri);
+
+    String dummyKeyProviderUri = "dummy://foo:bar@test_provider";
+    // Unset the provider path in conf.
+    clusterConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        dummyKeyProviderUri);
+    FsServerDefaults spyServerDefaults = getTestServerDefaults(null);
+    // Creating a fake serverdefaults so that we can simulate namenode not
+    // being upgraded.
+    DFSClient spyClient = Mockito.spy(cluster.getFileSystem().getClient());
+    // Clear the cache value of keyProviderUri on client side.
+    spyClient.setKeyProviderUri(null);
+    Mockito.doReturn(spyServerDefaults).when(spyClient).getServerDefaults();
+
+    // Since FsServerDefaults#keyProviderUri is null, the client
+    // will fallback to local conf which is null.
+    clientKeyProviderUri = spyClient.getKeyProviderUri();
+    Assert.assertEquals("Client keyProvider should be " + dummyKeyProviderUri,
+        dummyKeyProviderUri, clientKeyProviderUri.toString());
+    Mockito.verify(spyClient, Mockito.times(1)).getServerDefaults();
+  }
+
+  // Given a provider uri return serverdefaults.
+  // provider uri == null means the namenode does not support returning
+  // provider uri in FSServerDefaults object.
+  private FsServerDefaults getTestServerDefaults(String providerPath) {
+    FsServerDefaults serverDefaults = new FsServerDefaults(
+        conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
+        conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
+        conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
+        (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+        conf.getBoolean(
+        DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
+        conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
+        DataChecksum.Type.valueOf(DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT),
+        providerPath);
+    return serverDefaults;
+  }
+
+  /**
+   * This test performs encrypted read/write and picks up the key provider uri
+   * from the credentials and not the conf.
+   * @throws Exception
+   */
+  @Test
+  public void testEncryptedReadWriteUsingDiffKeyProvider() throws Exception {
+    final HdfsAdmin dfsAdmin =
+        new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    Configuration clusterConf = cluster.getConfiguration(0);
+    clusterConf.unset(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    DFSClient client = cluster.getFileSystem().getClient();
+    Credentials credentials = new Credentials();
+    Text lookUpKey = client.getKeyProviderMapKey();
+    credentials.addSecretKey(lookUpKey,
+        DFSUtilClient.string2Bytes(getKeyProviderURI()));
+    client.ugi.addCredentials(credentials);
+    // Create a base file for comparison
+    final Path baseFile = new Path("/base");
+    final int len = 8192;
+    DFSTestUtil.createFile(fs, baseFile, len, (short) 1, 0xFEED);
+    // Create the first enc file
+    final Path zone = new Path("/zone");
+    fs.mkdirs(zone);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
+    final Path encFile1 = new Path(zone, "myfile");
+    DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
+    // Read them back in and compare byte-by-byte
+    verifyFilesEqual(fs, baseFile, encFile1, len);
+  }
 }

+ 17 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestKeyProviderCache.java

@@ -96,29 +96,42 @@ public class TestKeyProviderCache {
     Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://foo:bar@test_provider1");
-    KeyProvider keyProvider1 = kpCache.get(conf);
+    KeyProvider keyProvider1 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));
     Assert.assertNotNull("Returned Key Provider is null !!", keyProvider1);
 
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://foo:bar@test_provider1");
-    KeyProvider keyProvider2 = kpCache.get(conf);
+    KeyProvider keyProvider2 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));
 
     Assert.assertTrue("Different KeyProviders returned !!",
         keyProvider1 == keyProvider2);
 
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://test_provider3");
-    KeyProvider keyProvider3 = kpCache.get(conf);
+    KeyProvider keyProvider3 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));
 
     Assert.assertFalse("Same KeyProviders returned !!",
         keyProvider1 == keyProvider3);
 
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
         "dummy://hello:there@test_provider1");
-    KeyProvider keyProvider4 = kpCache.get(conf);
+    KeyProvider keyProvider4 = kpCache.get(conf,
+        getKeyProviderUriFromConf(conf));
 
     Assert.assertFalse("Same KeyProviders returned !!",
         keyProvider1 == keyProvider4);
 
   }
+
+  private URI getKeyProviderUriFromConf(Configuration conf) {
+    String providerUriStr = conf.get(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    if (providerUriStr == null || providerUriStr.isEmpty()) {
+      return null;
+    }
+    return URI.create(providerUriStr);
+  }
 }

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -35,9 +35,12 @@ import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -868,5 +871,33 @@ public class TestPBHelper {
     }
   }
 
+  /**
+   * Test case for old namenode where the namenode doesn't support returning
+   * keyProviderUri.
+   */
+  @Test
+  public void testFSServerDefaultsHelper() {
+    HdfsProtos.FsServerDefaultsProto.Builder b =
+        HdfsProtos.FsServerDefaultsProto
+        .newBuilder();
+    b.setBlockSize(DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+    b.setBytesPerChecksum(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+    b.setWritePacketSize(
+        HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    b.setReplication(DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    b.setFileBufferSize(DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
+    b.setEncryptDataTransfer(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    b.setTrashInterval(DFSConfigKeys.FS_TRASH_INTERVAL_DEFAULT);
+    b.setChecksumType(HdfsProtos.ChecksumTypeProto.valueOf(
+        DataChecksum.Type.valueOf(DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT).id));
+    HdfsProtos.FsServerDefaultsProto proto = b.build();
+
+    Assert.assertFalse("KeyProvider uri is not supported",
+        proto.hasKeyProviderUri());
+    FsServerDefaults fsServerDefaults = PBHelperClient.convert(proto);
+    Assert.assertNotNull("FsServerDefaults is null", fsServerDefaults);
+    Assert.assertNull("KeyProviderUri should be null",
+        fsServerDefaults.getKeyProviderUri());
+  }
 
 }