|
@@ -33,8 +33,8 @@ import org.apache.hadoop.hdds.HddsUtils;
|
|
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
|
|
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
|
|
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.hdds.scm.block.BlockManager;
|
|
|
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
|
|
|
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
|
|
@@ -68,46 +68,49 @@ import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
|
|
|
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
|
|
|
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
|
|
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
|
|
|
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
|
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
|
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
|
|
|
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
|
|
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
|
|
|
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
|
|
|
+import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
|
|
|
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
|
|
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
|
|
import org.apache.hadoop.hdds.server.events.EventQueue;
|
|
|
-import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
|
|
-import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
|
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
|
|
import org.apache.hadoop.ozone.common.Storage.StorageState;
|
|
|
import org.apache.hadoop.ozone.common.StorageInfo;
|
|
|
import org.apache.hadoop.ozone.lease.LeaseManager;
|
|
|
+import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
|
-import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|
|
import org.apache.hadoop.util.GenericOptionsParser;
|
|
|
import org.apache.hadoop.util.JvmPauseMonitor;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
|
|
- .HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
|
|
|
+
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import javax.management.ObjectName;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintStream;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
|
|
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
|
|
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
|
|
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
|
|
@@ -192,14 +195,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
|
|
|
private final ReplicationActivityStatus replicationStatus;
|
|
|
private final SCMChillModeManager scmChillModeManager;
|
|
|
+ private final CertificateServer certificateServer;
|
|
|
|
|
|
private JvmPauseMonitor jvmPauseMonitor;
|
|
|
private final OzoneConfiguration configuration;
|
|
|
|
|
|
/**
|
|
|
- * Creates a new StorageContainerManager. Configuration will be updated
|
|
|
- * with information on the
|
|
|
- * actual listening addresses used for RPC servers.
|
|
|
+ * Creates a new StorageContainerManager. Configuration will be
|
|
|
+ * updated with information on the actual listening addresses used
|
|
|
+ * for RPC servers.
|
|
|
*
|
|
|
* @param conf configuration
|
|
|
*/
|
|
@@ -209,17 +213,29 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
configuration = conf;
|
|
|
StorageContainerManager.initMetrics();
|
|
|
initContainerReportCache(conf);
|
|
|
- // Authenticate SCM if security is enabled
|
|
|
- if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
|
|
|
- loginAsSCMUser(conf);
|
|
|
- }
|
|
|
-
|
|
|
scmStorage = new SCMStorage(conf);
|
|
|
if (scmStorage.getState() != StorageState.INITIALIZED) {
|
|
|
throw new SCMException("SCM not initialized.", ResultCodes
|
|
|
.SCM_NOT_INITIALIZED);
|
|
|
}
|
|
|
|
|
|
+ // Authenticate SCM if security is enabled
|
|
|
+ if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
|
|
|
+ loginAsSCMUser(conf);
|
|
|
+ certificateServer = initializeCertificateServer(
|
|
|
+ getScmStorage().getClusterID(), getScmStorage().getScmId());
|
|
|
+ // TODO: Support Intermediary CAs in future.
|
|
|
+ certificateServer.init(new SecurityConfig(conf),
|
|
|
+ CertificateServer.CAType.SELF_SIGNED_CA);
|
|
|
+ } else {
|
|
|
+ // if no Security, we do not create a Certificate Server at all.
|
|
|
+ // This allows user to boot SCM without security temporarily
|
|
|
+ // and then come back and enable it without any impact.
|
|
|
+ certificateServer = null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
eventQueue = new EventQueue();
|
|
|
|
|
|
scmNodeManager = new SCMNodeManager(
|
|
@@ -362,15 +378,33 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
LOG.info("SCM login successful.");
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This function creates/initializes a certificate server as needed.
|
|
|
+ * This function is idempotent, so calling this again and again after the
|
|
|
+ * server is initialized is not a problem.
|
|
|
+ *
|
|
|
+ * @param clusterID - Cluster ID
|
|
|
+ * @param scmID - SCM ID
|
|
|
+ */
|
|
|
+ private CertificateServer initializeCertificateServer(String clusterID,
|
|
|
+ String scmID) throws IOException {
|
|
|
+ // TODO: Support Certificate Server loading via Class Name loader.
|
|
|
+ // So it is easy to use different Certificate Servers if needed.
|
|
|
+ String subject = "scm@" + InetAddress.getLocalHost().getHostName();
|
|
|
+ return new DefaultCAServer(subject, clusterID, scmID);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Builds a message for logging startup information about an RPC server.
|
|
|
*
|
|
|
* @param description RPC server description
|
|
|
- * @param addr RPC server listening address
|
|
|
+ * @param addr RPC server listening address
|
|
|
* @return server startup message
|
|
|
*/
|
|
|
public static String buildRpcServerStartMessage(String description,
|
|
|
- InetSocketAddress addr) {
|
|
|
+ InetSocketAddress addr) {
|
|
|
return addr != null
|
|
|
? String.format("%s is listening at %s", description, addr.toString())
|
|
|
: String.format("%s not started", description);
|
|
@@ -445,7 +479,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
|
|
|
/**
|
|
|
* Create an SCM instance based on the supplied command-line arguments.
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* This method is intended for unit tests only. It suppresses the
|
|
|
* startup/shutdown message and skips registering Unix signal
|
|
|
* handlers.
|
|
@@ -465,8 +499,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
/**
|
|
|
* Create an SCM instance based on the supplied command-line arguments.
|
|
|
*
|
|
|
- * @param args command-line arguments.
|
|
|
- * @param conf HDDS configuration
|
|
|
+ * @param args command-line arguments.
|
|
|
+ * @param conf HDDS configuration
|
|
|
* @param printBanner if true, then log a verbose startup message.
|
|
|
* @return SCM instance
|
|
|
* @throws IOException, AuthenticationException
|
|
@@ -736,7 +770,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocl RPC " +
|
|
|
"server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
|
|
|
getDatanodeProtocolServer().start();
|
|
|
- if(getSecurityProtocolServer() != null) {
|
|
|
+ if (getSecurityProtocolServer() != null) {
|
|
|
getSecurityProtocolServer().start();
|
|
|
}
|
|
|
|
|
@@ -853,7 +887,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
getBlockProtocolServer().join();
|
|
|
getClientProtocolServer().join();
|
|
|
getDatanodeProtocolServer().join();
|
|
|
- if(getSecurityProtocolServer() != null) {
|
|
|
+ if (getSecurityProtocolServer() != null) {
|
|
|
getSecurityProtocolServer().join();
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
@@ -987,7 +1021,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
/**
|
|
|
* Returns EventPublisher.
|
|
|
*/
|
|
|
- public EventPublisher getEventQueue(){
|
|
|
+ public EventPublisher getEventQueue() {
|
|
|
return eventQueue;
|
|
|
}
|
|
|
|
|
@@ -1007,7 +1041,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
@Override
|
|
|
public Map<String, Integer> getContainerStateCount() {
|
|
|
Map<String, Integer> nodeStateCount = new HashMap<>();
|
|
|
- for (HddsProtos.LifeCycleState state: HddsProtos.LifeCycleState.values()) {
|
|
|
+ for (HddsProtos.LifeCycleState state : HddsProtos.LifeCycleState.values()) {
|
|
|
nodeStateCount.put(state.toString(), containerManager.getContainers(
|
|
|
state).size());
|
|
|
}
|