
HDDS-6. Enable SCM kerberos auth. Contributed by Ajay Kumar.

Xiaoyu Yao 7 years ago
parent
commit
ff61931f91
24 changed files with 375 additions and 48 deletions
  1. +1 -1    hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosUtil.java
  2. +2 -0    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigurationFieldsBase.java
  3. +1 -12   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
  4. +7 -2    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  5. +3 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
  6. +4 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
  7. +6 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolPB.java
  8. +4 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolPB.java
  9. +4 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  10. +24 -2  hadoop-hdds/common/src/main/resources/ozone-default.xml
  11. +4 -0   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
  12. +6 -0   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java
  13. +47 -4  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
  14. +2 -3   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
  15. +2 -1   hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
  16. +3 -0   hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
  17. +12 -4  hadoop-ozone/common/src/main/bin/start-ozone.sh
  18. +6 -7   hadoop-ozone/common/src/main/bin/stop-ozone.sh
  19. +6 -0   hadoop-ozone/integration-test/pom.xml
  20. +3 -2   hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
  21. +2 -2   hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  22. +15 -6  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
  23. +205 -0 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
  24. +6 -2   hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java

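Taken together, the change gates SCM Kerberos login on the new ozone.security.enabled key and keeps Ozone startable in secure clusters. A minimal sketch of the configuration a secure SCM would need after this patch, mirroring what TestSecureOzoneCluster sets up (the principal and keytab values are illustrative, not shipped defaults):

    // Sketch: configuration for a Kerberos-enabled SCM under this patch.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
    conf.setBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY, true);
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY,
        "scm/_HOST@EXAMPLE.COM");                       // placeholder realm
    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY,
        "/etc/security/keytabs/scm.keytab");            // placeholder path
    StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);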
+ 1 - 1
hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosUtil.java

@@ -167,7 +167,7 @@ public class KerberosUtil {
   }
 
   /* Return fqdn of the current host */
-  static String getLocalHostName() throws UnknownHostException {
+  public static String getLocalHostName() throws UnknownHostException {
     return InetAddress.getLocalHost().getCanonicalHostName();
   }
 

+ 2 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigurationFieldsBase.java

@@ -436,6 +436,8 @@ public abstract class TestConfigurationFieldsBase {
     // Create XML key/value map
     LOG_XML.debug("Reading XML property files\n");
     xmlKeyValueMap = extractPropertiesFromXml(xmlFilename);
+    // Remove hadoop property set in ozone-default.xml
+    xmlKeyValueMap.remove("hadoop.custom.tags");
     LOG_XML.debug("\n=====\n");
 
     // Create default configuration variable key/value map

+ 1 - 12
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

@@ -275,18 +275,7 @@ public final class HddsUtils {
   }
 
   public static boolean isHddsEnabled(Configuration conf) {
-    String securityEnabled =
-        conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-            "simple");
-    boolean securityAuthorizationEnabled = conf.getBoolean(
-        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
-
-    if (securityEnabled.equals("kerberos") || securityAuthorizationEnabled) {
-      LOG.error("Ozone is not supported in a security enabled cluster. ");
-      return false;
-    } else {
-      return conf.getBoolean(OZONE_ENABLED, OZONE_ENABLED_DEFAULT);
-    }
+    return conf.getBoolean(OZONE_ENABLED, OZONE_ENABLED_DEFAULT);
   }
 

+ 7 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -205,8 +205,9 @@ public final class ScmConfigKeys {
       "ozone.scm.http-address";
   public static final String OZONE_SCM_HTTPS_ADDRESS_KEY =
       "ozone.scm.https-address";
-  public static final String OZONE_SCM_KEYTAB_FILE =
-      "ozone.scm.keytab.file";
+  public static final String OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY =
+      "ozone.scm.kerberos.keytab.file";
+  public static final String OZONE_SCM_KERBEROS_PRINCIPAL_KEY = "ozone.scm.kerberos.principal";
   public static final String OZONE_SCM_HTTP_BIND_HOST_DEFAULT = "0.0.0.0";
   public static final int OZONE_SCM_HTTP_BIND_PORT_DEFAULT = 9876;
   public static final int OZONE_SCM_HTTPS_BIND_PORT_DEFAULT = 9877;
@@ -325,6 +326,10 @@ public final class ScmConfigKeys {
   public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
       "10m";
 
+  public static final String SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY =
+      "ozone.scm.web.authentication.kerberos.principal";
+  public static final String SCM_WEB_AUTHENTICATION_KERBEROS_KEYTAB_FILE_KEY =
+      "ozone.scm.web.authentication.kerberos.keytab";
   /**
    * Never constructed.
    */

+ 3 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdds.scm.protocol;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -31,6 +33,7 @@ import java.util.List;
  * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
  * to read/write a block.
  */
+@KerberosInfo(serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY)
 public interface ScmBlockLocationProtocol {
 
   /**

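The @KerberosInfo annotation added to the SCM protocols tells Hadoop RPC which configuration key names the server's principal; a client resolves that key and substitutes _HOST with the server's hostname before SASL/GSSAPI negotiation. A rough sketch of that lookup, where scmHost is a hypothetical hostname and SecurityUtil.getServerPrincipal is the standard Hadoop helper:

    // e.g. pattern = "scm/_HOST@EXAMPLE.COM"
    String pattern = conf.get(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY);
    // Resolves to "scm/scm1.example.com@EXAMPLE.COM" for that host.
    String serverPrincipal = SecurityUtil.getServerPrincipal(pattern, scmHost);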
+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java

@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hdds.scm.protocol;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -27,11 +29,13 @@ import org.apache.hadoop.hdds.protocol.proto
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * ContainerLocationProtocol is used by an HDFS node to find the set of nodes
  * that currently host a container.
  */
+@KerberosInfo(serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY)
 public interface StorageContainerLocationProtocol {
   /**
    * Asks SCM where a container should be allocated. SCM responds with the

+ 6 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolPB.java

@@ -18,9 +18,13 @@
 package org.apache.hadoop.hdds.scm.protocolPB;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .ScmBlockLocationProtocolService;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * Protocol used from an HDFS node to StorageContainerManager.  This extends the
@@ -30,6 +34,8 @@ import org.apache.hadoop.ipc.ProtocolInfo;
     "org.apache.hadoop.ozone.protocol.ScmBlockLocationProtocol",
     protocolVersion = 1)
 @InterfaceAudience.Private
+@KerberosInfo(
+    serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY)
 public interface ScmBlockLocationProtocolPB
     extends ScmBlockLocationProtocolService.BlockingInterface {
 }

+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolPB.java

@@ -21,7 +21,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos
     .StorageContainerLocationProtocolService;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * Protocol used from an HDFS node to StorageContainerManager.  This extends the
@@ -30,6 +32,8 @@ import org.apache.hadoop.ipc.ProtocolInfo;
 @ProtocolInfo(protocolName =
     "org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol",
     protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY)
 @InterfaceAudience.Private
 public interface StorageContainerLocationProtocolPB
     extends StorageContainerLocationProtocolService.BlockingInterface {

+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -346,6 +346,10 @@ public final class OzoneConfigKeys {
   public static final double
       HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT = 0.75;
 
+  public static final String OZONE_SECURITY_ENABLED_KEY =
+      "ozone.security.enabled";
+  public static final boolean OZONE_SECURITY_ENABLED_DEFAULT = false;
+
   public static final String OZONE_CONTAINER_COPY_WORKDIR =
       "hdds.datanode.replication.work.dir";
 

+ 24 - 2
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1308,7 +1308,6 @@
       datanode unless the datanode confirms the completion.
     </description>
   </property>
-
   <property>
     <name>hdds.db.profile</name>
     <value>DISK</value>
@@ -1317,7 +1316,6 @@
     that tunes the RocksDB settings for the hardware it is running
     on. Right now, we have SSD and DISK as profile options.</description>
   </property>
-
   <property>
     <name>hdds.datanode.replication.work.dir</name>
     <tag>DATANODE</tag>
@@ -1585,5 +1583,29 @@
     <tag>OZONE, SECURITY, ACL</tag>
     <description>Key to enable/disable ozone acls.</description>
   </property>
+  <property>
+    <name>ozone.scm.kerberos.keytab.file</name>
+    <value></value>
+    <tag> OZONE, SECURITY</tag>
+    <description> The keytab file used by each SCM daemon to login as its
+      service principal. The principal name is configured with
+      ozone.scm.kerberos.principal.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.kerberos.principal</name>
+    <value></value>
+    <tag> OZONE, SECURITY</tag>
+    <description>The SCM service principal. Ex scm/_HOST@REALM.TLD.</description>
+  </property>
+
+  <property>
+    <name>ozone.scm.web.authentication.kerberos.principal</name>
+    <value>HTTP/_HOST@EXAMPLE.COM</value>
+  </property>
+  <property>
+    <name>ozone.scm.web.authentication.kerberos.keytab</name>
+    <value>/etc/security/keytabs/HTTP.keytab</value>
+  </property>
 
 </configuration>

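Note that ozone.scm.kerberos.keytab.file and ozone.scm.kerberos.principal ship with empty values, so a secure deployment must set both, while the two web-authentication keys carry example SPNEGO defaults (HTTP/_HOST@EXAMPLE.COM and /etc/security/keytabs/HTTP.keytab). A quick sketch of how the keys resolve through OzoneConfiguration:

    OzoneConfiguration conf = new OzoneConfiguration();
    // Empty unless overridden in ozone-site.xml.
    String scmPrincipal =
        conf.get(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY, "");
    // Falls back to the SPNEGO default shown above.
    String webPrincipal =
        conf.get(ScmConfigKeys.SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY);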
+ 4 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java

@@ -36,11 +36,15 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 
 import java.io.IOException;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * The protocol spoken between datanodes and SCM. For specifics please the
  * Protoc file that defines this protocol.
  */
+@KerberosInfo(
+    serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY)
 @InterfaceAudience.Private
 public interface StorageContainerDatanodeProtocol {
   /**

+ 6 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java

@@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.protocolPB;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos
     .StorageContainerDatanodeProtocolService;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * Protocol used from a datanode to StorageContainerManager.  This extends
@@ -29,6 +32,9 @@ import org.apache.hadoop.ipc.ProtocolInfo;
 @ProtocolInfo(protocolName =
     "org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol",
     protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
 public interface StorageContainerDatanodeProtocolPB extends
     StorageContainerDatanodeProtocolService.BlockingInterface {
 }

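Unlike the client-facing protocols above, the datanode protocol also pins clientPrincipal, so the SCM RPC server only accepts callers authenticated as the principal named by dfs.datanode.kerberos.principal. A hedged sketch of the datanode-side login that would satisfy that check (hostname is a placeholder; both config keys are standard HDFS ones):

    conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
        "dn/_HOST@EXAMPLE.COM");                        // illustrative value
    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
        "/etc/security/keytabs/dn.keytab");             // illustrative value
    SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
        DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hostname);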
+ 47 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -28,11 +28,13 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
@@ -83,6 +85,9 @@ import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.common.StorageInfo;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.StringUtils;
@@ -103,6 +108,10 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**
@@ -194,11 +203,17 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    *
    * @param conf configuration
    */
-  private StorageContainerManager(OzoneConfiguration conf) throws IOException {
+  private StorageContainerManager(OzoneConfiguration conf)
+      throws IOException, AuthenticationException {
 
     configuration = conf;
     StorageContainerManager.initMetrics();
     initContainerReportCache(conf);
+    // Authenticate SCM if security is enabled
+    if (conf.getBoolean(OZONE_SECURITY_ENABLED_KEY,
+        OZONE_SECURITY_ENABLED_DEFAULT)) {
+      loginAsSCMUser(conf);
+    }
 
     scmStorage = new SCMStorage(conf);
     if (scmStorage.getState() != StorageState.INITIALIZED) {
@@ -316,6 +331,33 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
   }
 
+  /**
+   * Login as the configured user for SCM.
+   *
+   * @param conf
+   */
+  private void loginAsSCMUser(Configuration conf)
+      throws IOException, AuthenticationException {
+    LOG.debug("Ozone security is enabled. Attempting login for SCM user. "
+            + "Principal: {}, keytab: {}", conf.get
+            (OZONE_SCM_KERBEROS_PRINCIPAL_KEY),
+        conf.get(OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY));
+
+    if (SecurityUtil.getAuthenticationMethod(conf).equals
+        (AuthenticationMethod.KERBEROS)) {
+      UserGroupInformation.setConfiguration(conf);
+      InetSocketAddress socAddr = HddsServerUtil
+          .getScmBlockClientBindAddress(conf);
+      SecurityUtil.login(conf, OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY,
+          OZONE_SCM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
+    } else {
+      throw new AuthenticationException(SecurityUtil.getAuthenticationMethod(
+          conf) + " authentication method not support. "
+          + "SCM user login failed.");
+    }
+    LOG.info("SCM login successful.");
+  }
+
   /**
    * Builds a message for logging startup information about an RPC server.
    *
@@ -410,8 +452,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * @throws IOException
    */
   @VisibleForTesting
-  public static StorageContainerManager createSCM(
-      String[] args, OzoneConfiguration conf) throws IOException {
+  public static StorageContainerManager createSCM(String[] args,
+      OzoneConfiguration conf) throws IOException, AuthenticationException {
     return createSCM(args, conf, false);
   }
 
@@ -427,7 +469,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private static StorageContainerManager createSCM(
       String[] args,
       OzoneConfiguration conf,
-      boolean printBanner) throws IOException {
+      boolean printBanner)
+      throws IOException, AuthenticationException {
     String[] argv = (args == null) ? new String[0] : args;
     if (!HddsUtils.isHddsEnabled(conf)) {
       System.err.println(

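loginAsSCMUser delegates to SecurityUtil.login, which is roughly equivalent to the sketch below: resolve _HOST in the configured principal against the block-client bind address, then log in from the keytab. This is a simplification under stated assumptions; the real helper also handles cases such as security being disabled or the principal key being unset. rawPrincipal and socAddr correspond to the values read in loginAsSCMUser above.

    String rawPrincipal = conf.get(OZONE_SCM_KERBEROS_PRINCIPAL_KEY);
    String principal = SecurityUtil.getServerPrincipal(rawPrincipal,
        socAddr.getHostName());                        // _HOST -> actual hostname
    UserGroupInformation.loginUserFromKeytab(principal,
        conf.get(OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY)); // kinit from keytab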
+ 2 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.server.BaseHttpServer;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 
 import java.io.IOException;
 
@@ -63,11 +62,11 @@ public class StorageContainerManagerHttpServer extends BaseHttpServer {
   }
 
   @Override protected String getKeytabFile() {
-    return ScmConfigKeys.OZONE_SCM_KEYTAB_FILE;
+    return ScmConfigKeys.SCM_WEB_AUTHENTICATION_KERBEROS_KEYTAB_FILE_KEY;
   }
 
   @Override protected String getSpnegoPrincipal() {
-    return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
+    return ScmConfigKeys.SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
   }
 
   @Override protected String getEnabledKey() {

+ 2 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -113,7 +114,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
   }
 
   private static StorageContainerManager getScm(OzoneConfiguration conf)
-      throws IOException {
+      throws IOException, AuthenticationException {
     conf.setBoolean(OZONE_ENABLED, true);
     SCMStorage scmStore = new SCMStorage(conf);
     if(scmStore.getState() != StorageState.INITIALIZED) {

+ 3 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.client.protocol;
 
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
@@ -32,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * An implementer of this interface is capable of connecting to Ozone Cluster
@@ -41,6 +43,7 @@ import java.util.Map;
  * includes: {@link org.apache.hadoop.ozone.client.rpc.RpcClient} for RPC and
  * {@link  org.apache.hadoop.ozone.client.rest.RestClient} for REST.
  */
+@KerberosInfo(serverPrincipal = ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY)
 public interface ClientProtocol {
 
   /**

+ 12 - 4
hadoop-ozone/common/src/main/bin/start-ozone.sh

@@ -70,10 +70,18 @@ nameStartOpt="$nameStartOpt $*"
 SECURITY_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getconf -confKey hadoop.security.authentication | tr '[:upper:]' '[:lower:]' 2>&-)
 SECURITY_AUTHORIZATION_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getconf -confKey hadoop.security.authorization | tr '[:upper:]' '[:lower:]' 2>&-)
 
-if [[ ${SECURITY_ENABLED} == "kerberos" || ${SECURITY_AUTHORIZATION_ENABLED} == "true" ]]; then
-  echo "Ozone is not supported in a security enabled cluster."
-  exit 1
-fi
+#if [[ ${SECURITY_ENABLED} == "kerberos" || ${SECURITY_AUTHORIZATION_ENABLED}
+# == "true" ]]; then
+#  echo "Ozone is not supported in a security enabled cluster."
+#  exit 1
+#fi
+
+#SECURITY_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getozoneconf -confKey hadoop.security.authentication | tr '[:upper:]' '[:lower:]' 2>&-)
+#SECURITY_AUTHORIZATION_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getozoneconf -confKey hadoop.security.authorization | tr '[:upper:]' '[:lower:]' 2>&-)
+#if [[ ${SECURITY_ENABLED} == "kerberos" || ${SECURITY_AUTHORIZATION_ENABLED} == "true" ]]; then
+#  echo "Ozone is not supported in a security enabled cluster."
+#  exit 1
+#fi
 
 #---------------------------------------------------------
 # Check if ozone is enabled

+ 6 - 7
hadoop-ozone/common/src/main/bin/stop-ozone.sh

@@ -47,13 +47,12 @@ else
   exit 1
 fi
 
-SECURITY_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getconf -confKey hadoop.security.authentication | tr '[:upper:]' '[:lower:]' 2>&-)
-SECURITY_AUTHORIZATION_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getconf -confKey hadoop.security.authorization | tr '[:upper:]' '[:lower:]' 2>&-)
-
-if [[ ${SECURITY_ENABLED} == "kerberos" || ${SECURITY_AUTHORIZATION_ENABLED} == "true" ]]; then
-  echo "Ozone is not supported in a security enabled cluster."
-  exit 1
-fi
+#SECURITY_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getozoneconf -confKey hadoop.security.authentication | tr '[:upper:]' '[:lower:]' 2>&-)
+#SECURITY_AUTHORIZATION_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getozoneconf -confKey hadoop.security.authorization | tr '[:upper:]' '[:lower:]' 2>&-)
+#if [[ ${SECURITY_ENABLED} == "kerberos" || ${SECURITY_AUTHORIZATION_ENABLED} == "true" ]]; then
+#  echo "Ozone is not supported in a security enabled cluster."
+#  exit 1
+#fi
 
 #---------------------------------------------------------
 # Check if ozone is enabled

+ 6 - 0
hadoop-ozone/integration-test/pom.xml

@@ -38,6 +38,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-ozone-ozone-manager</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-ozone-objectstore-service</artifactId>

+ 3 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -107,8 +108,8 @@ public class TestContainerStateManagerIntegration {
   }
 
   @Test
-  public void testContainerStateManagerRestart()
-      throws IOException, TimeoutException, InterruptedException {
+  public void testContainerStateManagerRestart() throws IOException,
+      TimeoutException, InterruptedException, AuthenticationException {
     // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
 
     for (int i = 0; i < 10; i++) {

+ 2 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import java.io.IOException;
@@ -149,8 +150,7 @@ public interface MiniOzoneCluster {
    * @throws TimeoutException
    * @throws InterruptedException
    */
-  void restartStorageContainerManager() throws InterruptedException,
-      TimeoutException, IOException;
+  void restartStorageContainerManager() throws InterruptedException, TimeoutException, IOException, AuthenticationException;
 
   /**
    * Restarts OzoneManager instance.

+ 15 - 6
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdds.scm.protocolPB
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import org.slf4j.Logger;
@@ -232,8 +233,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
   }
 
   @Override
-  public void restartStorageContainerManager()
-      throws TimeoutException, InterruptedException, IOException {
+  public void restartStorageContainerManager() throws TimeoutException,
+      InterruptedException, IOException, AuthenticationException {
     scm.stop();
     scm.join();
     scm = StorageContainerManager.createSCM(null, conf);
@@ -370,9 +371,16 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     public MiniOzoneCluster build() throws IOException {
       DefaultMetricsSystem.setMiniClusterMode(true);
       initializeConfiguration();
-      StorageContainerManager scm = createSCM();
-      scm.start();
-      OzoneManager om = createOM();
+      StorageContainerManager scm;
+      OzoneManager om;
+      try {
+        scm = createSCM();
+        scm.start();
+        om = createOM();
+      } catch (AuthenticationException ex) {
+        throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+      }
+
       om.start();
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
       MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
@@ -424,7 +432,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
      *
      * @throws IOException
      */
-    private StorageContainerManager createSCM() throws IOException {
+    private StorageContainerManager createSCM()
+        throws IOException, AuthenticationException {
       configureSCM();
       SCMStorage scmStore = new SCMStorage(conf);
       initializeScmStorage(scmStore);

+ 205 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java

@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.KerberosAuthException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test class to for security enabled Ozone cluster.
+ */
+@InterfaceAudience.Private
+public final class TestSecureOzoneCluster {
+
+  private Logger LOGGER = LoggerFactory
+      .getLogger(TestSecureOzoneCluster.class);
+
+  private MiniKdc miniKdc;
+  private OzoneConfiguration conf;
+  private File workDir;
+  private static Properties securityProperties;
+  private File scmKeytab;
+  private File spnegoKeytab;
+  private String curUser;
+
+  @Before
+  public void init() {
+    try {
+      conf = new OzoneConfiguration();
+      startMiniKdc();
+      setSecureConfig(conf);
+      createCredentialsInKDC(conf, miniKdc);
+    } catch (IOException e) {
+      LOGGER.error("Failed to initialize TestSecureOzoneCluster", e);
+    } catch (Exception e) {
+      LOGGER.error("Failed to initialize TestSecureOzoneCluster", e);
+    }
+  }
+
+  private void createCredentialsInKDC(Configuration conf, MiniKdc miniKdc)
+      throws Exception {
+    createPrincipal(scmKeytab,
+        conf.get(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY));
+    createPrincipal(spnegoKeytab,
+        conf.get(ScmConfigKeys.SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY));
+  }
+
+  private void createPrincipal(File keytab, String... principal)
+      throws Exception {
+    miniKdc.createPrincipal(keytab, principal);
+  }
+
+  private void startMiniKdc() throws Exception {
+    workDir = GenericTestUtils
+        .getTestDir(TestSecureOzoneCluster.class.getSimpleName());
+    securityProperties = MiniKdc.createConf();
+    miniKdc = new MiniKdc(securityProperties, workDir);
+    miniKdc.start();
+  }
+
+  private void setSecureConfig(Configuration conf) throws IOException {
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    String host = KerberosUtil.getLocalHostName();
+    String realm = miniKdc.getRealm();
+    curUser = UserGroupInformation.getCurrentUser()
+        .getUserName();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    conf.set(OZONE_ADMINISTRATORS, curUser);
+
+    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY,
+        "scm/" + host + "@" + realm);
+    conf.set(ScmConfigKeys.SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
+        "HTTP_SCM/" + host + "@" + realm);
+
+    scmKeytab = new File(workDir, "scm.keytab");
+    spnegoKeytab = new File(workDir, "http.keytab");
+
+    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY,
+        scmKeytab.getAbsolutePath());
+    conf.set(ScmConfigKeys.SCM_WEB_AUTHENTICATION_KERBEROS_KEYTAB_FILE_KEY,
+        spnegoKeytab.getAbsolutePath());
+
+  }
+
+  @Test
+  public void testSecureScmStartupSuccess() throws Exception {
+    final String path = GenericTestUtils
+        .getTempPath(UUID.randomUUID().toString());
+    Path scmPath = Paths.get(path, "scm-meta");
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+    conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+    SCMStorage scmStore = new SCMStorage(conf);
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+    StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
+    //Reads the SCM Info from SCM instance
+    ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
+    Assert.assertEquals(clusterId, scmInfo.getClusterId());
+    Assert.assertEquals(scmId, scmInfo.getScmId());
+  }
+
+  @Test
+  public void testSecureScmStartupFailure() throws Exception {
+    final String path = GenericTestUtils
+        .getTempPath(UUID.randomUUID().toString());
+    Path scmPath = Paths.get(path, "scm-meta");
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+    conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY,
+        "scm@" + miniKdc.getRealm());
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+
+    SCMStorage scmStore = new SCMStorage(conf);
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+    LambdaTestUtils.intercept(IOException.class,
+        "Running in secure mode, but config doesn't have a keytab",
+        () -> {
+          StorageContainerManager.createSCM(null, conf);
+        });
+
+    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_PRINCIPAL_KEY,
+        "scm/_HOST@EXAMPLE.com");
+    conf.set(ScmConfigKeys.OZONE_SCM_KERBEROS_KEYTAB_FILE_KEY,
+        "/etc/security/keytabs/scm.keytab");
+
+    LambdaTestUtils.intercept(KerberosAuthException.class, "failure "
+            + "to login: for principal:",
+        () -> {
+          StorageContainerManager.createSCM(null, conf);
+        });
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "OAuth2");
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class, "Invalid"
+            + " attribute value for hadoop.security.authentication of OAuth2",
+        () -> {
+          StorageContainerManager.createSCM(null, conf);
+        });
+
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "KERBEROS_SSL");
+    LambdaTestUtils.intercept(AuthenticationException.class,
+        "KERBEROS_SSL authentication method not support.",
+        () -> {
+          StorageContainerManager.createSCM(null, conf);
+        });
+
+  }
+
+}

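The new test drives a real KDC via hadoop-minikdc (hence the pom.xml addition above), which is why the commit needs no external Kerberos infrastructure. The pattern is reusable for other secure Ozone tests; a condensed sketch of the MiniKdc lifecycle using the same APIs the test calls (workDir and host are placeholders):

    Properties kdcConf = MiniKdc.createConf();
    MiniKdc kdc = new MiniKdc(kdcConf, workDir);   // workDir: scratch directory
    kdc.start();
    File keytab = new File(workDir, "scm.keytab");
    kdc.createPrincipal(keytab, "scm/" + host + "@" + kdc.getRealm());
    // ... run the secured SCM against these credentials ...
    kdc.stop();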
+ 6 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java

@@ -59,8 +59,10 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -426,7 +428,8 @@ public class TestStorageContainerManager {
   }
 
   @Test
-  public void testSCMInitializationFailure() throws IOException {
+  public void testSCMInitializationFailure()
+      throws IOException, AuthenticationException {
     OzoneConfiguration conf = new OzoneConfiguration();
     final String path =
         GenericTestUtils.getTempPath(UUID.randomUUID().toString());
@@ -439,7 +442,8 @@ public class TestStorageContainerManager {
   }
 
   @Test
-  public void testSCMInitializationReturnCode() throws IOException {
+  public void testSCMInitializationReturnCode() throws IOException,
+      AuthenticationException {
     ExitUtil.disableSystemExit();
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);