Browse Source

HADOOP-14324. Refine S3 server-side-encryption key as encryption secret; improve error reporting and diagnostics. Contributed by Steve Loughran

Mingliang Liu 8 years ago
parent
commit
667966c13c

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -815,6 +815,7 @@ public class CommonConfigurationKeysPublic {
           "password$",
           "ssl.keystore.pass$",
           "fs.s3.*[Ss]ecret.?[Kk]ey",
+          "fs.s3a.*.server-side-encryption.key",
           "fs.azure\\.account.key.*",
           "credential$",
           "oauth.*token$",

+ 13 - 1
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -522,6 +522,7 @@
       password$
       ssl.keystore.pass$
       fs.s3.*[Ss]ecret.?[Kk]ey
+      fs.s3a.*.server-side-encryption.key
       fs.azure.account.key.*
       credential$
       oauth.*token$
@@ -1160,7 +1161,18 @@
 <property>
   <name>fs.s3a.server-side-encryption-algorithm</name>
   <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default, and the only other currently allowable value is AES256.
+    Unset by default.  It supports the following values: 'AES256' (for SSE-S3),
+    'SSE-KMS' and 'SSE-C'.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.server-side-encryption.key</name>
+  <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
+    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
+    should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
+    you'll be using your default's S3 KMS key, otherwise you should set this property to
+    the specific KMS key id.
   </description>
 </property>
 

+ 1 - 0
hadoop-common-project/hadoop-common/src/site/markdown/DeprecatedProperties.md

@@ -62,6 +62,7 @@ The following table lists the configuration property names that are deprecated i
 | fs.checkpoint.edits.dir | dfs.namenode.checkpoint.edits.dir |
 | fs.checkpoint.period | dfs.namenode.checkpoint.period |
 | fs.default.name | fs.defaultFS |
+| fs.s3a.server-side-encryption-key | fs.s3a.server-side-encryption.key |
 | hadoop.configured.node.mapping | net.topology.configured.node.mapping |
 | hadoop.native.lib | io.native.lib.available |
 | hadoop.net.static.resolutions | mapreduce.tasktracker.net.static.resolutions |

+ 4 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigRedactor.java

@@ -52,6 +52,8 @@ public class TestConfigRedactor {
     List<String> sensitiveKeys = Arrays.asList(
         "fs.s3a.secret.key",
         "fs.s3a.bucket.BUCKET.secret.key",
+        "fs.s3a.server-side-encryption.key",
+        "fs.s3a.bucket.engineering.server-side-encryption.key",
         "fs.s3n.awsSecretKey",
         "fs.azure.account.key.abcdefg.blob.core.windows.net",
         "fs.adl.oauth2.refresh.token",
@@ -77,7 +79,8 @@ public class TestConfigRedactor {
         "dfs.replication",
         "ssl.server.keystore.location",
         "httpfs.config.dir",
-        "hadoop.security.credstore.java-keystore-provider.password-file"
+        "hadoop.security.credstore.java-keystore-provider.password-file",
+        "fs.s3a.bucket.engineering.server-side-encryption-algorithm"
     );
     for (String key : normalKeys) {
       processedText = redactor.redact(key, ORIGINAL_VALUE);

+ 16 - 12
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -225,18 +225,30 @@ public final class Constants {
    * Different implementations may support others (or none).
    * Use the S3AEncryptionMethods instead when configuring
    * which Server Side Encryption to use.
+   * Value: "{@value}".
    */
   @Deprecated
   public static final String SERVER_SIDE_ENCRYPTION_AES256 =
       "AES256";
 
   /**
-   *  Used to specify which AWS KMS key to use if
-   *  SERVER_SIDE_ENCRYPTION_ALGORITHM is AWS_KMS (will default to aws/s3
-   *  master key if left blank) or with SSE_C, the actual AES 256 key.
+   * Used to specify which AWS KMS key to use if
+   * {@link #SERVER_SIDE_ENCRYPTION_ALGORITHM} is
+   * {@code SSE-KMS} (will default to aws/s3
+   * master key if left blank).
+   * With with {@code SSE_C}, the base-64 encoded AES 256 key.
+   * May be set within a JCEKS file.
+   * Value: "{@value}".
    */
   public static final String SERVER_SIDE_ENCRYPTION_KEY =
-      "fs.s3a.server-side-encryption-key";
+      "fs.s3a.server-side-encryption.key";
+
+  /**
+   * The original key name. Never used in ASF releases,
+   * but did get into downstream products.
+   */
+  static final String OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY
+      = "fs.s3a.server-side-encryption-key";
 
   //override signature algorithm used for signing requests
   public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
@@ -308,12 +320,4 @@ public final class Constants {
   @InterfaceAudience.Private
   public static final int MAX_MULTIPART_COUNT = 10000;
 
-  @InterfaceAudience.Private
-  public static final String SSE_C_NO_KEY_ERROR = S3AEncryptionMethods.SSE_C
-      .getMethod() +" is enabled and no encryption key is provided.";
-
-
-  @InterfaceAudience.Private
-  public static final String SSE_S3_WITH_KEY_ERROR = S3AEncryptionMethods.SSE_S3
-      .getMethod() +" is configured and an " + "encryption key is provided";
 }

+ 12 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java

@@ -33,6 +33,9 @@ public enum S3AEncryptionMethods {
   SSE_C("SSE-C"),
   NONE("");
 
+  static final String UNKNOWN_ALGORITHM
+      = "Unknown Server Side Encryption algorithm ";
+
   private String method;
 
   S3AEncryptionMethods(String method) {
@@ -43,6 +46,13 @@ public enum S3AEncryptionMethods {
     return method;
   }
 
+
+  /**
+   * Get the encryption mechanism from the value provided.
+   * @param name algorithm name
+   * @return the method
+   * @throws IOException if the algorithm is unknown
+   */
   public static S3AEncryptionMethods getMethod(String name) throws IOException {
     if(StringUtils.isBlank(name)) {
       return NONE;
@@ -55,7 +65,8 @@ public enum S3AEncryptionMethods {
     case "SSE-C":
       return SSE_C;
     default:
-      throw new IOException("Unknown Server Side algorithm "+name);
+      throw new IOException(UNKNOWN_ALGORITHM + name);
     }
   }
+
 }

+ 18 - 12
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -156,6 +156,23 @@ public class S3AFileSystem extends FileSystem {
   private S3ADataBlocks.BlockFactory blockFactory;
   private int blockOutputActiveBlocks;
 
+  /** Add any deprecated keys. */
+  @SuppressWarnings("deprecation")
+  private static void addDeprecatedKeys() {
+    Configuration.addDeprecations(
+        new Configuration.DeprecationDelta[]{
+            // never shipped in an ASF release, but did get into the wild.
+            new Configuration.DeprecationDelta(
+                OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY,
+                SERVER_SIDE_ENCRYPTION_KEY)
+        });
+    Configuration.reloadExistingConfigurations();
+  }
+
+  static {
+    addDeprecatedKeys();
+  }
+
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
    *   for this FileSystem
@@ -240,18 +257,7 @@ public class S3AFileSystem extends FileSystem {
 
       initMultipartUploads(conf);
 
-      serverSideEncryptionAlgorithm = S3AEncryptionMethods.getMethod(
-          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
-      if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
-          StringUtils.isBlank(getServerSideEncryptionKey(getConf()))) {
-        throw new IOException(Constants.SSE_C_NO_KEY_ERROR);
-      }
-      if(S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm) &&
-          StringUtils.isNotBlank(getServerSideEncryptionKey(
-            getConf()))) {
-        throw new IOException(Constants.SSE_S3_WITH_KEY_ERROR);
-      }
-      LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
+      serverSideEncryptionAlgorithm = getEncryptionAlgorithm(conf);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
 

+ 116 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -81,6 +81,22 @@ public final class S3AUtils {
   static final String CREDENTIAL_PROVIDER_PATH =
       "hadoop.security.credential.provider.path";
 
+  /**
+   * Encryption SSE-C used but the config lacks an encryption key.
+   */
+  public static final String SSE_C_NO_KEY_ERROR =
+      S3AEncryptionMethods.SSE_C.getMethod()
+          + " is enabled but no encryption key was declared in "
+          + SERVER_SIDE_ENCRYPTION_KEY;
+  /**
+   * Encryption SSE-S3 is used but the caller also set an encryption key.
+   */
+  public static final String SSE_S3_WITH_KEY_ERROR =
+      S3AEncryptionMethods.SSE_S3.getMethod()
+          + " is enabled but an encryption key was set in "
+          + SERVER_SIDE_ENCRYPTION_KEY;
+
+
   private S3AUtils() {
   }
 
@@ -449,8 +465,27 @@ public final class S3AUtils {
    */
   static String getPassword(Configuration conf, String key, String val)
       throws IOException {
+    String defVal = "";
+    return getPassword(conf, key, val, defVal);
+  }
+
+  /**
+   * Get a password from a configuration, or, if a value is passed in,
+   * pick that up instead.
+   * @param conf configuration
+   * @param key key to look up
+   * @param val current value: if non empty this is used instead of
+   * querying the configuration.
+   * @param defVal default value if nothing is set
+   * @return a password or "".
+   * @throws IOException on any problem
+   */
+  private static String getPassword(Configuration conf,
+      String key,
+      String val,
+      String defVal) throws IOException {
     return StringUtils.isEmpty(val)
-        ? lookupPassword(conf, key, "")
+        ? lookupPassword(conf, key, defVal)
         : val;
   }
 
@@ -708,14 +743,91 @@ public final class S3AUtils {
     }
   }
 
+  /**
+   * Get any SSE key from a configuration/credential provider.
+   * This operation handles the case where the option has been
+   * set in the provider or configuration to the option
+   * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
+   * @param conf configuration to examine
+   * @return the encryption key or null
+   */
   static String getServerSideEncryptionKey(Configuration conf) {
     try {
-      return getPassword(conf, Constants.SERVER_SIDE_ENCRYPTION_KEY,
-        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY));
+      return lookupPassword(conf, SERVER_SIDE_ENCRYPTION_KEY,
+          getPassword(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY,
+              null, null));
     } catch (IOException e) {
       LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e);
+      return "";
+    }
+  }
+
+  /**
+   * Get the server-side encryption algorithm.
+   * This includes validation of the configuration, checking the state of
+   * the encryption key given the chosen algorithm.
+   * @param conf configuration to scan
+   * @return the encryption mechanism (which will be {@code NONE} unless
+   * one is set.
+   * @throws IOException on any validation problem.
+   */
+  static S3AEncryptionMethods getEncryptionAlgorithm(Configuration conf)
+      throws IOException {
+    S3AEncryptionMethods sse = S3AEncryptionMethods.getMethod(
+        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
+    String sseKey = getServerSideEncryptionKey(conf);
+    int sseKeyLen = StringUtils.isBlank(sseKey) ? 0 : sseKey.length();
+    String diagnostics = passwordDiagnostics(sseKey, "key");
+    switch (sse) {
+    case SSE_C:
+      if (sseKeyLen == 0) {
+        throw new IOException(SSE_C_NO_KEY_ERROR);
+      }
+      break;
+
+    case SSE_S3:
+      if (sseKeyLen != 0) {
+        throw new IOException(SSE_S3_WITH_KEY_ERROR
+            + " (" + diagnostics + ")");
+      }
+      break;
+
+    case SSE_KMS:
+      LOG.debug("Using SSE-KMS with {}",
+          diagnostics);
+      break;
+
+    case NONE:
+    default:
+      LOG.debug("Data is unencrypted");
+      break;
+    }
+    LOG.debug("Using SSE-C with {}", diagnostics);
+    return sse;
+  }
+
+  /**
+   * Provide a password diagnostics string.
+   * This aims to help diagnostics without revealing significant password details
+   * @param pass password
+   * @param description description for text, e.g "key" or "password"
+   * @return text for use in messages.
+   */
+  private static String passwordDiagnostics(String pass, String description) {
+    if (pass == null) {
+      return "null " + description;
+    }
+    int len = pass.length();
+    switch (len) {
+    case 0:
+      return "empty " + description;
+    case 1:
+      return description + " of length 1";
+
+    default:
+      return description + " of length " + len + " ending with "
+          + pass.charAt(len - 1);
     }
-    return null;
   }
 
   /**

+ 1 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -836,7 +836,7 @@ from placing its declaration on the command line.
     </property>
 
     <property>
-        <name>fs.s3a.server-side-encryption-key</name>
+        <name>fs.s3a.server-side-encryption.key</name>
         <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
         has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
         should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,

+ 2 - 2
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md

@@ -263,7 +263,7 @@ source code tree, it is not going to get accidentally committed.
 ### Configuring S3a Encryption
 
 For S3a encryption tests to run correctly, the
-`fs.s3a.server-side-encryption-key` must be configured in the s3a contract xml
+`fs.s3a.server-side-encryption.key` must be configured in the s3a contract xml
 file with a AWS KMS encryption key arn as this value is different for each AWS
 KMS.
 
@@ -271,7 +271,7 @@ Example:
 
 ```xml
 <property>
-  <name>fs.s3a.server-side-encryption-key</name>
+  <name>fs.s3a.server-side-encryption.key</name>
   <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
 </property>
 ```

+ 2 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java

@@ -91,7 +91,7 @@ public class ITestS3AEncryptionAlgorithmValidation
   @Test
   public void testEncryptionAlgorithmSSECWithBlankEncryptionKey() throws
     Throwable {
-    intercept(IOException.class, Constants.SSE_C_NO_KEY_ERROR, () -> {
+    intercept(IOException.class, S3AUtils.SSE_C_NO_KEY_ERROR, () -> {
 
         Configuration conf = super.createConfiguration();
         //SSE-C must be configured with an encryption key
@@ -117,7 +117,7 @@ public class ITestS3AEncryptionAlgorithmValidation
     Throwable {
     //skip tests if they aren't enabled
     assumeEnabled();
-    intercept(IOException.class, Constants.SSE_S3_WITH_KEY_ERROR, () -> {
+    intercept(IOException.class, S3AUtils.SSE_S3_WITH_KEY_ERROR, () -> {
 
         Configuration conf = super.createConfiguration();
         //SSE-S3 cannot be configured with an encryption key

+ 223 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java

@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Test SSE setup operations and errors raised.
+ */
+public class TestSSEConfiguration extends Assert {
+
+  @Rule
+  public Timeout testTimeout = new Timeout(
+      S3ATestConstants.S3A_TEST_TIMEOUT
+  );
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  @Test
+  public void testSSECNoKey() throws Throwable {
+    assertExceptionTextEquals(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null);
+  }
+
+  @Test
+  public void testSSECBlankKey() throws Throwable {
+    assertExceptionTextEquals(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "");
+  }
+
+  @Test
+  public void testSSECGoodKey() throws Throwable {
+    assertEquals(SSE_C, getAlgorithm(SSE_C, "sseckey"));
+  }
+
+  @Test
+  public void testKMSGoodKey() throws Throwable {
+    assertEquals(SSE_KMS, getAlgorithm(SSE_KMS, "kmskey"));
+  }
+
+  @Test
+  public void testKMSGoodOldOptionName() throws Throwable {
+    Configuration conf = new Configuration(false);
+    conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, SSE_KMS.getMethod());
+    conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "kmskeyID");
+    // verify key round trip
+    assertEquals("kmskeyID", getServerSideEncryptionKey(conf));
+    // and that KMS lookup finds it
+    assertEquals(SSE_KMS, getEncryptionAlgorithm(conf));
+  }
+
+  @Test
+  public void testAESKeySet() throws Throwable {
+    assertExceptionTextEquals(SSE_S3_WITH_KEY_ERROR,
+        SSE_S3.getMethod(), "setkey");
+  }
+
+  @Test
+  public void testSSEEmptyKey() throws Throwable {
+    // test the internal logic of the test setup code
+    Configuration c = buildConf(SSE_C.getMethod(), "");
+    assertEquals("", getServerSideEncryptionKey(c));
+  }
+
+  @Test
+  public void testSSEKeyNull() throws Throwable {
+    // test the internal logic of the test setup code
+    final Configuration c = buildConf(SSE_C.getMethod(), null);
+    assertNull("", getServerSideEncryptionKey(c));
+
+    intercept(IOException.class, SSE_C_NO_KEY_ERROR,
+        new Callable<S3AEncryptionMethods>() {
+          @Override
+          public S3AEncryptionMethods call() throws Exception {
+            return getEncryptionAlgorithm(c);
+          }
+        });
+  }
+
+  @Test
+  public void testSSEKeyFromCredentialProvider() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    addFileProvider(conf);
+    String key = "provisioned";
+    provisionSSEKey(conf, SERVER_SIDE_ENCRYPTION_KEY, key);
+    // let's set the password in config and ensure that it uses the credential
+    // provider provisioned value instead.
+    conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
+
+    String sseKey = getServerSideEncryptionKey(conf);
+    assertNotNull("Proxy password should not retrun null.", sseKey);
+    assertEquals("Proxy password override did NOT work.", key, sseKey);
+  }
+
+  /**
+   * Very that the old key is picked up via the properties
+   * @throws Exception failure
+   */
+  @Test
+  public void testOldKeyFromCredentialProvider() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    addFileProvider(conf);
+    String key = "provisioned";
+    provisionSSEKey(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, key);
+    // let's set the password in config and ensure that it uses the credential
+    // provider provisioned value instead.
+    //conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "oldKeyInConf");
+    String sseKey = getServerSideEncryptionKey(conf);
+    assertNotNull("Proxy password should not retrun null.", sseKey);
+    assertEquals("Proxy password override did NOT work.", key, sseKey);
+  }
+
+  /**
+   * Add a temp file provider to the config.
+   * @param conf config
+   * @throws Exception failure
+   */
+  private void addFileProvider(Configuration conf)
+      throws Exception {
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+  }
+
+  /**
+   * Set the SSE Key via the provision API, not the config itself.
+   * @param conf config
+   * @param option option name
+   * @param key key to set
+   * @throws Exception failure
+   */
+  void provisionSSEKey(final Configuration conf,
+      String option, String key) throws Exception {
+    // add our password to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(option,
+        key.toCharArray());
+    provider.flush();
+  }
+
+  /**
+   * Assert that the exception text from a config contains the expected string
+   * @param expected expected substring
+   * @param alg algorithm to ask for
+   * @param key optional key value
+   * @throws Exception anything else which gets raised
+   */
+  public void assertExceptionTextEquals(String expected,
+      final String alg, final String key) throws Exception {
+    intercept(IOException.class, expected,
+        new Callable<S3AEncryptionMethods>() {
+          @Override
+          public S3AEncryptionMethods call() throws Exception {
+            return getAlgorithm(alg, key);
+          }
+        });
+  }
+
+  private S3AEncryptionMethods getAlgorithm(S3AEncryptionMethods algorithm,
+      String key)
+      throws IOException {
+    return getAlgorithm(algorithm.getMethod(), key);
+  }
+
+  private S3AEncryptionMethods getAlgorithm(String algorithm, String key)
+      throws IOException {
+    return getEncryptionAlgorithm(buildConf(algorithm, key));
+  }
+
+  private Configuration buildConf(String algorithm, String key) {
+    Configuration conf = new Configuration(false);
+    if (algorithm != null) {
+      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, algorithm);
+    } else {
+      conf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    }
+    if (key != null) {
+      conf.set(SERVER_SIDE_ENCRYPTION_KEY, key);
+    } else {
+      conf.unset(SERVER_SIDE_ENCRYPTION_KEY);
+    }
+    return conf;
+  }
+}