Explorar o código

HADOOP-19306. Support user defined auth Callback in SaslRpcServer. (#7140)

Tsz-Wo Nicholas Sze hai 5 meses
pai
achega
317db31a9a
Modificáronse 15 ficheiros con 291 adicións e 112 borrados
  1. 6 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  2. 6 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  3. 64 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/CustomizedCallbackHandler.java
  4. 0 45
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslConstants.java
  5. 75 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslMechanismFactory.java
  6. 39 20
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
  7. 25 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/package-info.java
  8. 17 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  9. 1 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
  10. 0 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
  11. 3 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
  12. 4 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
  13. 22 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/package-info.java
  14. 0 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  15. 29 10
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestCustomizedCallbackHandler.java

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -736,6 +736,12 @@ public class CommonConfigurationKeysPublic {
    */
   public static final String  HADOOP_RPC_PROTECTION =
     "hadoop.rpc.protection";
+  public static final String HADOOP_SECURITY_SASL_MECHANISM_KEY
+      = "hadoop.security.sasl.mechanism";
+  public static final String HADOOP_SECURITY_SASL_MECHANISM_DEFAULT
+      = "DIGEST-MD5";
+  public static final String HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY
+      = "hadoop.security.sasl.CustomizedCallbackHandler.class";
   /** Class to override Sasl Properties for a connection */
   public static final String  HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS =
     "hadoop.security.saslproperties.resolver.class";

+ 6 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -106,7 +106,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslState;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RPCTraceInfoProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.SaslConstants;
+import org.apache.hadoop.security.SaslMechanismFactory;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -2143,6 +2143,10 @@ public abstract class Server {
       return Server.this;
     }
 
+    public Configuration getConf() {
+      return Server.this.getConf();
+    }
+
     /* Return true if the connection has no outstanding rpc */
     private boolean isIdle() {
       return rpcCount.get() == 0;
@@ -2606,7 +2610,7 @@ public abstract class Server {
       // accelerate token negotiation by sending initial challenge
       // in the negotiation response
       if (enabledAuthMethods.contains(AuthMethod.TOKEN)
-          && SaslConstants.SASL_MECHANISM_DEFAULT.equals(AuthMethod.TOKEN.getMechanismName())) {
+          && SaslMechanismFactory.isDefaultMechanism(AuthMethod.TOKEN.getMechanismName())) {
         saslServer = createSaslServer(AuthMethod.TOKEN);
         byte[] challenge = saslServer.evaluateResponse(new byte[0]);
         RpcSaslProto.Builder negotiateBuilder =

+ 64 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/CustomizedCallbackHandler.java → hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/CustomizedCallbackHandler.java

@@ -15,23 +15,80 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+package org.apache.hadoop.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /** For handling customized {@link Callback}. */
 public interface CustomizedCallbackHandler {
-  class DefaultHandler implements CustomizedCallbackHandler{
+  Logger LOG = LoggerFactory.getLogger(CustomizedCallbackHandler.class);
+
+  class Cache {
+    private static final Map<String, CustomizedCallbackHandler> MAP = new HashMap<>();
+
+    private static synchronized CustomizedCallbackHandler getSynchronously(
+        String key, Configuration conf) {
+      //check again synchronously
+      final CustomizedCallbackHandler cached = MAP.get(key);
+      if (cached != null) {
+        return cached; //cache hit
+      }
+
+      //cache miss
+      final Class<?> clazz = conf.getClass(key, DefaultHandler.class);
+      LOG.info("{} = {}", key, clazz);
+      if (clazz == DefaultHandler.class) {
+        return DefaultHandler.INSTANCE;
+      }
+
+      final Object created;
+      try {
+        created = clazz.newInstance();
+      } catch (Exception e) {
+        LOG.warn("Failed to create a new instance of {}, fallback to {}",
+            clazz, DefaultHandler.class, e);
+        return DefaultHandler.INSTANCE;
+      }
+
+      final CustomizedCallbackHandler handler = created instanceof CustomizedCallbackHandler ?
+          (CustomizedCallbackHandler) created : CustomizedCallbackHandler.delegate(created);
+      MAP.put(key, handler);
+      return handler;
+    }
+
+    private static CustomizedCallbackHandler get(String key, Configuration conf) {
+      final CustomizedCallbackHandler cached = MAP.get(key);
+      return cached != null ? cached : getSynchronously(key, conf);
+    }
+
+    public static synchronized void clear() {
+      MAP.clear();
+    }
+
+    private Cache() { }
+  }
+
+  class DefaultHandler implements CustomizedCallbackHandler {
+    private static final DefaultHandler INSTANCE = new DefaultHandler();
+
     @Override
     public void handleCallbacks(List<Callback> callbacks, String username, char[] password)
         throws UnsupportedCallbackException {
       if (!callbacks.isEmpty()) {
-        throw new UnsupportedCallbackException(callbacks.get(0));
+        final Callback cb = callbacks.get(0);
+        throw new UnsupportedCallbackException(callbacks.get(0),
+            "Unsupported callback: " + (cb == null ? null : cb.getClass()));
       }
     }
   }
@@ -55,6 +112,10 @@ public interface CustomizedCallbackHandler {
     };
   }
 
+  static CustomizedCallbackHandler get(String key, Configuration conf) {
+    return Cache.get(key, conf);
+  }
+
   void handleCallbacks(List<Callback> callbacks, String name, char[] password)
       throws UnsupportedCallbackException, IOException;
 }

+ 0 - 45
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslConstants.java

@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.security;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SASL related constants.
- */
-@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
-@InterfaceStability.Evolving
-public class SaslConstants {
-  public static final Logger LOG = LoggerFactory.getLogger(SaslConstants.class);
-
-  private static final String SASL_MECHANISM_ENV = "HADOOP_SASL_MECHANISM";
-  public static final String SASL_MECHANISM;
-  public static final String SASL_MECHANISM_DEFAULT = "DIGEST-MD5";
-
-  static {
-    final String mechanism = System.getenv(SASL_MECHANISM_ENV);
-    LOG.debug("{} = {} (env)", SASL_MECHANISM_ENV, mechanism);
-    SASL_MECHANISM = mechanism != null? mechanism : SASL_MECHANISM_DEFAULT;
-    LOG.debug("{} = {} (effective)", SASL_MECHANISM_ENV, SASL_MECHANISM);
-  }
-
-  private SaslConstants() {}
-}

+ 75 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslMechanismFactory.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_MECHANISM_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_MECHANISM_KEY;
+
+/**
+ * SASL related constants.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "YARN", "HBase"})
+@InterfaceStability.Evolving
+public final class SaslMechanismFactory {
+  static final Logger LOG = LoggerFactory.getLogger(SaslMechanismFactory.class);
+
+  private static final String SASL_MECHANISM_ENV = "HADOOP_SASL_MECHANISM";
+  private static final String SASL_MECHANISM;
+
+  static {
+    // env
+    final String envValue = System.getenv(SASL_MECHANISM_ENV);
+    LOG.debug("{} = {} (env)", SASL_MECHANISM_ENV, envValue);
+
+    // conf
+    final Configuration conf = new Configuration(false);
+    final String confValue = conf.get(HADOOP_SECURITY_SASL_MECHANISM_KEY,
+        HADOOP_SECURITY_SASL_MECHANISM_DEFAULT);
+    LOG.debug("{} = {} (conf)", HADOOP_SECURITY_SASL_MECHANISM_KEY, confValue);
+
+    if (envValue != null && confValue != null) {
+      if (!envValue.equals(confValue)) {
+        throw new HadoopIllegalArgumentException("SASL Mechanism mismatched: env "
+            + SASL_MECHANISM_ENV + " is " + envValue + " but conf "
+            + HADOOP_SECURITY_SASL_MECHANISM_KEY + " is " + confValue);
+      }
+    }
+
+    SASL_MECHANISM = envValue != null ? envValue
+        : confValue != null ? confValue
+        : HADOOP_SECURITY_SASL_MECHANISM_DEFAULT;
+    LOG.debug("SASL_MECHANISM = {} (effective)", SASL_MECHANISM);
+  }
+
+  public static String getMechanism() {
+    return SASL_MECHANISM;
+  }
+
+  public static boolean isDefaultMechanism(String mechanism) {
+    return HADOOP_SECURITY_SASL_MECHANISM_DEFAULT.equals(mechanism);
+  }
+
+  private SaslMechanismFactory() {}
+}

+ 39 - 20
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java

@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.security.Security;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
@@ -43,16 +45,16 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY;
+
 /**
  * A utility class for dealing with SASL on RPC server
  */
@@ -223,8 +225,8 @@ public class SaslRpcServer {
     SIMPLE((byte) 80, ""),
     KERBEROS((byte) 81, "GSSAPI"),
     @Deprecated
-    DIGEST((byte) 82, SaslConstants.SASL_MECHANISM),
-    TOKEN((byte) 82, SaslConstants.SASL_MECHANISM),
+    DIGEST((byte) 82, SaslMechanismFactory.getMechanism()),
+    TOKEN((byte) 82, SaslMechanismFactory.getMechanism()),
     PLAIN((byte) 83, "PLAIN");
 
     /** The code for this method. */
@@ -234,6 +236,8 @@ public class SaslRpcServer {
     private AuthMethod(byte code, String mechanismName) { 
       this.code = code;
       this.mechanismName = mechanismName;
+      LOG.info("{} {}: code={}, mechanism=\"{}\"",
+          getClass().getSimpleName(), name(), code, mechanismName);
     }
 
     private static final int FIRST_CODE = values()[0].code;
@@ -276,28 +280,44 @@ public class SaslRpcServer {
   /** CallbackHandler for SASL mechanism. */
   @InterfaceStability.Evolving
   public static class SaslDigestCallbackHandler implements CallbackHandler {
+    private final CustomizedCallbackHandler customizedCallbackHandler;
     private SecretManager<TokenIdentifier> secretManager;
     private Server.Connection connection; 
     
     public SaslDigestCallbackHandler(
         SecretManager<TokenIdentifier> secretManager,
         Server.Connection connection) {
+      this(secretManager, connection, connection.getConf());
+    }
+
+    public SaslDigestCallbackHandler(
+        SecretManager<TokenIdentifier> secretManager,
+        Server.Connection connection,
+        Configuration conf) {
       this.secretManager = secretManager;
       this.connection = connection;
+      this.customizedCallbackHandler = CustomizedCallbackHandler.get(
+          HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY, conf);
     }
 
-    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken,
-        StandbyException, RetriableException, IOException {
+    private char[] getPassword(TokenIdentifier tokenid) throws IOException {
       return encodePassword(secretManager.retriableRetrievePassword(tokenid));
     }
 
+    private char[] getPassword(String name) throws IOException {
+      final TokenIdentifier tokenIdentifier = getIdentifier(name, secretManager);
+      final UserGroupInformation user = tokenIdentifier.getUser();
+      connection.attemptingUser = user;
+      LOG.debug("SASL server callback: setting password for client: {}", user);
+      return getPassword(tokenIdentifier);
+    }
+
     @Override
-    public void handle(Callback[] callbacks) throws InvalidToken,
-        UnsupportedCallbackException, StandbyException, RetriableException,
-        IOException {
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
       NameCallback nc = null;
       PasswordCallback pc = null;
       AuthorizeCallback ac = null;
+      List<Callback> unknownCallbacks = null;
       for (Callback callback : callbacks) {
         if (callback instanceof AuthorizeCallback) {
           ac = (AuthorizeCallback) callback;
@@ -308,20 +328,14 @@ public class SaslRpcServer {
         } else if (callback instanceof RealmCallback) {
           continue; // realm is ignored
         } else {
-          throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL Callback");
+          if (unknownCallbacks == null) {
+            unknownCallbacks = new ArrayList<>();
+          }
+          unknownCallbacks.add(callback);
         }
       }
       if (pc != null) {
-        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(),
-            secretManager);
-        char[] password = getPassword(tokenIdentifier);
-        UserGroupInformation user = null;
-        user = tokenIdentifier.getUser(); // may throw exception
-        connection.attemptingUser = user;
-
-        LOG.debug("SASL server callback: setting password for client: {}", user);
-        pc.setPassword(password);
+        pc.setPassword(getPassword(nc.getDefaultName()));
       }
       if (ac != null) {
         String authid = ac.getAuthenticationID();
@@ -341,6 +355,11 @@ public class SaslRpcServer {
           ac.setAuthorizedID(authzid);
         }
       }
+      if (unknownCallbacks != null) {
+        final String name = nc != null ? nc.getDefaultName() : null;
+        final char[] password = name != null ? getPassword(name) : null;
+        customizedCallbackHandler.handleCallbacks(unknownCallbacks, name, password);
+      }
     }
   }
 

+ 25 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/package-info.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes for hadoop security.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "YARN", "HBase"})
+package org.apache.hadoop.security;
+
+import org.apache.hadoop.classification.InterfaceAudience;

+ 17 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -732,6 +732,23 @@
   </description>
 </property>
 
+<property>
+  <name>hadoop.security.sasl.mechanism</name>
+  <value>DIGEST-MD5</value>
+  <description>
+    The SASL mechanism used in Hadoop.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.security.sasl.CustomizedCallbackHandler.class</name>
+  <value></value>
+  <description>
+    Some security provider may define a new javax.security.auth.callback.Callback.
+    This property allows users to configure a customized callback handler.
+  </description>
+</property>
+
 <property>
   <name>hadoop.security.sensitive-config-keys</name>
   <value>

+ 1 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java

@@ -536,7 +536,7 @@ public class TestSaslRPC extends TestRpcBase {
   private static Pattern BadToken =
       Pattern.compile("^" + RemoteException.class.getName() +
           "\\("+ SaslException.class.getName() + "\\): " +
-          SaslConstants.SASL_MECHANISM + ": digest response format violation.*");
+          SaslMechanismFactory.getMechanism() + ": digest response format violation.*");
   private static Pattern KrbFailed =
       Pattern.compile(".*Failed on local exception:.* " +
                       "Failed to specify server's Kerberos principal name.*");

+ 0 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

@@ -236,9 +236,6 @@ public interface HdfsClientConfigKeys {
   String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
       "dfs.data.transfer.saslproperties.resolver.class";
 
-  String DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY
-      = "dfs.data.transfer.sasl.CustomizedCallbackHandler.class";
-
   String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
       "dfs.encrypt.data.transfer.cipher.key.bitlength";
   int    DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.security.FastSaslClientFactory;
 import org.apache.hadoop.security.FastSaslServerFactory;
-import org.apache.hadoop.security.SaslConstants;
+import org.apache.hadoop.security.SaslMechanismFactory;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslOutputStream;
 
@@ -52,7 +52,7 @@ class SaslParticipant {
   // a short string.
   private static final String SERVER_NAME = "0";
   private static final String PROTOCOL = "hdfs";
-  private static final String[] MECHANISM_ARRAY = {SaslConstants.SASL_MECHANISM};
+  private static final String[] MECHANISM_ARRAY = {SaslMechanismFactory.getMechanism()};
   private static final byte[] EMPTY_BYTE_ARRAY = {};
 
   // One of these will always be null.
@@ -127,7 +127,7 @@ class SaslParticipant {
   }
 
   byte[] createFirstMessage() throws SaslException {
-    return MECHANISM_ARRAY[0].equals(SaslConstants.SASL_MECHANISM_DEFAULT) ? EMPTY_BYTE_ARRAY
+    return SaslMechanismFactory.isDefaultMechanism(MECHANISM_ARRAY[0]) ? EMPTY_BYTE_ARRAY
         : evaluateChallengeOrResponse(EMPTY_BYTE_ARRAY);
   }
 

+ 4 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
@@ -47,7 +48,6 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherOption;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.security.CustomizedCallbackHandler;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -224,21 +225,8 @@ public class SaslDataTransferServer {
      */
     SaslServerCallbackHandler(Configuration conf, PasswordFunction passwordFunction) {
       this.passwordFunction = passwordFunction;
-
-      final Class<?> clazz = conf.getClass(
-          HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
-          CustomizedCallbackHandler.DefaultHandler.class);
-      final Object callbackHandler;
-      try {
-        callbackHandler = clazz.newInstance();
-      } catch (Exception e) {
-        throw new IllegalStateException("Failed to create a new instance of " + clazz, e);
-      }
-      if (callbackHandler instanceof CustomizedCallbackHandler) {
-        customizedCallbackHandler = (CustomizedCallbackHandler) callbackHandler;
-      } else {
-        customizedCallbackHandler = CustomizedCallbackHandler.delegate(callbackHandler);
-      }
+      this.customizedCallbackHandler = CustomizedCallbackHandler.get(
+          HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY, conf);
     }
 
     @Override

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/package-info.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes for data transfer SASL implementation.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;

+ 0 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2641,15 +2641,6 @@
   </description>
 </property>
 
-<property>
-  <name>dfs.data.transfer.sasl.CustomizedCallbackHandler.class</name>
-  <value></value>
-  <description>
-    Some security provider may define a new javax.security.auth.callback.Callback.
-    This property allows users to configure a customized callback handler.
-  </description>
-</property>
-
 <property>
   <name>dfs.journalnode.rpc-address</name>
   <value>0.0.0.0:8485</value>

+ 29 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestCustomizedCallbackHandler.java

@@ -18,8 +18,9 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer.SaslServerCallbackHandler;
+import org.apache.hadoop.security.CustomizedCallbackHandler;
+import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -32,12 +33,19 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY;
+
 /** For testing {@link CustomizedCallbackHandler}. */
 public class TestCustomizedCallbackHandler {
   static final Logger LOG = LoggerFactory.getLogger(TestCustomizedCallbackHandler.class);
 
   static final AtomicReference<List<Callback>> LAST_CALLBACKS = new AtomicReference<>();
 
+  static void reset() {
+    LAST_CALLBACKS.set(null);
+    CustomizedCallbackHandler.Cache.clear();
+  }
+
   static void runHandleCallbacks(Object caller, List<Callback> callbacks, String name) {
     LOG.info("{}: handling {} for {}", caller.getClass().getSimpleName(), callbacks, name);
     LAST_CALLBACKS.set(callbacks);
@@ -53,9 +61,9 @@ public class TestCustomizedCallbackHandler {
     }
   }
 
-  static class MyCallback implements Callback { }
+  public static class MyCallback implements Callback { }
 
-  static class MyCallbackHandler implements CustomizedCallbackHandler {
+  public static class MyCallbackHandler implements CustomizedCallbackHandler {
     @Override
     public void handleCallbacks(List<Callback> callbacks, String name, char[] password) {
       runHandleCallbacks(this, callbacks, name);
@@ -68,23 +76,31 @@ public class TestCustomizedCallbackHandler {
     final Callback[] callbacks = {new MyCallback()};
 
     // without setting conf, expect UnsupportedCallbackException
+    reset();
     LambdaTestUtils.intercept(UnsupportedCallbackException.class, () -> runTest(conf, callbacks));
 
     // set conf and expect success
-    conf.setClass(HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
+    reset();
+    conf.setClass(HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
         MyCallbackHandler.class, CustomizedCallbackHandler.class);
-    new SaslServerCallbackHandler(conf, String::toCharArray).handle(callbacks);
+    runTest(conf, callbacks);
+    assertCallbacks(callbacks);
+
+    reset();
+    conf.setClass(HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
+        MyCallbackHandler.class, CustomizedCallbackHandler.class);
+    new SaslRpcServer.SaslDigestCallbackHandler(null, null, conf).handle(callbacks);
     assertCallbacks(callbacks);
   }
 
-  static class MyCallbackMethod {
+  public static class MyCallbackMethod {
     public void handleCallbacks(List<Callback> callbacks, String name, char[] password)
         throws UnsupportedCallbackException {
       runHandleCallbacks(this, callbacks, name);
     }
   }
 
-  static class MyExceptionMethod {
+  public static class MyExceptionMethod {
     public void handleCallbacks(List<Callback> callbacks, String name, char[] password)
         throws UnsupportedCallbackException {
       runHandleCallbacks(this, callbacks, name);
@@ -98,16 +114,19 @@ public class TestCustomizedCallbackHandler {
     final Callback[] callbacks = {new MyCallback()};
 
     // without setting conf, expect UnsupportedCallbackException
+    reset();
     LambdaTestUtils.intercept(UnsupportedCallbackException.class, () -> runTest(conf, callbacks));
 
     // set conf and expect success
-    conf.setClass(HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
+    reset();
+    conf.setClass(HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
         MyCallbackMethod.class, Object.class);
-    new SaslServerCallbackHandler(conf, String::toCharArray).handle(callbacks);
+    runTest(conf, callbacks);
     assertCallbacks(callbacks);
 
     // set conf and expect exception
-    conf.setClass(HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
+    reset();
+    conf.setClass(HADOOP_SECURITY_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
         MyExceptionMethod.class, Object.class);
     LambdaTestUtils.intercept(IOException.class, () -> runTest(conf, callbacks));
   }