|
@@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
|
|
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
|
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
|
|
+import org.apache.hadoop.ozone.common.StorageInfo;
|
|
|
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
|
|
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
|
|
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
|
@@ -82,6 +83,8 @@ import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
|
|
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
|
|
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
|
|
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
|
|
+import org.apache.hadoop.ozone.common.Storage.StorageState;
|
|
|
+import org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
@@ -89,6 +92,7 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import javax.management.ObjectName;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.PrintStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
@@ -100,7 +104,6 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
-import java.util.UUID;
|
|
|
import java.util.Collections;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
@@ -138,12 +141,45 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(StorageContainerManager.class);
|
|
|
|
|
|
+ /**
|
|
|
+ * Startup options.
|
|
|
+ */
|
|
|
+ public enum StartupOption {
|
|
|
+ INIT("-init"),
|
|
|
+ CLUSTERID("-clusterid"),
|
|
|
+ GENCLUSTERID("-genclusterid"),
|
|
|
+ REGULAR("-regular"),
|
|
|
+ HELP("-help");
|
|
|
+
|
|
|
+ private final String name;
|
|
|
+ private String clusterId = null;
|
|
|
+
|
|
|
+ public void setClusterId(String cid) {
|
|
|
+ if(cid != null && !cid.isEmpty()) {
|
|
|
+ clusterId = cid;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getClusterId() {
|
|
|
+ return clusterId;
|
|
|
+ }
|
|
|
+
|
|
|
+ StartupOption(String arg) {
|
|
|
+ this.name = arg;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getName() {
|
|
|
+ return name;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* NodeManager and container Managers for SCM.
|
|
|
*/
|
|
|
private final NodeManager scmNodeManager;
|
|
|
private final Mapping scmContainerManager;
|
|
|
private final BlockManager scmBlockManager;
|
|
|
+ private final SCMStorage scmStorage;
|
|
|
|
|
|
/** The RPC server that listens to requests from DataNodes. */
|
|
|
private final RPC.Server datanodeRpcServer;
|
|
@@ -169,13 +205,18 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
/** SCM metrics. */
|
|
|
private static SCMMetrics metrics;
|
|
|
|
|
|
+ private static final String USAGE =
|
|
|
+ "Usage: \n hdfs scm [ " + StartupOption.INIT.getName() + " [ "
|
|
|
+ + StartupOption.CLUSTERID.getName() + " <cid> ] ]\n " + "hdfs scm [ "
|
|
|
+ + StartupOption.GENCLUSTERID.getName() + " ]\n " + "hdfs scm [ "
|
|
|
+ + StartupOption.HELP.getName() + " ]\n";
|
|
|
/**
|
|
|
* Creates a new StorageContainerManager. Configuration will be updated with
|
|
|
* information on the actual listening addresses used for RPC servers.
|
|
|
*
|
|
|
* @param conf configuration
|
|
|
*/
|
|
|
- public StorageContainerManager(OzoneConfiguration conf)
|
|
|
+ private StorageContainerManager(OzoneConfiguration conf)
|
|
|
throws IOException {
|
|
|
|
|
|
final int handlerCount = conf.getInt(
|
|
@@ -184,8 +225,13 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
|
|
|
|
|
StorageContainerManager.initMetrics();
|
|
|
- // TODO : Fix the ClusterID generation code.
|
|
|
- scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString());
|
|
|
+ scmStorage = new SCMStorage(conf);
|
|
|
+ String clusterId = scmStorage.getClusterID();
|
|
|
+ if (clusterId == null) {
|
|
|
+ throw new SCMException("clusterId not found",
|
|
|
+ ResultCodes.SCM_NOT_INITIALIZED);
|
|
|
+ }
|
|
|
+ scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID());
|
|
|
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
|
|
|
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
|
|
|
scmContainerManager, cacheSize);
|
|
@@ -321,22 +367,125 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|
|
public static void main(String[] argv) throws IOException {
|
|
|
StringUtils.startupShutdownMessage(StorageContainerManager.class,
|
|
|
argv, LOG);
|
|
|
+ OzoneConfiguration conf = new OzoneConfiguration();
|
|
|
try {
|
|
|
- OzoneConfiguration conf = new OzoneConfiguration();
|
|
|
- if (!DFSUtil.isOzoneEnabled(conf)) {
|
|
|
- System.out.println("SCM cannot be started in secure mode or when " +
|
|
|
- OZONE_ENABLED + " is set to false");
|
|
|
- System.exit(1);
|
|
|
+ StorageContainerManager scm = createSCM(argv, conf);
|
|
|
+ if (scm != null) {
|
|
|
+ scm.start();
|
|
|
+ scm.join();
|
|
|
}
|
|
|
- StorageContainerManager scm = new StorageContainerManager(conf);
|
|
|
- scm.start();
|
|
|
- scm.join();
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("Failed to start the StorageContainerManager.", t);
|
|
|
terminate(1, t);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static void printUsage(PrintStream out) {
|
|
|
+ out.println(USAGE + "\n");
|
|
|
+ }
|
|
|
+
|
|
|
+ public static StorageContainerManager createSCM(String[] argv,
|
|
|
+ OzoneConfiguration conf) throws IOException {
|
|
|
+ if (!DFSUtil.isOzoneEnabled(conf)) {
|
|
|
+ System.err.println("SCM cannot be started in secure mode or when " +
|
|
|
+ OZONE_ENABLED + " is set to false");
|
|
|
+ System.exit(1);
|
|
|
+ }
|
|
|
+ StartupOption startOpt = parseArguments(argv);
|
|
|
+ if (startOpt == null) {
|
|
|
+ printUsage(System.err);
|
|
|
+ terminate(1);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ switch (startOpt) {
|
|
|
+ case INIT:
|
|
|
+ terminate(scmInit(conf) ? 0 : 1);
|
|
|
+ return null;
|
|
|
+ case GENCLUSTERID:
|
|
|
+ System.out.println("Generating new cluster id:");
|
|
|
+ System.out.println(StorageInfo.newClusterID());
|
|
|
+ terminate(0);
|
|
|
+ return null;
|
|
|
+ case HELP:
|
|
|
+ printUsage(System.err);
|
|
|
+ terminate(0);
|
|
|
+ return null;
|
|
|
+ default:
|
|
|
+ return new StorageContainerManager(conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Routine to set up the Version info for StorageContainerManager.
|
|
|
+ *
|
|
|
+ * @param conf OzoneConfiguration
|
|
|
+ * @return true if SCM initialization is successful, false otherwise.
|
|
|
+ * @throws IOException if init fails due to I/O error
|
|
|
+ */
|
|
|
+ public static boolean scmInit(OzoneConfiguration conf) throws IOException {
|
|
|
+ SCMStorage scmStorage = new SCMStorage(conf);
|
|
|
+ StorageState state = scmStorage.getState();
|
|
|
+ if (state != StorageState.INITIALIZED) {
|
|
|
+ try {
|
|
|
+ String clusterId = StartupOption.INIT.getClusterId();
|
|
|
+ if (clusterId != null && !clusterId.isEmpty()) {
|
|
|
+ scmStorage.setClusterId(clusterId);
|
|
|
+ }
|
|
|
+ scmStorage.initialize();
|
|
|
+ System.out.println("SCM initialization succeeded." +
|
|
|
+ "Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
|
|
|
+ + scmStorage.getClusterID());
|
|
|
+ return true;
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.error("Could not initialize SCM version file", ioe);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ System.out.println("SCM already initialized. Reusing existing" +
|
|
|
+ " cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
|
|
|
+ + scmStorage.getClusterID());
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static StartupOption parseArguments(String[] args) {
|
|
|
+ int argsLen = (args == null) ? 0 : args.length;
|
|
|
+ StartupOption startOpt = StartupOption.HELP;
|
|
|
+ if (argsLen == 0) {
|
|
|
+ startOpt = StartupOption.REGULAR;
|
|
|
+ }
|
|
|
+ for (int i = 0; i < argsLen; i++) {
|
|
|
+ String cmd = args[i];
|
|
|
+ if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
|
|
|
+ startOpt = StartupOption.INIT;
|
|
|
+ if (argsLen > 3) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ for (i = i + 1; i < argsLen; i++) {
|
|
|
+ if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
|
|
|
+ i++;
|
|
|
+ if (i < argsLen && !args[i].isEmpty()) {
|
|
|
+ startOpt.setClusterId(args[i]);
|
|
|
+ } else {
|
|
|
+ // if no cluster id specified or is empty string, return null
|
|
|
+ LOG.error("Must specify a valid cluster ID after the "
|
|
|
+ + StartupOption.CLUSTERID.getName() + " flag");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
|
|
|
+ if (argsLen > 1) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ startOpt = StartupOption.GENCLUSTERID;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return startOpt;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Returns a SCMCommandRepose from the SCM Command.
|
|
|
* @param cmd - Cmd
|