Browse Source

HDDS-134. SCM CA: OM sends CSR and uses certificate issued by SCM. Contributed by Ajay Kumar.

Ajay Kumar 6 years ago
parent
commit
4a3cddea70
20 changed files with 739 additions and 100 deletions
  1. 1 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
  2. 18 2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
  3. 35 24
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
  4. 18 2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
  5. 26 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
  6. 19 2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
  7. 1 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
  8. 11 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/keys/KeyCodec.java
  9. 2 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  10. 58 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
  11. 1 1
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  12. 16 17
      hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestCertificateClientInit.java
  13. 11 12
      hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
  14. 1 1
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
  15. 1 1
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
  16. 7 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
  17. 110 5
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
  18. 23 17
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
  19. 218 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
  20. 162 14
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java

@@ -109,7 +109,7 @@ public interface CertificateClient {
    *
    * @return CertificateSignRequest.Builder
    */
-  CertificateSignRequest.Builder getCSRBuilder();
+  CertificateSignRequest.Builder getCSRBuilder() throws CertificateException;
 
   /**
    * Get the certificate of well-known entity from SCM.

+ 18 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java

@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.client;
 
+import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
+import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,8 +32,22 @@ public class DNCertificateClient extends DefaultCertificateClient {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DNCertificateClient.class);
-  DNCertificateClient(SecurityConfig securityConfig, String component) {
-    super(securityConfig, component, LOG);
+  public DNCertificateClient(SecurityConfig securityConfig) {
+    super(securityConfig, LOG);
+  }
+
+  /**
+   * Returns a CSR builder that can be used to creates a Certificate signing
+   * request.
+   *
+   * @return CertificateSignRequest.Builder
+   */
+  @Override
+  public CertificateSignRequest.Builder getCSRBuilder()
+      throws CertificateException {
+    return super.getCSRBuilder()
+        .setDigitalEncryption(false)
+        .setDigitalSignature(false);
   }
 
   public Logger getLogger() {

+ 35 - 24
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.security.x509.certificate.client;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.validator.routines.DomainValidator;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
@@ -66,20 +67,16 @@ public abstract class DefaultCertificateClient implements CertificateClient {
 
   private final Logger logger;
   private final SecurityConfig securityConfig;
-  private final String component;
   private final KeyCodec keyCodec;
   private PrivateKey privateKey;
   private PublicKey publicKey;
   private X509Certificate x509Certificate;
 
 
-  DefaultCertificateClient(SecurityConfig securityConfig, String component,
-      Logger log) {
+  DefaultCertificateClient(SecurityConfig securityConfig, Logger log) {
     Objects.requireNonNull(securityConfig);
-    Objects.requireNonNull(component);
-    this.component = component;
     this.securityConfig = securityConfig;
-    keyCodec = new KeyCodec(securityConfig, component);
+    keyCodec = new KeyCodec(securityConfig);
     this.logger = log;
   }
 
@@ -95,15 +92,14 @@ public abstract class DefaultCertificateClient implements CertificateClient {
       return privateKey;
     }
 
-    Path keyPath = securityConfig.getKeyLocation(component);
+    Path keyPath = securityConfig.getKeyLocation();
     if (OzoneSecurityUtil.checkIfFileExist(keyPath,
         securityConfig.getPrivateKeyFileName())) {
       try {
         privateKey = keyCodec.readPrivateKey();
       } catch (InvalidKeySpecException | NoSuchAlgorithmException
           | IOException e) {
-        getLogger().error("Error while getting private key for {}",
-            component, e);
+        getLogger().error("Error while getting private key.", e);
       }
     }
     return privateKey;
@@ -121,15 +117,14 @@ public abstract class DefaultCertificateClient implements CertificateClient {
       return publicKey;
     }
 
-    Path keyPath = securityConfig.getKeyLocation(component);
+    Path keyPath = securityConfig.getKeyLocation();
     if (OzoneSecurityUtil.checkIfFileExist(keyPath,
         securityConfig.getPublicKeyFileName())) {
       try {
         publicKey = keyCodec.readPublicKey();
       } catch (InvalidKeySpecException | NoSuchAlgorithmException
           | IOException e) {
-        getLogger().error("Error while getting private key for {}",
-            component, e);
+        getLogger().error("Error while getting public key.", e);
       }
     }
     return publicKey;
@@ -147,18 +142,18 @@ public abstract class DefaultCertificateClient implements CertificateClient {
       return x509Certificate;
     }
 
-    Path certPath = securityConfig.getCertificateLocation(component);
+    Path certPath = securityConfig.getCertificateLocation();
     if (OzoneSecurityUtil.checkIfFileExist(certPath,
         securityConfig.getCertificateFileName())) {
       CertificateCodec certificateCodec =
-          new CertificateCodec(securityConfig, component);
+          new CertificateCodec(securityConfig);
       try {
         X509CertificateHolder x509CertificateHolder =
             certificateCodec.readCertificate();
         x509Certificate =
             CertificateCodec.getX509Certificate(x509CertificateHolder);
       } catch (java.security.cert.CertificateException | IOException e) {
-        getLogger().error("Error reading certificate for {}", component, e);
+        getLogger().error("Error reading certificate.", e);
       }
     }
     return x509Certificate;
@@ -318,8 +313,26 @@ public abstract class DefaultCertificateClient implements CertificateClient {
    * @return CertificateSignRequest.Builder
    */
   @Override
-  public CertificateSignRequest.Builder getCSRBuilder() {
-    return new CertificateSignRequest.Builder();
+  public CertificateSignRequest.Builder getCSRBuilder()
+      throws CertificateException {
+    CertificateSignRequest.Builder builder =
+        new CertificateSignRequest.Builder()
+        .setConfiguration(securityConfig.getConfiguration());
+    try {
+      DomainValidator validator = DomainValidator.getInstance();
+      // Add all valid ips.
+      OzoneSecurityUtil.getValidInetsForCurrentHost().forEach(
+          ip -> {
+            builder.addIpAddress(ip.getHostAddress());
+            if(validator.isValid(ip.getCanonicalHostName())) {
+              builder.addDnsName(ip.getCanonicalHostName());
+            }
+          });
+    } catch (IOException e) {
+      throw new CertificateException("Error while adding ip to CSR builder",
+          e, CSR_ERROR);
+    }
+    return builder;
   }
 
   /**
@@ -345,8 +358,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   @Override
   public void storeCertificate(X509Certificate certificate)
       throws CertificateException {
-    CertificateCodec certificateCodec = new CertificateCodec(securityConfig,
-        component);
+    CertificateCodec certificateCodec = new CertificateCodec(securityConfig);
     try {
       certificateCodec.writeCertificate(
           new X509CertificateHolder(certificate.getEncoded()));
@@ -595,7 +607,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
    * location.
    * */
   protected void bootstrapClientKeys() throws CertificateException {
-    Path keyPath = securityConfig.getKeyLocation(component);
+    Path keyPath = securityConfig.getKeyLocation();
     if (Files.notExists(keyPath)) {
       try {
         Files.createDirectories(keyPath);
@@ -618,10 +630,9 @@ public abstract class DefaultCertificateClient implements CertificateClient {
       keyCodec.writePrivateKey(keyPair.getPrivate());
     } catch (NoSuchProviderException | NoSuchAlgorithmException
         | IOException e) {
-      getLogger().error("Error while bootstrapping certificate client for {}",
-          component, e);
-      throw new CertificateException("Error while bootstrapping certificate " +
-          "client for" + component, BOOTSTRAP_ERROR);
+      getLogger().error("Error while bootstrapping certificate client.", e);
+      throw new CertificateException("Error while bootstrapping certificate.",
+          BOOTSTRAP_ERROR);
     }
     return keyPair;
   }

+ 18 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java

@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.client;
 
+import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +39,8 @@ public class OMCertificateClient extends DefaultCertificateClient {
   private static final Logger LOG =
       LoggerFactory.getLogger(OMCertificateClient.class);
 
-  public OMCertificateClient(SecurityConfig securityConfig, String component) {
-    super(securityConfig, component, LOG);
+  public OMCertificateClient(SecurityConfig securityConfig) {
+    super(securityConfig, LOG);
   }
 
   protected InitResponse handleCase(InitCase init) throws
@@ -96,6 +97,21 @@ public class OMCertificateClient extends DefaultCertificateClient {
     }
   }
 
+  /**
+   * Returns a CSR builder that can be used to creates a Certificate signing
+   * request.
+   *
+   * @return CertificateSignRequest.Builder
+   */
+  @Override
+  public CertificateSignRequest.Builder getCSRBuilder()
+      throws CertificateException {
+    return super.getCSRBuilder()
+        .setDigitalEncryption(true)
+        .setDigitalSignature(true);
+  }
+
+
   public Logger getLogger() {
     return LOG;
   }

+ 26 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java

@@ -80,6 +80,16 @@ public class CertificateCodec {
     this.location = securityConfig.getCertificateLocation(component);
   }
 
+  /**
+   * Creates an CertificateCodec.
+   *
+   * @param config - Security Config.
+   */
+  public CertificateCodec(SecurityConfig config) {
+    this.securityConfig = config;
+    this.location = securityConfig.getCertificateLocation();
+  }
+
   /**
    * Creates an CertificateCodec.
    *
@@ -167,6 +177,22 @@ public class CertificateCodec {
     return location;
   }
 
+  /**
+   * Gets the X.509 Certificate from PEM encoded String.
+   *
+   * @param pemEncodedString - PEM encoded String.
+   * @return X509Certificate  - Certificate.
+   * @throws CertificateException - Thrown on Failure.
+   * @throws IOException          - Thrown on Failure.
+   */
+  public static X509Certificate getX509Cert(String pemEncodedString)
+      throws CertificateException, IOException {
+    CertificateFactory fact = CertificateFactory.getInstance("X.509");
+    try (InputStream input = IOUtils.toInputStream(pemEncodedString, UTF_8)) {
+      return (X509Certificate) fact.generateCertificate(input);
+    }
+  }
+
   /**
    * Write the Certificate pointed to the location by the configs.
    *

+ 19 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java

@@ -144,6 +144,8 @@ public final class CertificateSignRequest {
     private SecurityConfig config;
     private List<GeneralName> altNames;
     private Boolean ca = false;
+    private boolean digitalSignature;
+    private boolean digitalEncryption;
 
     public CertificateSignRequest.Builder setConfiguration(
         Configuration configuration) {
@@ -171,6 +173,16 @@ public final class CertificateSignRequest {
       return this;
     }
 
+    public Builder setDigitalSignature(boolean dSign) {
+      this.digitalSignature = dSign;
+      return this;
+    }
+
+    public Builder setDigitalEncryption(boolean dEncryption) {
+      this.digitalEncryption = dEncryption;
+      return this;
+    }
+
     // Support SAN extenion with DNS and RFC822 Name
     // other name type will be added as needed.
     public CertificateSignRequest.Builder addDnsName(String dnsName) {
@@ -200,8 +212,13 @@ public final class CertificateSignRequest {
     }
 
     private Extension getKeyUsageExtension() throws IOException {
-      int keyUsageFlag = KeyUsage.digitalSignature | KeyUsage.keyEncipherment
-          | KeyUsage.dataEncipherment | KeyUsage.keyAgreement;
+      int keyUsageFlag = KeyUsage.keyAgreement;
+      if(digitalEncryption){
+        keyUsageFlag |= KeyUsage.keyEncipherment | KeyUsage.dataEncipherment;
+      }
+      if(digitalSignature) {
+        keyUsageFlag |= KeyUsage.digitalSignature;
+      }
 
       if (ca) {
         keyUsageFlag |= KeyUsage.keyCertSign | KeyUsage.cRLSign;

+ 1 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java

@@ -82,6 +82,7 @@ public class CertificateException extends SCMSecurityException {
     CRYPTO_SIGN_ERROR,
     CERTIFICATE_ERROR,
     BOOTSTRAP_ERROR,
+    CSR_ERROR,
     CRYPTO_SIGNATURE_VERIFICATION_ERROR
   }
 }

+ 11 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/keys/KeyCodec.java

@@ -87,6 +87,17 @@ public class KeyCodec {
     this.location = securityConfig.getKeyLocation(component);
   }
 
+  /**
+   * Creates an KeyCodec.
+   *
+   * @param config - Security Config.
+   */
+  public KeyCodec(SecurityConfig config) {
+    this.securityConfig = config;
+    isPosixFileSystem = KeyCodec::isPosix;
+    this.location = securityConfig.getKeyLocation();
+  }
+
   /**
    * Creates an HDDS Key Writer.
    *

+ 2 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -272,6 +272,8 @@ public final class OzoneConsts {
   public static final Metadata.Key<String> USER_METADATA_KEY =
       Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER);
 
+  public static final String RPC_PORT = "RPC";
+
   // Default OMServiceID for OM Ratis servers to use as RaftGroupId
   public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
 

+ 58 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java

@@ -21,13 +21,25 @@ package org.apache.hadoop.ozone;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 
+import org.apache.commons.validator.routines.InetAddressValidator;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Ozone security Util class.
@@ -36,6 +48,12 @@ import java.nio.file.Paths;
 @InterfaceStability.Evolving
 public final class OzoneSecurityUtil {
 
+  private final static Logger LOG =
+      LoggerFactory.getLogger(OzoneSecurityUtil.class);
+  // List of ip's not recommended to be added to CSR.
+  private final static Set<String> INVALID_IPS = new HashSet<>(Arrays.asList(
+      "0.0.0.0", "127.0.0.1"));
+
   private OzoneSecurityUtil() {
   }
 
@@ -57,4 +75,44 @@ public final class OzoneSecurityUtil {
     return false;
   }
 
+  /**
+   * Iterates through network interfaces and return all valid ip's not
+   * listed in CertificateSignRequest#INVALID_IPS.
+   *
+   * @return List<InetAddress>
+   * @throws IOException if no network interface are found or if an error
+   * occurs.
+   */
+  public static List<InetAddress> getValidInetsForCurrentHost()
+      throws IOException {
+    List<InetAddress> hostIps = new ArrayList<>();
+    InetAddressValidator ipValidator = InetAddressValidator.getInstance();
+
+    Enumeration<NetworkInterface> enumNI =
+        NetworkInterface.getNetworkInterfaces();
+    if (enumNI != null) {
+      while (enumNI.hasMoreElements()) {
+        NetworkInterface ifc = enumNI.nextElement();
+        if (ifc.isUp()) {
+          Enumeration<InetAddress> enumAdds = ifc.getInetAddresses();
+          while (enumAdds.hasMoreElements()) {
+            InetAddress addr = enumAdds.nextElement();
+
+            if (ipValidator.isValid(addr.getHostAddress())
+                && !INVALID_IPS.contains(addr.getHostAddress())) {
+              LOG.info("Adding ip:{},host:{}", addr.getHostAddress(),
+                  addr.getHostName());
+              hostIps.add(addr);
+            } else {
+              LOG.info("ip:{},host:{} not returned.", addr.getHostAddress(),
+                  addr.getHostName());
+            }
+          }
+        }
+      }
+      return hostIps;
+    } else {
+      throw new IOException("Unable to get network interfaces.");
+    }
+  }
 }

+ 1 - 1
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1830,7 +1830,7 @@
   </property>
   <property>
     <name>ozone.scm.security.service.address</name>
-    <value>0.0.0.0:9961</value>
+    <value/>
     <tag>OZONE, HDDS, SECURITY</tag>
     <description>Address of SCMSecurityProtocolServer.</description>
   </property>

+ 16 - 17
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestCertificateClientInit.java

@@ -61,7 +61,6 @@ public class TestCertificateClientInit {
 
   private CertificateClient dnCertificateClient;
   private CertificateClient omCertificateClient;
-  private static final String COMP = "test";
   private HDDSKeyGenerator keyGenerator;
   private Path metaDirPath;
   private SecurityConfig securityConfig;
@@ -97,11 +96,11 @@ public class TestCertificateClientInit {
     metaDirPath = Paths.get(path, "test");
     config.set(HDDS_METADATA_DIR_NAME, metaDirPath.toString());
     securityConfig = new SecurityConfig(config);
-    dnCertificateClient = new DNCertificateClient(securityConfig, COMP);
-    omCertificateClient = new OMCertificateClient(securityConfig, COMP);
+    dnCertificateClient = new DNCertificateClient(securityConfig);
+    omCertificateClient = new OMCertificateClient(securityConfig);
     keyGenerator = new HDDSKeyGenerator(securityConfig);
-    keyCodec = new KeyCodec(securityConfig, COMP);
-    Files.createDirectories(securityConfig.getKeyLocation(COMP));
+    keyCodec = new KeyCodec(securityConfig);
+    Files.createDirectories(securityConfig.getKeyLocation());
   }
 
   @After
@@ -118,7 +117,7 @@ public class TestCertificateClientInit {
     if (pvtKeyPresent) {
       keyCodec.writePrivateKey(keyPair.getPrivate());
     } else {
-      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
           .toString(), securityConfig.getPrivateKeyFileName()).toFile());
     }
 
@@ -127,7 +126,7 @@ public class TestCertificateClientInit {
         keyCodec.writePublicKey(keyPair.getPublic());
       }
     } else {
-      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
           .toString(), securityConfig.getPublicKeyFileName()).toFile());
     }
 
@@ -135,11 +134,11 @@ public class TestCertificateClientInit {
       X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
           "CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
 
-      CertificateCodec codec = new CertificateCodec(securityConfig, COMP);
+      CertificateCodec codec = new CertificateCodec(securityConfig);
       codec.writeCertificate(new X509CertificateHolder(
           x509Certificate.getEncoded()));
     } else {
-      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
           .toString(), securityConfig.getCertificateFileName()).toFile());
     }
     InitResponse response = dnCertificateClient.init();
@@ -148,10 +147,10 @@ public class TestCertificateClientInit {
 
     if (!response.equals(FAILURE)) {
       assertTrue(OzoneSecurityUtil.checkIfFileExist(
-          securityConfig.getKeyLocation(COMP),
+          securityConfig.getKeyLocation(),
           securityConfig.getPrivateKeyFileName()));
       assertTrue(OzoneSecurityUtil.checkIfFileExist(
-          securityConfig.getKeyLocation(COMP),
+          securityConfig.getKeyLocation(),
           securityConfig.getPublicKeyFileName()));
     }
   }
@@ -162,7 +161,7 @@ public class TestCertificateClientInit {
     if (pvtKeyPresent) {
       keyCodec.writePrivateKey(keyPair.getPrivate());
     } else {
-      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
           .toString(), securityConfig.getPrivateKeyFileName()).toFile());
     }
 
@@ -171,7 +170,7 @@ public class TestCertificateClientInit {
         keyCodec.writePublicKey(keyPair.getPublic());
       }
     } else {
-      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
           .toString(), securityConfig.getPublicKeyFileName()).toFile());
     }
 
@@ -179,11 +178,11 @@ public class TestCertificateClientInit {
       X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
           "CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
 
-      CertificateCodec codec = new CertificateCodec(securityConfig, COMP);
+      CertificateCodec codec = new CertificateCodec(securityConfig);
       codec.writeCertificate(new X509CertificateHolder(
           x509Certificate.getEncoded()));
     } else {
-      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+      FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
           .toString(), securityConfig.getCertificateFileName()).toFile());
     }
     InitResponse response = omCertificateClient.init();
@@ -196,10 +195,10 @@ public class TestCertificateClientInit {
 
     if (!response.equals(FAILURE)) {
       assertTrue(OzoneSecurityUtil.checkIfFileExist(
-          securityConfig.getKeyLocation(COMP),
+          securityConfig.getKeyLocation(),
           securityConfig.getPrivateKeyFileName()));
       assertTrue(OzoneSecurityUtil.checkIfFileExist(
-          securityConfig.getKeyLocation(COMP),
+          securityConfig.getKeyLocation(),
           securityConfig.getPublicKeyFileName()));
     }
   }

+ 11 - 12
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java

@@ -64,7 +64,6 @@ public class TestDefaultCertificateClient {
 
   private OMCertificateClient omCertClient;
   private DNCertificateClient dnCertClient;
-  private static final String COMP = "test";
   private HDDSKeyGenerator keyGenerator;
   private Path metaDirPath;
   private SecurityConfig securityConfig;
@@ -81,13 +80,13 @@ public class TestDefaultCertificateClient {
     securityConfig = new SecurityConfig(config);
     getCertClient();
     keyGenerator = new HDDSKeyGenerator(securityConfig);
-    keyCodec = new KeyCodec(securityConfig, COMP);
-    Files.createDirectories(securityConfig.getKeyLocation(COMP));
+    keyCodec = new KeyCodec(securityConfig);
+    Files.createDirectories(securityConfig.getKeyLocation());
   }
 
   private void getCertClient() {
-    omCertClient = new OMCertificateClient(securityConfig, COMP);
-    dnCertClient = new DNCertificateClient(securityConfig, COMP);
+    omCertClient = new OMCertificateClient(securityConfig);
+    dnCertClient = new DNCertificateClient(securityConfig);
   }
 
   @After
@@ -160,7 +159,7 @@ public class TestDefaultCertificateClient {
         () -> omCertClient.signDataStream(IOUtils.toInputStream(data,
             UTF)));
 
-    KeyPair keyPair = generateKeyPairFiles();
+    generateKeyPairFiles();
     byte[] sign = omCertClient.signDataStream(IOUtils.toInputStream(data,
         UTF));
     validateHash(sign, data.getBytes());
@@ -247,11 +246,11 @@ public class TestDefaultCertificateClient {
     omClientLog.clearOutput();
 
     // Case 1. Expect failure when keypair validation fails.
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPrivateKeyFileName()).toFile());
     keyCodec.writePrivateKey(keyPair.getPrivate());
 
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPublicKeyFileName()).toFile());
     keyCodec.writePublicKey(keyPair2.getPublic());
 
@@ -272,12 +271,12 @@ public class TestDefaultCertificateClient {
     // Case 2. Expect failure when certificate is generated from different
     // private key and keypair validation fails.
     getCertClient();
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getCertificateFileName()).toFile());
     X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
         "CN=Test", keyGenerator.generateKey(), 10,
         securityConfig.getSignatureAlgo());
-    CertificateCodec codec = new CertificateCodec(securityConfig, COMP);
+    CertificateCodec codec = new CertificateCodec(securityConfig);
     codec.writeCertificate(new X509CertificateHolder(
         x509Certificate.getEncoded()));
 
@@ -299,7 +298,7 @@ public class TestDefaultCertificateClient {
 
     // Re write the correct public key.
     getCertClient();
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPublicKeyFileName()).toFile());
     keyCodec.writePublicKey(keyPair.getPublic());
 
@@ -319,7 +318,7 @@ public class TestDefaultCertificateClient {
 
     // Case 4. Failure when public key recovery fails.
     getCertClient();
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation(COMP)
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPublicKeyFileName()).toFile());
 
     // Check for DN.

+ 1 - 1
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java

@@ -80,7 +80,7 @@ public class TestOzoneBlockTokenSecretManager {
 
   private CertificateClient getCertificateClient(SecurityConfig secConf)
       throws Exception {
-    return new OMCertificateClient(secConf, "om"){
+    return new OMCertificateClient(secConf){
       @Override
       public X509Certificate getCertificate() {
         return x509Certificate;

+ 1 - 1
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java

@@ -80,7 +80,7 @@ public class TestOzoneDelegationTokenSecretManager {
     X509Certificate cert = KeyStoreTestUtil
         .generateCertificate("CN=OzoneMaster", keyPair, 30, "SHA256withRSA");
 
-    return new OMCertificateClient(securityConfig, "test") {
+    return new OMCertificateClient(securityConfig) {
       @Override
       public X509Certificate getCertificate() {
         return cert;

+ 7 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java

@@ -392,7 +392,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
         scm = createSCM();
         scm.start();
         om = createOM();
-        om.setCertClient(certClient);
+        if(certClient != null) {
+          om.setCertClient(certClient);
+        }
       } catch (AuthenticationException ex) {
         throw new IOException("Unable to build MiniOzoneCluster. ", ex);
       }
@@ -476,6 +478,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       omStorage.setClusterId(clusterId);
       omStorage.setScmId(scmId.get());
       omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
+      // Initialize ozone certificate client if security is enabled.
+      if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+        OzoneManager.initializeSecurity(conf, omStorage);
+      }
       omStorage.initialize();
     }
 

+ 110 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
 import org.apache.hadoop.io.Text;
@@ -65,6 +64,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
+import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -80,6 +80,10 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.bouncycastle.asn1.x500.RDN;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateHolder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -90,6 +94,12 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.security.cert.X509Certificate;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+
 
 /**
  * Test class to for security enabled Ozone cluster.
@@ -97,7 +107,7 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public final class TestSecureOzoneCluster {
 
-  private static final String TEST_USER = "testUgiUser";
+  private static final String TEST_USER = "testUgiUser@EXAMPLE.COM";
   private static final int CLIENT_TIMEOUT = 2 * 1000;
   private Logger logger = LoggerFactory
       .getLogger(TestSecureOzoneCluster.class);
@@ -118,6 +128,7 @@ public final class TestSecureOzoneCluster {
   private UserGroupInformation testKerberosUgi;
   private StorageContainerManager scm;
   private OzoneManager om;
+  private String host;
 
   private static String clusterId;
   private static String scmId;
@@ -137,10 +148,15 @@ public final class TestSecureOzoneCluster {
       final String path = folder.newFolder().toString();
       metaDirPath = Paths.get(path, "om-meta");
       conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
+      conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+      conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+          KERBEROS.toString());
+
       startMiniKdc();
       setSecureConfig(conf);
       createCredentialsInKDC(conf, miniKdc);
       generateKeyPair(conf);
+//      OzoneManager.setTestSecureOmFlag(true);
     } catch (IOException e) {
       logger.error("Failed to initialize TestSecureOzoneCluster", e);
     } catch (Exception e) {
@@ -198,7 +214,7 @@ public final class TestSecureOzoneCluster {
   private void setSecureConfig(Configuration configuration) throws IOException {
     configuration.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     configuration.setBoolean(OZONE_ENABLED, true);
-    String host = InetAddress.getLocalHost().getCanonicalHostName()
+    host = InetAddress.getLocalHost().getCanonicalHostName()
         .toLowerCase();
     String realm = miniKdc.getRealm();
     curUser = UserGroupInformation.getCurrentUser()
@@ -417,6 +433,7 @@ public final class TestSecureOzoneCluster {
         RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
     try {
       // Start OM
+      om.setCertClient(new CertificateClientTestImpl(conf));
       om.start();
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       String username = ugi.getUserName();
@@ -564,6 +581,7 @@ public final class TestSecureOzoneCluster {
     // Start OM
 
     try {
+      om.setCertClient(new CertificateClientTestImpl(conf));
       om.start();
 
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -643,7 +661,94 @@ public final class TestSecureOzoneCluster {
     omStore.initialize();
     OzoneManager.setTestSecureOmFlag(true);
     om = OzoneManager.createOm(null, config);
-    CertificateClient certClient = new CertificateClientTestImpl(config);
-    om.setCertClient(certClient);
+  }
+
+  /**
+   * Test functionality to get SCM signed certificate for OM.
+   */
+  @Test
+  public void testSecureOmInitSuccess() throws Exception {
+    LogCapturer omLogs =
+        LogCapturer.captureLogs(OzoneManager.getLogger());
+    omLogs.clearOutput();
+    initSCM();
+    try {
+      scm = StorageContainerManager.createSCM(null, conf);
+      scm.start();
+
+      OMStorage omStore = new OMStorage(conf);
+      initializeOmStorage(omStore);
+      OzoneManager.setTestSecureOmFlag(true);
+      om = OzoneManager.createOm(null, conf);
+
+      Assert.assertNotNull(om.getCertificateClient());
+      Assert.assertNotNull(om.getCertificateClient().getPublicKey());
+      Assert.assertNotNull(om.getCertificateClient().getPrivateKey());
+      Assert.assertNotNull(om.getCertificateClient().getCertificate());
+      Assert.assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
+      Assert.assertTrue(omLogs.getOutput().contains("Successfully stored " +
+          "SCM signed certificate"));
+      X509Certificate certificate = om.getCertificateClient().getCertificate();
+      validateCertificate(certificate);
+    } finally {
+      if (scm != null) {
+        scm.stop();
+      }
+      if (om != null) {
+        om.stop();
+      }
+
+    }
+
+  }
+
+  public void validateCertificate(X509Certificate cert) throws Exception {
+
+    // Assert that we indeed have a self signed certificate.
+    X500Name x500Issuer = new JcaX509CertificateHolder(cert).getIssuer();
+    RDN cn = x500Issuer.getRDNs(BCStyle.CN)[0];
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String scmUser = "scm@" + hostName;
+    Assert.assertEquals(scmUser, cn.getFirst().getValue().toString());
+
+    // Subject name should be om login user in real world but in this test
+    // UGI has scm user context.
+    Assert.assertEquals(scmUser, cn.getFirst().getValue().toString());
+
+    LocalDate today = LocalDateTime.now().toLocalDate();
+    Date invalidDate;
+
+    // Make sure the end date is honored.
+    invalidDate = java.sql.Date.valueOf(today.plus(1, ChronoUnit.DAYS));
+    Assert.assertTrue(cert.getNotAfter().after(invalidDate));
+
+    invalidDate = java.sql.Date.valueOf(today.plus(400, ChronoUnit.DAYS));
+    Assert.assertTrue(cert.getNotAfter().before(invalidDate));
+
+    Assert.assertTrue(cert.getSubjectDN().toString().contains(scmId));
+    Assert.assertTrue(cert.getSubjectDN().toString().contains(clusterId));
+
+    Assert.assertTrue(cert.getIssuerDN().toString().contains(scmUser));
+    Assert.assertTrue(cert.getIssuerDN().toString().contains(scmId));
+    Assert.assertTrue(cert.getIssuerDN().toString().contains(clusterId));
+
+    // Verify that certificate matches the public key.
+    String encodedKey1 = cert.getPublicKey().toString();
+    String encodedKey2 = om.getCertificateClient().getPublicKey().toString();
+    Assert.assertEquals(encodedKey1, encodedKey2);
+  }
+
+  private void initializeOmStorage(OMStorage omStorage) throws IOException {
+    if (omStorage.getState() == Storage.StorageState.INITIALIZED) {
+      return;
+    }
+    omStorage.setClusterId(clusterId);
+    omStorage.setScmId(scmId);
+    omStorage.setOmId(omId);
+    // Initialize ozone certificate client if security is enabled.
+    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      OzoneManager.initializeSecurity(conf, omStorage);
+    }
+    omStorage.initialize();
   }
 }

+ 23 - 17
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java

@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.security.KeyPair;
 import java.security.PrivateKey;
@@ -45,27 +47,14 @@ public class CertificateClientTestImpl implements CertificateClient {
 
   private final SecurityConfig securityConfig;
   private final KeyPair keyPair;
-  private final X509Certificate cert;
+  private final Configuration config;
 
   public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception{
     securityConfig = new SecurityConfig(conf);
     HDDSKeyGenerator keyGen =
         new HDDSKeyGenerator(securityConfig.getConfiguration());
     keyPair = keyGen.generateKey();
-
-    SelfSignedCertificate.Builder builder =
-        SelfSignedCertificate.newBuilder()
-            .setBeginDate(LocalDate.now())
-            .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
-            .setClusterID("cluster1")
-            .setKey(keyPair)
-            .setSubject("TestCertSub")
-            .setConfiguration(conf)
-            .setScmID("TestScmId1")
-            .makeCA();
-
-    X509CertificateHolder certificateHolder = builder.build();
-    cert = new JcaX509CertificateConverter().getCertificate(certificateHolder);
+    config = conf;
   }
 
   @Override
@@ -80,7 +69,24 @@ public class CertificateClientTestImpl implements CertificateClient {
 
   @Override
   public X509Certificate getCertificate() {
-    return cert;
+    SelfSignedCertificate.Builder builder =
+        SelfSignedCertificate.newBuilder()
+            .setBeginDate(LocalDate.now())
+            .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
+            .setClusterID("cluster1")
+            .setKey(keyPair)
+            .setSubject("TestCertSub")
+            .setConfiguration(config)
+            .setScmID("TestScmId1")
+            .makeCA();
+    X509CertificateHolder certificateHolder = null;
+    try {
+      certificateHolder = builder.build();
+      return new JcaX509CertificateConverter().getCertificate(
+          certificateHolder);
+    } catch (IOException | java.security.cert.CertificateException e) {
+    }
+    return null;
   }
 
   @Override
@@ -113,7 +119,7 @@ public class CertificateClientTestImpl implements CertificateClient {
 
   @Override
   public CertificateSignRequest.Builder getCSRBuilder() {
-    return null;
+    return new CertificateSignRequest.Builder();
   }
 
   @Override

+ 218 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.net.ConnectException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.apache.hadoop.test.GenericTestUtils.*;
+
+/**
+ * Test secure Ozone Manager operation in distributed handler scenario.
+ */
+public class TestSecureOzoneManager {
+
+  private MiniOzoneCluster cluster = null;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private String omId;
+  private Path metaDir;
+
+  @Rule
+  public Timeout timeout = new Timeout(1000 * 25);
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omId = UUID.randomUUID().toString();
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+    conf.set(OZONE_SCM_NAMES, "localhost");
+    final String path = getTempPath(UUID.randomUUID().toString());
+    metaDir = Paths.get(path, "om-meta");
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+    OzoneManager.setTestSecureOmFlag(true);
+
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    FileUtils.deleteQuietly(metaDir.toFile());
+  }
+
+  /**
+   * Test failure cases for secure OM initialization.
+   */
+  @Test
+  public void testSecureOmInitFailures() throws Exception {
+    PrivateKey privateKey;
+    PublicKey publicKey;
+    LogCapturer omLogs =
+        LogCapturer.captureLogs(OzoneManager.getLogger());
+    OMStorage omStorage = new OMStorage(conf);
+    omStorage.setClusterId(clusterId);
+    omStorage.setScmId(scmId);
+    omStorage.setOmId(omId);
+    omLogs.clearOutput();
+
+    // Case 1: When keypair as well as certificate is missing. Initial keypair
+    // boot-up. Get certificate will fail no SCM is not running.
+    LambdaTestUtils.intercept(ConnectException.class, "Connection " +
+            "refused; For more detail",
+        () -> OzoneManager.initializeSecurity(conf, omStorage));
+    SecurityConfig securityConfig = new SecurityConfig(conf);
+    CertificateClient client =
+        new OMCertificateClient(securityConfig);
+    privateKey = client.getPrivateKey();
+    publicKey = client.getPublicKey();
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
+    omLogs.clearOutput();
+
+    // Case 2: If key pair already exist than response should be RECOVER.
+    client = new OMCertificateClient(securityConfig);
+    LambdaTestUtils.intercept(RuntimeException.class, " OM security" +
+            " initialization failed",
+        () -> OzoneManager.initializeSecurity(conf, omStorage));
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: RECOVER"));
+    Assert.assertTrue(omLogs.getOutput().contains(" OM certificate is " +
+        "missing"));
+    omLogs.clearOutput();
+
+    // Case 3: When public key as well as certificate is missing.
+    client = new OMCertificateClient(securityConfig);
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
+        .toString(), securityConfig.getPublicKeyFileName()).toFile());
+    LambdaTestUtils.intercept(RuntimeException.class, " OM security" +
+            " initialization failed",
+        () -> OzoneManager.initializeSecurity(conf, omStorage));
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: FAILURE"));
+    omLogs.clearOutput();
+
+    // Case 4: When private key and certificate is missing.
+    client = new OMCertificateClient(securityConfig);
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
+        .toString(), securityConfig.getPrivateKeyFileName()).toFile());
+    KeyCodec keyCodec = new KeyCodec(securityConfig);
+    keyCodec.writePublicKey(publicKey);
+    LambdaTestUtils.intercept(RuntimeException.class, " OM security" +
+            " initialization failed",
+        () -> OzoneManager.initializeSecurity(conf, omStorage));
+    Assert.assertNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: FAILURE"));
+    omLogs.clearOutput();
+
+    // Case 5: When only certificate is present.
+    client = new OMCertificateClient(securityConfig);
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
+        .toString(), securityConfig.getPublicKeyFileName()).toFile());
+    CertificateCodec certCodec = new CertificateCodec(securityConfig);
+    X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
+        "CN=Test", new KeyPair(publicKey, privateKey), 10,
+        securityConfig.getSignatureAlgo());
+    certCodec.writeCertificate(new X509CertificateHolder(
+        x509Certificate.getEncoded()));
+    LambdaTestUtils.intercept(RuntimeException.class, " OM security" +
+            " initialization failed",
+        () -> OzoneManager.initializeSecurity(conf, omStorage));
+    Assert.assertNull(client.getPrivateKey());
+    Assert.assertNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: FAILURE"));
+    omLogs.clearOutput();
+
+    // Case 6: When private key and certificate is present.
+    client = new OMCertificateClient(securityConfig);
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
+        .toString(), securityConfig.getPublicKeyFileName()).toFile());
+    keyCodec.writePrivateKey(privateKey);
+    OzoneManager.initializeSecurity(conf, omStorage);
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: SUCCESS"));
+    omLogs.clearOutput();
+
+    // Case 7 When keypair and certificate is present.
+    client = new OMCertificateClient(securityConfig);
+    OzoneManager.initializeSecurity(conf, omStorage);
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(omLogs.getOutput().contains("Init response: SUCCESS"));
+    omLogs.clearOutput();
+  }
+
+}

+ 162 - 14
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -24,9 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.security.KeyPair;
 import java.util.Collection;
 import java.util.Objects;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
@@ -38,6 +43,9 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -48,6 +56,9 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolCli
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -126,6 +137,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.utils.RetriableTask;
 import org.apache.ratis.util.LifeCycle;
+import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,8 +162,9 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForSecurityProtocol;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
@@ -159,8 +172,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 
+import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_HANDLER_COUNT_DEFAULT;
@@ -246,6 +259,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final SecurityConfig secConfig;
   private final S3SecretManager s3SecretManager;
   private volatile boolean isOmRpcServerRunning = false;
+  private String omComponent;
 
   private KeyProviderCryptoExtension kmsProvider = null;
   private static String keyProviderUriKeyName =
@@ -291,12 +305,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
 
     secConfig = new SecurityConfig(configuration);
+    if (secConfig.isSecurityEnabled()) {
+      omComponent = OM_DAEMON + "-" + omId;
+      certClient = new OMCertificateClient(new SecurityConfig(conf));
+      delegationTokenMgr = createDelegationTokenSecretManager(configuration);
+    }
     if (secConfig.isBlockTokenEnabled()) {
       blockTokenMgr = createBlockTokenSecretManager(configuration);
     }
-    if(secConfig.isSecurityEnabled()){
-      delegationTokenMgr = createDelegationTokenSecretManager(configuration);
-    }
 
     omRpcServer = getRpcServer(conf);
     omRpcAddress = updateRPCListenAddress(configuration,
@@ -668,11 +684,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   private void readKeyPair() throws OzoneSecurityException {
     try {
-      keyPair = new KeyPair(certClient.getPublicKey(),
-          certClient.getPrivateKey());
+      LOG.info("Reading keypair and certificate from file system.");
+      PublicKey pubKey = certClient.getPublicKey();
+      PrivateKey pvtKey = certClient.getPrivateKey();
+      Objects.requireNonNull(pubKey);
+      Objects.requireNonNull(pvtKey);
+      Objects.requireNonNull(certClient.getCertificate());
+
+      keyPair = new KeyPair(pubKey, pvtKey);
     } catch (Exception e) {
-      throw new OzoneSecurityException("Error reading private file for "
-          + "OzoneManager", e, OzoneSecurityException
+      throw new OzoneSecurityException("Error reading keypair & certificate "
+          + "OzoneManager.", e, OzoneSecurityException
           .ResultCodes.OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST);
     }
   }
@@ -730,6 +752,29 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class);
   }
 
+  /**
+   * Create a scm security client, used to get SCM signed certificate.
+   *
+   * @return {@link SCMSecurityProtocol}
+   * @throws IOException
+   */
+  private static SCMSecurityProtocol getScmSecurityClient(
+      OzoneConfiguration conf) throws IOException {
+    RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long scmVersion =
+        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
+    InetSocketAddress scmSecurityProtoAdd =
+        getScmAddressForSecurityProtocol(conf);
+    SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
+        new SCMSecurityProtocolClientSideTranslatorPB(
+            RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
+                scmSecurityProtoAdd, UserGroupInformation.getCurrentUser(),
+                conf, NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+    return scmSecurityClient;
+  }
+
   /**
    * Returns a scm container client.
    *
@@ -859,11 +904,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private static OzoneManager createOm(String[] argv,
       OzoneConfiguration conf, boolean printBanner)
       throws IOException, AuthenticationException {
-    if (!isHddsEnabled(conf)) {
-      System.err.println("OM cannot be started in secure mode or when " +
-          OZONE_ENABLED + " is set to false");
-      System.exit(1);
-    }
     StartupOption startOpt = parseArguments(argv);
     if (startOpt == null) {
       printUsage(System.err);
@@ -928,6 +968,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
             "OM initialization succeeded.Current cluster id for sd="
                 + omStorage.getStorageDir() + ";cid=" + omStorage
                 .getClusterID());
+
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          initializeSecurity(conf, omStorage);
+        }
+
         return true;
       } catch (IOException ioe) {
         LOG.error("Could not initialize OM version file", ioe);
@@ -942,6 +987,41 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Initializes secure OzoneManager.
+   * */
+  @VisibleForTesting
+  public static void initializeSecurity(OzoneConfiguration conf,
+      OMStorage omStore)
+      throws IOException {
+    LOG.info("Initializing secure OzoneManager.");
+
+    CertificateClient certClient =
+        new OMCertificateClient(new SecurityConfig(conf));
+    CertificateClient.InitResponse response = certClient.init();
+    LOG.info("Init response: {}", response);
+    switch (response) {
+    case SUCCESS:
+      LOG.info("Initialization successful.");
+      break;
+    case GETCERT:
+      getSCMSignedCert(certClient, conf, omStore);
+      LOG.info("Successfully stored SCM signed certificate.");
+      break;
+    case FAILURE:
+      LOG.error("OM security initialization failed.");
+      throw new RuntimeException("OM security initialization failed.");
+    case RECOVER:
+      LOG.error("OM security initialization failed. OM certificate is " +
+          "missing.");
+      throw new RuntimeException("OM security initialization failed.");
+    default:
+      LOG.error("OM security initialization failed. Init response: {}",
+          response);
+      throw new RuntimeException("OM security initialization failed.");
+    }
+  }
+
   private static ScmInfo getScmInfo(OzoneConfiguration conf)
       throws IOException {
     try {
@@ -1266,6 +1346,65 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Get SCM signed certificate and store it using certificate client.
+   * */
+  private static void getSCMSignedCert(CertificateClient client,
+      OzoneConfiguration config, OMStorage omStore) throws IOException {
+    CertificateSignRequest.Builder builder = client.getCSRBuilder();
+    KeyPair keyPair = new KeyPair(client.getPublicKey(),
+        client.getPrivateKey());
+    InetSocketAddress omRpcAdd;
+
+    omRpcAdd = OmUtils.getOmAddress(config);
+    // Get host name.
+    String hostname = omRpcAdd.getAddress().getHostName();
+
+    String subject = UserGroupInformation.getCurrentUser()
+        .getShortUserName() + "@" + hostname;
+
+    builder.setCA(false)
+        .setKey(keyPair)
+        .setConfiguration(config)
+        .setScmID(omStore.getScmId())
+        .setClusterID(omStore.getClusterID())
+        .setSubject(subject)
+        .addIpAddress(omRpcAdd.getAddress().getHostAddress());
+
+    LOG.info("Creating csr for OM->dns:{},ip:{},scmId:{},clusterId:{}," +
+            "subject:{}", hostname, omRpcAdd.getAddress().getHostAddress(),
+        omStore.getScmId(), omStore.getClusterID(), subject);
+
+    HddsProtos.OzoneManagerDetailsProto.Builder omDetailsProtoBuilder =
+        HddsProtos.OzoneManagerDetailsProto.newBuilder()
+            .setHostName(omRpcAdd.getHostName())
+            .setIpAddress(omRpcAdd.getAddress().getHostAddress())
+            .setUuid(omStore.getOmId())
+            .addPorts(HddsProtos.Port.newBuilder()
+                .setName(RPC_PORT)
+                .setValue(omRpcAdd.getPort())
+                .build());
+
+    PKCS10CertificationRequest csr = builder.build();
+    HddsProtos.OzoneManagerDetailsProto omDetailsProto =
+        omDetailsProtoBuilder.build();
+    LOG.info("OzoneManager ports added:{}", omDetailsProto.getPortsList());
+    SCMSecurityProtocol secureScmClient = getScmSecurityClient(config);
+
+    String pemEncodedCert = secureScmClient.getOMCertificate(omDetailsProto,
+        getEncodedString(csr));
+
+    try {
+      X509Certificate x509Certificate =
+          CertificateCodec.getX509Cert(pemEncodedCert);
+      client.storeCertificate(x509Certificate);
+    } catch (IOException | CertificateException e) {
+      LOG.error("Error while storing SCM signed certificate.", e);
+      throw new RuntimeException(e);
+    }
+
+  }
+
   /**
    *
    * @return true if delegation token operation is allowed
@@ -2469,4 +2608,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public OMFailoverProxyProvider getOMFailoverProxyProvider() {
     return null;
   }
+
+  @VisibleForTesting
+  public CertificateClient getCertificateClient() {
+    return certClient;
+  }
+
+  public String getComponent() {
+    return omComponent;
+  }
 }