
HDDS-13. Refactor StorageContainerManager into separate RPC endpoints. Contributed by Anu Engineer.

Anu Engineer, 7 years ago
commit f0c3dc4cf4
22 changed files with 1716 additions and 1361 deletions
  1. + 0 - 1290  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
  2. + 3 - 2  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
  3. + 222 - 0  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
  4. + 314 - 0  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
  5. + 350 - 0  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
  6. + 1 - 1  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
  7. + 1 - 1  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
  8. + 722 - 0  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
  9. + 2 - 1  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
  10. + 22 - 0  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/package-info.java
  11. + 5 - 2  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java
  12. + 1 - 1  hadoop-ozone/common/src/main/bin/ozone
  13. + 19 - 10  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
  14. + 17 - 17  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  15. + 5 - 5  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
  16. + 10 - 10  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
  17. + 1 - 1  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
  18. + 1 - 1  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
  19. + 3 - 5  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
  20. + 8 - 5  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
  21. + 1 - 1  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
  22. + 8 - 8  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
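
The shape of the change, as a minimal illustrative sketch (not the patch contents): the monolithic StorageContainerManager previously implemented the datanode, client, and block RPC protocols itself; after this commit it composes one dedicated server per endpoint (SCMDatanodeProtocolServer, SCMClientProtocolServer, SCMBlockProtocolServer) and only orchestrates their lifecycle. The ScmRpcEndpoint interface below is hypothetical, introduced for brevity; in the patch each server class exposes its own start/stop/join.

import java.io.IOException;

// Hypothetical lifecycle interface; the real server classes declare
// these methods directly rather than implementing a shared interface.
interface ScmRpcEndpoint {
  void start() throws IOException;
  void stop();
  void join() throws InterruptedException;
}

// Sketch of the post-refactor composition: SCM delegates to one server
// object per RPC protocol instead of implementing all three protocols.
final class ScmCompositionSketch {
  private final ScmRpcEndpoint datanodeServer; // SCMDatanodeProtocolServer
  private final ScmRpcEndpoint clientServer;   // SCMClientProtocolServer
  private final ScmRpcEndpoint blockServer;    // SCMBlockProtocolServer

  ScmCompositionSketch(ScmRpcEndpoint datanodeServer,
      ScmRpcEndpoint clientServer, ScmRpcEndpoint blockServer) {
    this.datanodeServer = datanodeServer;
    this.clientServer = clientServer;
    this.blockServer = blockServer;
  }

  void start() throws IOException {
    // Each endpoint binds and starts its own RPC.Server.
    datanodeServer.start();
    clientServer.start();
    blockServer.start();
  }

  void stop() {
    datanodeServer.stop();
    clientServer.stop();
    blockServer.stop();
  }

  void join() throws InterruptedException {
    datanodeServer.join();
    clientServer.join();
    blockServer.join();
  }
}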

+ 0 - 1290
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java

@@ -1,1290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.hdds.scm.block.BlockManager;
-import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.container.ContainerMapping;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.Storage.StorageState;
-import org.apache.hadoop.ozone.common.StorageInfo;
-import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocolPB
-    .ScmBlockLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CLIENT_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdds.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
-import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-/**
- * StorageContainerManager is the main entry point for the service that provides
- * information about which SCM nodes host containers.
- *
- * DataNodes report to StorageContainerManager using heartbeat
- * messages. SCM allocates containers and returns a pipeline.
- *
- * Once a client gets a pipeline (a list of datanodes), it connects to the
- * datanodes and creates a container, which can then be used to store data.
- */
-@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
-public class StorageContainerManager extends ServiceRuntimeInfoImpl
-    implements StorageContainerDatanodeProtocol,
-    StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StorageContainerManager.class);
-
-  /**
-   *  Startup options.
-   */
-  public enum StartupOption {
-    INIT("-init"),
-    CLUSTERID("-clusterid"),
-    GENCLUSTERID("-genclusterid"),
-    REGULAR("-regular"),
-    HELP("-help");
-
-    private final String name;
-    private String clusterId = null;
-
-    public void setClusterId(String cid) {
-      if(cid != null && !cid.isEmpty()) {
-        clusterId = cid;
-      }
-    }
-
-    public String getClusterId() {
-      return clusterId;
-    }
-
-    StartupOption(String arg) {
-      this.name = arg;
-    }
-
-    public String getName() {
-      return name;
-    }
-  }
-
-  /**
-   * NodeManager and container Managers for SCM.
-   */
-  private final NodeManager scmNodeManager;
-  private final Mapping scmContainerManager;
-  private final BlockManager scmBlockManager;
-  private final SCMStorage scmStorage;
-
-  /** The RPC server that listens to requests from DataNodes. */
-  private final RPC.Server datanodeRpcServer;
-  private final InetSocketAddress datanodeRpcAddress;
-
-  /** The RPC server that listens to requests from clients. */
-  private final RPC.Server clientRpcServer;
-  private final InetSocketAddress clientRpcAddress;
-
-  /** The RPC server that listens to requests from block service clients. */
-  private final RPC.Server blockRpcServer;
-  private final InetSocketAddress blockRpcAddress;
-
-  private final StorageContainerManagerHttpServer httpServer;
-
-  /** SCM mxbean. */
-  private ObjectName scmInfoBeanName;
-
-  /** SCM super user. */
-  private final String scmUsername;
-  private final Collection<String> scmAdminUsernames;
-
-  /** SCM metrics. */
-  private static SCMMetrics metrics;
-  /** Key = DatanodeUuid, value = ContainerStat. */
-  private Cache<String, ContainerStat> containerReportCache;
-
-
-  private static final String USAGE =
-      "Usage: \n ozone scm [genericOptions] "
-          + "[ " + StartupOption.INIT.getName() + " [ "
-          + StartupOption.CLUSTERID.getName() + " <cid> ] ]\n "
-          + "ozone scm [genericOptions] [ "
-          + StartupOption.GENCLUSTERID.getName() + " ]\n " +
-          "ozone scm [ "
-          + StartupOption.HELP.getName() + " ]\n";
-  /**
-   * Creates a new StorageContainerManager.  Configuration will be updated with
-   * information on the actual listening addresses used for RPC servers.
-   *
-   * @param conf configuration
-   */
-  private StorageContainerManager(OzoneConfiguration conf)
-      throws IOException {
-
-    final int handlerCount = conf.getInt(
-        OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
-    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-
-    StorageContainerManager.initMetrics();
-    initContainerReportCache(conf);
-
-    scmStorage = new SCMStorage(conf);
-    if (scmStorage.getState() != StorageState.INITIALIZED) {
-      throw new SCMException("SCM not initialized.",
-          ResultCodes.SCM_NOT_INITIALIZED);
-    }
-    scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
-    scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
-    scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
-        scmContainerManager, cacheSize);
-
-    scmAdminUsernames = conf.getTrimmedStringCollection(
-        OzoneConfigKeys.OZONE_ADMINISTRATORS);
-    scmUsername = UserGroupInformation.getCurrentUser().getUserName();
-    if (!scmAdminUsernames.contains(scmUsername)) {
-      scmAdminUsernames.add(scmUsername);
-    }
-
-    RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-
-    BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
-        StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
-        new StorageContainerDatanodeProtocolServerSideTranslatorPB(this));
-
-    final InetSocketAddress datanodeRpcAddr =
-        HddsServerUtil.getScmDataNodeBindAddress(conf);
-    datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
-        StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
-        handlerCount);
-    datanodeRpcAddress = updateRPCListenAddress(conf,
-        OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
-
-    // SCM Container Service RPC
-    BlockingService storageProtoPbService =
-        StorageContainerLocationProtocolProtos
-            .StorageContainerLocationProtocolService
-            .newReflectiveBlockingService(
-            new StorageContainerLocationProtocolServerSideTranslatorPB(this));
-
-    final InetSocketAddress scmAddress =
-        HddsServerUtil.getScmClientBindAddress(conf);
-    clientRpcServer = startRpcServer(conf, scmAddress,
-        StorageContainerLocationProtocolPB.class, storageProtoPbService,
-        handlerCount);
-    clientRpcAddress = updateRPCListenAddress(conf,
-        OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
-
-    // SCM Block Service RPC
-    BlockingService blockProtoPbService =
-        ScmBlockLocationProtocolProtos
-            .ScmBlockLocationProtocolService
-            .newReflectiveBlockingService(
-            new ScmBlockLocationProtocolServerSideTranslatorPB(this));
-
-    final InetSocketAddress scmBlockAddress =
-        HddsServerUtil.getScmBlockClientBindAddress(conf);
-    blockRpcServer = startRpcServer(conf, scmBlockAddress,
-        ScmBlockLocationProtocolPB.class, blockProtoPbService,
-        handlerCount);
-    blockRpcAddress = updateRPCListenAddress(conf,
-        OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
-
-    httpServer = new StorageContainerManagerHttpServer(conf);
-
-    registerMXBean();
-  }
-
-  /**
-   * Initialize the cache of container reports sent from datanodes.
-   *
-   * @param conf
-   */
-  private void initContainerReportCache(OzoneConfiguration conf) {
-    containerReportCache = CacheBuilder.newBuilder()
-        .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
-        .maximumSize(Integer.MAX_VALUE)
-        .removalListener(new RemovalListener<String, ContainerStat>() {
-          @Override
-          public void onRemoval(
-              RemovalNotification<String, ContainerStat> removalNotification) {
-            synchronized (containerReportCache) {
-              ContainerStat stat = removalNotification.getValue();
-              // remove invalid container report
-              metrics.decrContainerStat(stat);
-              LOG.debug(
-                  "Remove expired container stat entry for datanode: {}.",
-                  removalNotification.getKey());
-            }
-          }
-        }).build();
-  }
-
-  /**
-   * Builds a message for logging startup information about an RPC server.
-   *
-   * @param description RPC server description
-   * @param addr RPC server listening address
-   * @return server startup message
-   */
-  private static String buildRpcServerStartMessage(String description,
-      InetSocketAddress addr) {
-    return addr != null ? String.format("%s is listening at %s",
-        description, addr.toString()) :
-        String.format("%s not started", description);
-  }
-
-  /**
-   * Starts an RPC server, if configured.
-   *
-   * @param conf configuration
-   * @param addr configured address of RPC server
-   * @param protocol RPC protocol provided by RPC server
-   * @param instance RPC protocol implementation instance
-   * @param handlerCount RPC server handler count
-   *
-   * @return RPC server
-   * @throws IOException if there is an I/O error while creating RPC server
-   */
-  private static RPC.Server startRpcServer(OzoneConfiguration conf,
-      InetSocketAddress addr, Class<?> protocol, BlockingService instance,
-      int handlerCount)
-      throws IOException {
-    RPC.Server rpcServer = new RPC.Builder(conf)
-        .setProtocol(protocol)
-        .setInstance(instance)
-        .setBindAddress(addr.getHostString())
-        .setPort(addr.getPort())
-        .setNumHandlers(handlerCount)
-        .setVerbose(false)
-        .setSecretManager(null)
-        .build();
-
-    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
-    return rpcServer;
-  }
-
-  private void registerMXBean() {
-    Map<String, String> jmxProperties = new HashMap<>();
-    jmxProperties.put("component", "ServerRuntime");
-    this.scmInfoBeanName =
-        MBeans.register("StorageContainerManager",
-            "StorageContainerManagerInfo",
-            jmxProperties,
-            this);
-  }
-
-  private void unregisterMXBean() {
-    if(this.scmInfoBeanName != null) {
-      MBeans.unregister(this.scmInfoBeanName);
-      this.scmInfoBeanName = null;
-    }
-  }
-
-  /**
-   * Main entry point for starting StorageContainerManager.
-   *
-   * @param argv arguments
-   * @throws IOException if startup fails due to I/O error
-   */
-  public static void main(String[] argv) throws IOException {
-    if (DFSUtil.parseHelpArgument(argv, USAGE,
-        System.out, true)) {
-      System.exit(0);
-    }
-    try {
-      OzoneConfiguration conf = new OzoneConfiguration();
-      GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
-      if (!hParser.isParseSuccessful()) {
-        System.err.println("USAGE: " + USAGE + "\n");
-        hParser.printGenericCommandUsage(System.err);
-        System.exit(1);
-      }
-      StringUtils.startupShutdownMessage(StorageContainerManager.class,
-          argv, LOG);
-      StorageContainerManager scm = createSCM(hParser.getRemainingArgs(), conf);
-      if (scm != null) {
-        scm.start();
-        scm.join();
-      }
-    } catch (Throwable t) {
-      LOG.error("Failed to start the StorageContainerManager.", t);
-      terminate(1, t);
-    }
-  }
-
-  private static void printUsage(PrintStream out) {
-    out.println(USAGE + "\n");
-  }
-
-  public static StorageContainerManager createSCM(String[] argv,
-      OzoneConfiguration conf) throws IOException {
-    if (!HddsUtils.isHddsEnabled(conf)) {
-      System.err.println("SCM cannot be started in secure mode or when " +
-          OZONE_ENABLED + " is set to false");
-      System.exit(1);
-    }
-    StartupOption startOpt = parseArguments(argv);
-    if (startOpt == null) {
-      printUsage(System.err);
-      terminate(1);
-      return null;
-    }
-    switch (startOpt) {
-    case INIT:
-      terminate(scmInit(conf) ? 0 : 1);
-      return null;
-    case GENCLUSTERID:
-      System.out.println("Generating new cluster id:");
-      System.out.println(StorageInfo.newClusterID());
-      terminate(0);
-      return null;
-    case HELP:
-      printUsage(System.err);
-      terminate(0);
-      return null;
-    default:
-      return new StorageContainerManager(conf);
-    }
-  }
-
-  /**
-   * Routine to set up the Version info for StorageContainerManager.
-   *
-   * @param conf OzoneConfiguration
-   * @return true if SCM initialization is successful, false otherwise.
-   * @throws IOException if init fails due to I/O error
-   */
-  public static boolean scmInit(OzoneConfiguration conf) throws IOException {
-    SCMStorage scmStorage = new SCMStorage(conf);
-    StorageState state = scmStorage.getState();
-    if (state != StorageState.INITIALIZED) {
-      try {
-        String clusterId = StartupOption.INIT.getClusterId();
-        if (clusterId != null && !clusterId.isEmpty()) {
-          scmStorage.setClusterId(clusterId);
-        }
-        scmStorage.initialize();
-        System.out.println("SCM initialization succeeded." +
-            "Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
-                + scmStorage.getClusterID());
-        return true;
-      } catch (IOException ioe) {
-        LOG.error("Could not initialize SCM version file", ioe);
-        return false;
-      }
-    } else {
-      System.out.println("SCM already initialized. Reusing existing" +
-          " cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
-              + scmStorage.getClusterID());
-      return true;
-    }
-  }
-
-  private static StartupOption parseArguments(String[] args) {
-    int argsLen = (args == null) ? 0 : args.length;
-    StartupOption startOpt = StartupOption.HELP;
-    if (argsLen == 0) {
-      startOpt = StartupOption.REGULAR;
-    }
-    for (int i = 0; i < argsLen; i++) {
-      String cmd = args[i];
-      if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
-        startOpt = StartupOption.INIT;
-        if (argsLen > 3) {
-          return null;
-        }
-        for (i = i + 1; i < argsLen; i++) {
-          if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
-            i++;
-            if (i < argsLen && !args[i].isEmpty()) {
-              startOpt.setClusterId(args[i]);
-            } else {
-              // if no cluster id specified or is empty string, return null
-              LOG.error("Must specify a valid cluster ID after the "
-                  + StartupOption.CLUSTERID.getName() + " flag");
-              return null;
-            }
-          } else {
-            return null;
-          }
-        }
-      } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
-        if (argsLen > 1) {
-          return null;
-        }
-        startOpt = StartupOption.GENCLUSTERID;
-      }
-    }
-    return startOpt;
-  }
-
-  /**
-   * Returns a SCMCommandResponseProto built from the given SCM command.
-   * @param cmd - SCM command.
-   * @return SCMCommandResponseProto
-   * @throws InvalidProtocolBufferException
-   */
-  @VisibleForTesting
-  public SCMCommandResponseProto getCommandResponse(SCMCommand cmd,
-      final String datanodID)
-      throws IOException {
-    SCMCmdType type = cmd.getType();
-    SCMCommandResponseProto.Builder builder =
-        SCMCommandResponseProto.newBuilder()
-        .setDatanodeUUID(datanodID);
-    switch (type) {
-    case registeredCommand:
-      return builder.setCmdType(SCMCmdType.registeredCommand)
-          .setRegisteredProto(
-              SCMRegisteredCmdResponseProto.getDefaultInstance())
-          .build();
-    case versionCommand:
-      return builder.setCmdType(SCMCmdType.versionCommand)
-          .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
-          .build();
-    case sendContainerReport:
-      return builder.setCmdType(SCMCmdType.sendContainerReport)
-          .setSendReport(SendContainerReportProto.getDefaultInstance())
-          .build();
-    case reregisterCommand:
-      return builder.setCmdType(SCMCmdType.reregisterCommand)
-          .setReregisterProto(SCMReregisterCmdResponseProto
-              .getDefaultInstance())
-          .build();
-    case deleteBlocksCommand:
-      // Once SCM sends out the deletion message, increment the count.
-      // This is done here instead of when SCM receives the ACK, because
-      // the DN might not be able to respond with the ACK for some time. In
-      // case it times out, SCM needs to re-send the message a few more times.
-      List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
-          .stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
-      this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
-      return builder.setCmdType(SCMCmdType.deleteBlocksCommand)
-          .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
-          .build();
-    case closeContainerCommand:
-      return builder.setCmdType(SCMCmdType.closeContainerCommand)
-          .setCloseContainerProto(((CloseContainerCommand)cmd).getProto())
-          .build();
-    default:
-      throw new IllegalArgumentException("Not implemented");
-    }
-  }
-
-  @VisibleForTesting
-  public static SCMRegisteredCmdResponseProto getRegisteredResponse(
-      SCMCommand cmd, SCMNodeAddressList addressList) {
-    Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
-    RegisteredCommand rCmd = (RegisteredCommand) cmd;
-    SCMCmdType type = cmd.getType();
-    if (type != SCMCmdType.registeredCommand) {
-      throw new IllegalArgumentException("Registered command is not well " +
-          "formed. Internal Error.");
-    }
-    return SCMRegisteredCmdResponseProto.newBuilder()
-        //TODO : Fix this later when we have multiple SCM support.
-        //.setAddressList(addressList)
-        .setErrorCode(rCmd.getError())
-        .setClusterID(rCmd.getClusterID())
-        .setDatanodeUUID(rCmd.getDatanodeUUID()).build();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Pipeline getContainer(String containerName) throws IOException {
-    checkAdminAccess();
-    return scmContainerManager.getContainer(containerName).getPipeline();
-  }
-
-  @VisibleForTesting
-  public ContainerInfo getContainerInfo(String containerName)
-      throws IOException {
-    return scmContainerManager.getContainer(containerName);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<ContainerInfo> listContainer(String startName,
-      String prefixName, int count) throws IOException {
-    return scmContainerManager.listContainer(startName, prefixName, count);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deleteContainer(String containerName) throws IOException {
-    checkAdminAccess();
-    scmContainerManager.deleteContainer(containerName);
-  }
-
-  /**
-   * Queries datanodes matching a set of node statuses.
-   *
-   * @param nodeStatuses - set of node states to match.
-   * @param queryScope - scope of the query.
-   * @param poolName - name of the node pool.
-   * @return list of matching datanodes.
-   */
-  @Override
-  public HddsProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses,
-      HddsProtos.QueryScope queryScope, String poolName) throws IOException {
-
-    if (queryScope == HddsProtos.QueryScope.POOL) {
-      throw new IllegalArgumentException("Not Supported yet");
-    }
-
-    List<DatanodeDetails> datanodes = queryNode(nodeStatuses);
-    HddsProtos.NodePool.Builder poolBuilder =
-        HddsProtos.NodePool.newBuilder();
-
-    for (DatanodeDetails datanode : datanodes) {
-      HddsProtos.Node node = HddsProtos.Node.newBuilder()
-          .setNodeID(datanode.getProtoBufMessage())
-          .addAllNodeStates(nodeStatuses)
-          .build();
-      poolBuilder.addNodes(node);
-    }
-
-    return poolBuilder.build();
-  }
-
-  /**
-   * Notification from a client when it begins or finishes an operation on
-   * container/pipeline objects on datanodes.
-   * @param type
-   * @param name
-   * @param op
-   * @param stage
-   */
-  @Override
-  public void notifyObjectStageChange(
-      ObjectStageChangeRequestProto.Type type, String name,
-      ObjectStageChangeRequestProto.Op op,
-      ObjectStageChangeRequestProto.Stage stage) throws IOException {
-
-    LOG.info("Object type {} name {} op {} new stage {}",
-        type, name, op, stage);
-    if (type == ObjectStageChangeRequestProto.Type.container) {
-      if (op == ObjectStageChangeRequestProto.Op.create) {
-        if (stage == ObjectStageChangeRequestProto.Stage.begin) {
-          scmContainerManager.updateContainerState(name,
-              HddsProtos.LifeCycleEvent.CREATE);
-        } else {
-          scmContainerManager.updateContainerState(name,
-              HddsProtos.LifeCycleEvent.CREATED);
-        }
-      } else if (op == ObjectStageChangeRequestProto.Op.close) {
-        if (stage == ObjectStageChangeRequestProto.Stage.begin) {
-          scmContainerManager.updateContainerState(name,
-              HddsProtos.LifeCycleEvent.FINALIZE);
-        } else {
-          scmContainerManager.updateContainerState(name,
-              HddsProtos.LifeCycleEvent.CLOSE);
-        }
-      }
-    } //else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
-    // TODO: pipeline state update will be addressed in future patch.
-    //}
-  }
-
-  /**
-   * Creates a replication pipeline of a specified type.
-   */
-  @Override
-  public Pipeline createReplicationPipeline(
-      HddsProtos.ReplicationType replicationType,
-      HddsProtos.ReplicationFactor factor,
-      HddsProtos.NodePool nodePool)
-      throws IOException {
-     // TODO: will be addressed in future patch.
-    return null;
-  }
-
-  /**
-   * Queries a list of Nodes that match a set of statuses.
-   * <p>
-   * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER,
-   * then this call will return all healthy nodes which are members of a
-   * Raft pipeline.
-   * <p>
-   * Right now we don't support other operators, so we assume an AND
-   * operation between the given states.
-   *
-   * @param nodeStatuses - A set of NodeStates.
-   * @return List of Datanodes.
-   */
-
-  public List<DatanodeDetails> queryNode(EnumSet<NodeState> nodeStatuses) {
-    Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null");
-    Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " +
-        "in the query set");
-    List<DatanodeDetails> resultList = new LinkedList<>();
-    Set<DatanodeDetails> currentSet = new TreeSet<>();
-
-    for (NodeState nodeState : nodeStatuses) {
-      Set<DatanodeDetails> nextSet = queryNodeState(nodeState);
-      if ((nextSet == null) || (nextSet.size() == 0)) {
-        // Right now we only support the AND operation, and the
-        // intersection with an empty set is empty.
-        return resultList;
-      }
-      // First time we have to add all the elements, next time we have to
-      // do an intersection operation on the set.
-      if (currentSet.size() == 0) {
-        currentSet.addAll(nextSet);
-      } else {
-        currentSet.retainAll(nextSet);
-      }
-    }
-
-    resultList.addAll(currentSet);
-    return resultList;
-  }
-
-  /**
-   * Query the System for Nodes.
-   *
-   * @param nodeState - NodeState that we are interested in matching.
-   * @return Set of Datanodes that match the NodeState.
-   */
-  private Set<DatanodeDetails> queryNodeState(NodeState nodeState) {
-    if (nodeState == NodeState.RAFT_MEMBER ||
-        nodeState == NodeState.FREE_NODE) {
-      throw new IllegalStateException("Not implemented yet");
-    }
-    Set<DatanodeDetails> returnSet = new TreeSet<>();
-    List<DatanodeDetails> tmp = getScmNodeManager().getNodes(nodeState);
-    if ((tmp != null) && (tmp.size() > 0)) {
-      returnSet.addAll(tmp);
-    }
-    return returnSet;
-  }
-
-  /**
-   * Asks SCM where a container should be allocated. SCM responds with the set
-   * of datanodes that should be used for creating this container.
-   *
-   * @param containerName - Name of the container.
-   * @param replicationFactor - replication factor.
-   * @return pipeline
-   * @throws IOException
-   */
-  @Override
-  public Pipeline allocateContainer(HddsProtos.ReplicationType replicationType,
-      HddsProtos.ReplicationFactor replicationFactor, String containerName,
-      String owner) throws IOException {
-
-    checkAdminAccess();
-    return scmContainerManager
-        .allocateContainer(replicationType, replicationFactor, containerName,
-            owner).getPipeline();
-  }
-
-  /**
-   * Returns listening address of StorageLocation Protocol RPC server.
-   *
-   * @return listen address of StorageLocation RPC server
-   */
-  @VisibleForTesting
-  public InetSocketAddress getClientRpcAddress() {
-    return clientRpcAddress;
-  }
-
-  @Override
-  public String getClientRpcPort() {
-    InetSocketAddress addr = getClientRpcAddress();
-    return addr == null ? "0" : Integer.toString(addr.getPort());
-  }
-
-  /**
-   * Returns listening address of StorageDatanode Protocol RPC server.
-   *
-   * @return Address where datanode are communicating.
-   */
-  public InetSocketAddress getDatanodeRpcAddress() {
-    return datanodeRpcAddress;
-  }
-
-  @Override
-  public String getDatanodeRpcPort() {
-    InetSocketAddress addr = getDatanodeRpcAddress();
-    return addr == null ? "0" : Integer.toString(addr.getPort());
-  }
-
-  /**
-   * Start service.
-   */
-  public void start() throws IOException {
-    LOG.info(buildRpcServerStartMessage(
-        "StorageContainerLocationProtocol RPC server", clientRpcAddress));
-    DefaultMetricsSystem.initialize("StorageContainerManager");
-    clientRpcServer.start();
-    LOG.info(buildRpcServerStartMessage(
-        "ScmBlockLocationProtocol RPC server", blockRpcAddress));
-    blockRpcServer.start();
-    LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
-        datanodeRpcAddress));
-    datanodeRpcServer.start();
-    httpServer.start();
-    scmBlockManager.start();
-
-    setStartTime();
-
-  }
-
-  /**
-   * Stop service.
-   */
-  public void stop() {
-    try {
-      LOG.info("Stopping block service RPC server");
-      blockRpcServer.stop();
-    } catch (Exception ex) {
-      LOG.error("Storage Container Manager blockRpcServer stop failed.", ex);
-    }
-
-    try {
-      LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
-      clientRpcServer.stop();
-    } catch (Exception ex) {
-      LOG.error("Storage Container Manager clientRpcServer stop failed.", ex);
-    }
-
-    try {
-      LOG.info("Stopping the RPC server for DataNodes");
-      datanodeRpcServer.stop();
-    } catch (Exception ex) {
-      LOG.error("Storage Container Manager datanodeRpcServer stop failed.", ex);
-    }
-
-    try {
-      LOG.info("Stopping Storage Container Manager HTTP server.");
-      httpServer.stop();
-    } catch (Exception ex) {
-      LOG.error("Storage Container Manager HTTP server stop failed.", ex);
-    }
-
-    try {
-      LOG.info("Stopping Block Manager Service.");
-      scmBlockManager.stop();
-    } catch (Exception ex) {
-      LOG.error("SCM block manager service stop failed.", ex);
-    }
-
-    if (containerReportCache != null) {
-      containerReportCache.invalidateAll();
-      containerReportCache.cleanUp();
-    }
-
-    if (metrics != null) {
-      metrics.unRegister();
-    }
-
-    unregisterMXBean();
-    IOUtils.cleanupWithLogger(LOG, scmContainerManager);
-    IOUtils.cleanupWithLogger(LOG, scmNodeManager);
-  }
-
-  /**
-   * Wait until service has completed shutdown.
-   */
-  public void join() {
-    try {
-      blockRpcServer.join();
-      clientRpcServer.join();
-      datanodeRpcServer.join();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.info("Interrupted during StorageContainerManager join.");
-    }
-  }
-
-  /**
-   * Returns SCM version.
-   *
-   * @return Version info.
-   */
-  @Override
-  public SCMVersionResponseProto getVersion(
-      SCMVersionRequestProto versionRequest) throws IOException {
-    return getScmNodeManager().getVersion(versionRequest).getProtobufMessage();
-  }
-
-  /**
-   * Used by data node to send a Heartbeat.
-   *
-   * @param datanodeDetails - Datanode Details.
-   * @param nodeReport - Node Report
-   * @param reportState - Container report ready info.
-   * @return - SCMHeartbeatResponseProto
-   * @throws IOException
-   */
-  @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(
-      DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport,
-      ReportState reportState) throws IOException {
-    List<SCMCommand> commands =
-        getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport,
-            reportState);
-    List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
-    for (SCMCommand cmd : commands) {
-      cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()
-          .toString()));
-    }
-    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
-        .build();
-  }
-
-  /**
-   * Register Datanode.
-   *
-   * @param datanodeDetails - Datanode details.
-   * @param scmAddresses - List of SCMs this datanode is configured to
-   * communicate with.
-   * @return SCM Command.
-   */
-  @Override
-  public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
-      register(DatanodeDetailsProto datanodeDetails, String[] scmAddresses) {
-    // TODO : Return the list of Nodes that forms the SCM HA.
-    return getRegisteredResponse(
-        scmNodeManager.register(datanodeDetails), null);
-  }
-
-  /**
-   * Send a container report.
-   *
-   * @param reports -- Container report
-   * @return ContainerReportsResponseProto.
-   * @throws IOException
-   */
-  @Override
-  public ContainerReportsResponseProto sendContainerReport(
-      ContainerReportsRequestProto reports) throws IOException {
-    updateContainerReportMetrics(reports);
-
-    // should we process container reports async?
-    scmContainerManager.processContainerReports(reports);
-    return ContainerReportsResponseProto.newBuilder().build();
-  }
-
-  private void updateContainerReportMetrics(
-      ContainerReportsRequestProto reports) {
-    ContainerStat newStat = null;
-    // TODO: We should update the logic once incremental container report
-    // type is supported.
-    if (reports
-        .getType() == ContainerReportsRequestProto.reportType.fullReport) {
-      newStat = new ContainerStat();
-      for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
-          .getReportsList()) {
-        newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
-            info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
-            info.getReadCount(), info.getWriteCount()));
-      }
-
-      // update container metrics
-      metrics.setLastContainerStat(newStat);
-    }
-
-    // Update container stat entry, this will trigger a removal operation if it
-    // exists in cache.
-    synchronized (containerReportCache) {
-      String datanodeUuid = reports.getDatanodeDetails().getUuid();
-      if (datanodeUuid != null && newStat != null) {
-        containerReportCache.put(datanodeUuid, newStat);
-        // update global view container metrics
-        metrics.incrContainerStat(newStat);
-      }
-    }
-  }
-
-  /**
-   * Handles the block deletion ACKs sent by datanodes. Once ACKs are
-   * received, SCM considers the blocks deleted and updates the metadata in
-   * the SCM DB.
-   *
-   * @param acks
-   * @return
-   * @throws IOException
-   */
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto acks) throws IOException {
-    if (acks.getResultsCount() > 0) {
-      List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
-      for (DeleteBlockTransactionResult result : resultList) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
-                  + "success={}", result.getTxID(), result.getSuccess());
-        }
-        if (result.getSuccess()) {
-          LOG.debug("Purging TXID={} from block deletion log",
-              result.getTxID());
-          this.getScmBlockManager().getDeletedBlockLog()
-              .commitTransactions(Collections.singletonList(result.getTxID()));
-        } else {
-          LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
-              + "TX in next interval", result.getTxID());
-        }
-      }
-    }
-    return ContainerBlocksDeletionACKResponseProto.newBuilder()
-        .getDefaultInstanceForType();
-  }
-
-  /**
-   * Returns the Number of Datanodes that are communicating with SCM.
-   *
-   * @param nodestate Healthy, Dead etc.
-   * @return int -- count
-   */
-  public int getNodeCount(NodeState nodestate) {
-    return scmNodeManager.getNodeCount(nodestate);
-  }
-
-  /**
-   * Returns SCM container manager.
-   */
-  @VisibleForTesting
-  public Mapping getScmContainerManager() {
-    return scmContainerManager;
-  }
-
-  /**
-   * Returns node manager.
-   * @return - Node Manager
-   */
-  @VisibleForTesting
-  public NodeManager getScmNodeManager() {
-    return scmNodeManager;
-  }
-
-  @VisibleForTesting
-  public BlockManager getScmBlockManager() {
-    return scmBlockManager;
-  }
-
-  /**
-   * Get block locations.
-   * @param keys batch of block keys to retrieve.
-   * @return set of allocated blocks.
-   * @throws IOException
-   */
-  @Override
-  public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
-      throws IOException {
-    Set<AllocatedBlock> locatedBlocks = new HashSet<>();
-    for (String key: keys) {
-      Pipeline pipeline = scmBlockManager.getBlock(key);
-      AllocatedBlock block = new AllocatedBlock.Builder()
-          .setKey(key)
-          .setPipeline(pipeline).build();
-      locatedBlocks.add(block);
-    }
-    return locatedBlocks;
-  }
-
-  /**
-   * Asks SCM where a block should be allocated. SCM responds with the set of
-   * datanodes that should be used for creating this block.
-   *
-   * @param size - size of the block.
-   * @param type - Replication type.
-   * @param factor
-   * @return allocated block accessing info (key, pipeline).
-   * @throws IOException
-   */
-  @Override
-  public AllocatedBlock allocateBlock(long size,
-      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
-      String owner) throws IOException {
-    return scmBlockManager.allocateBlock(size, type, factor, owner);
-  }
-
-  /**
-   * Get the clusterId and SCM Id from the version file in SCM.
-   */
-  @Override
-  public ScmInfo getScmInfo() throws IOException {
-    ScmInfo.Builder builder = new ScmInfo.Builder()
-        .setClusterId(scmStorage.getClusterID())
-        .setScmId(scmStorage.getScmId());
-    return builder.build();
-  }
-  /**
-   * Delete blocks for a set of object keys.
-   *
-   * @param keyBlocksInfoList list of block keys with object keys to delete.
-   * @return deletion results.
-   */
-  public List<DeleteBlockGroupResult> deleteKeyBlocks(
-      List<BlockGroup> keyBlocksInfoList) throws IOException {
-    LOG.info("SCM is informed by KSM to delete {} blocks",
-        keyBlocksInfoList.size());
-    List<DeleteBlockGroupResult> results = new ArrayList<>();
-    for (BlockGroup keyBlocks : keyBlocksInfoList) {
-      Result resultCode;
-      try {
-        // We delete blocks in an atomic operation to prevent getting
-        // into a state where only some of the blocks are deleted,
-        // which would leave the key in an inconsistent state.
-        scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
-        resultCode = Result.success;
-      } catch (SCMException scmEx) {
-        LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
-        switch (scmEx.getResult()) {
-        case CHILL_MODE_EXCEPTION:
-          resultCode = Result.chillMode;
-          break;
-        case FAILED_TO_FIND_BLOCK:
-          resultCode = Result.errorNotFound;
-          break;
-        default:
-          resultCode = Result.unknownFailure;
-        }
-      } catch (IOException ex) {
-        LOG.warn("Fail to delete blocks for object key: {}",
-            keyBlocks.getGroupID(), ex);
-        resultCode = Result.unknownFailure;
-      }
-      List<DeleteBlockResult> blockResultList = new ArrayList<>();
-      for (String blockKey : keyBlocks.getBlockIDList()) {
-        blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
-      }
-      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
-          blockResultList));
-    }
-    return results;
-  }
-
-  @VisibleForTesting
-  public String getPpcRemoteUsername() {
-    UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
-    return user == null ? null : user.getUserName();
-  }
-
-  private void checkAdminAccess() throws IOException {
-    String remoteUser = getPpcRemoteUsername();
-    if(remoteUser != null) {
-      if (!scmAdminUsernames.contains(remoteUser)) {
-        throw new IOException(
-            "Access denied for user " + remoteUser
-                + ". Superuser privilege is required.");
-      }
-    }
-  }
-
-  /**
-   * Initialize SCM metrics.
-   */
-  public static void initMetrics() {
-    metrics = SCMMetrics.create();
-  }
-
-  /**
-   * Return SCM metrics instance.
-   */
-  public static SCMMetrics getMetrics() {
-    return metrics == null ? SCMMetrics.create() : metrics;
-  }
-
-  /**
-   * Invalidate container stat entry for given datanode.
-   *
-   * @param datanodeUuid
-   */
-  public void removeContainerReport(String datanodeUuid) {
-    synchronized (containerReportCache) {
-      containerReportCache.invalidate(datanodeUuid);
-    }
-  }
-
-  /**
-   * Get container stat of specified datanode.
-   *
-   * @param datanodeUuid
-   * @return
-   */
-  public ContainerStat getContainerReport(String datanodeUuid) {
-    ContainerStat stat = null;
-    synchronized (containerReportCache) {
-      stat = containerReportCache.getIfPresent(datanodeUuid);
-    }
-
-    return stat;
-  }
-
-  /**
-   * Returns a view of the container stat entries. Modifications made to the
-   * map will directly affect the cache.
-   *
-   * @return
-   */
-  public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
-    return containerReportCache.asMap();
-  }
-
-  @Override
-  public Map<String, String> getContainerReport() {
-    Map<String, String> id2StatMap = new HashMap<>();
-    synchronized (containerReportCache) {
-      ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap();
-      for (Map.Entry<String, ContainerStat> entry : map.entrySet()) {
-        id2StatMap.put(entry.getKey(), entry.getValue().toJsonString());
-      }
-    }
-
-    return id2StatMap;
-  }
-}
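
One pattern in the removed file carries over to the new server classes: initContainerReportCache pairs a Guava cache with a removal listener, so the aggregate metrics counter is decremented whenever a per-datanode entry expires or is replaced, keeping the global view consistent with the cache contents. A standalone sketch of that pattern (hypothetical names, a plain long in place of ContainerStat):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

// Sketch: the removal listener keeps an aggregate counter in sync with
// the cache, mirroring how SCM decrements SCMMetrics on eviction.
final class ReportCacheSketch {
  private final AtomicLong aggregate = new AtomicLong();
  private final Cache<String, Long> reports;

  ReportCacheSketch() {
    RemovalListener<String, Long> onRemoval =
        (RemovalNotification<String, Long> n) ->
            aggregate.addAndGet(-n.getValue());
    reports = CacheBuilder.newBuilder()
        .expireAfterAccess(1, TimeUnit.HOURS)
        .removalListener(onRemoval)
        .build();
  }

  void report(String datanodeUuid, long stat) {
    // Replacing an existing entry fires the removal listener first, so
    // the old value is subtracted before the new one is added.
    reports.put(datanodeUuid, stat);
    aggregate.addAndGet(stat);
  }

  long total() {
    return aggregate.get();
  }
}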

+ 3 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -849,10 +849,11 @@ public class SCMNodeManager
               .setNodeReport(nodeReport)
               .setContainerReportState(containerReportState)
               .build());
+      return commandQueue.getCommand(datanodeDetails.getUuid());
     } else {
       LOG.error("Datanode ID in heartbeat is null");
     }
-    return commandQueue.getCommand(datanodeDetails.getUuid());
+    return null;
   }
 
   /**
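
Note that this hunk changes behavior as well as the import: commands are now fetched from the command queue only when the heartbeat carries a valid datanode ID, and the error path returns null instead of attempting the lookup anyway. A minimal, hypothetical sketch of the patched control flow:

import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Simplified sketch of SCMNodeManager#sendHeartbeat after this patch;
// the report-queueing and command-queue details are elided.
final class HeartbeatGuardSketch {
  static List<String> sendHeartbeat(UUID datanodeUuid) {
    if (datanodeUuid != null) {
      // Valid heartbeat: enqueue the report, then return pending
      // commands (an empty list stands in for the command queue here).
      return Collections.emptyList();
    }
    // Invalid heartbeat: log the error and return null, as the patched
    // code does, rather than querying the queue with a bad ID.
    System.err.println("Datanode ID in heartbeat is null");
    return null;
  }
}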

+ 222 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java

@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.protocolPB
+    .ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
+    .startRpcServer;
+
+/**
+ * SCM block protocol is the protocol used by Namenode and OzoneManager to get
+ * blocks from the SCM.
+ */
+public class SCMBlockProtocolServer implements ScmBlockLocationProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockProtocolServer.class);
+
+  private final StorageContainerManager scm;
+  private final OzoneConfiguration conf;
+  private final RPC.Server blockRpcServer;
+  private final InetSocketAddress blockRpcAddress;
+
+  /**
+   * Creates the RPC server that listens to requests from block service
+   * clients.
+   */
+  public SCMBlockProtocolServer(OzoneConfiguration conf,
+      StorageContainerManager scm) throws IOException {
+    this.scm = scm;
+    this.conf = conf;
+    final int handlerCount =
+        conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
+            OZONE_SCM_HANDLER_COUNT_DEFAULT);
+
+    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    // SCM Block Service RPC
+    BlockingService blockProtoPbService =
+        ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
+            .newReflectiveBlockingService(
+                new ScmBlockLocationProtocolServerSideTranslatorPB(this));
+
+    final InetSocketAddress scmBlockAddress = HddsServerUtil
+        .getScmBlockClientBindAddress(conf);
+    blockRpcServer =
+        startRpcServer(
+            conf,
+            scmBlockAddress,
+            ScmBlockLocationProtocolPB.class,
+            blockProtoPbService,
+            handlerCount);
+    blockRpcAddress =
+        updateRPCListenAddress(
+            conf, OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress,
+            blockRpcServer);
+
+  }
+
+  public RPC.Server getBlockRpcServer() {
+    return blockRpcServer;
+  }
+
+  public InetSocketAddress getBlockRpcAddress() {
+    return blockRpcAddress;
+  }
+
+  public void start() {
+    LOG.info(
+        StorageContainerManager.buildRpcServerStartMessage(
+            "RPC server for Block Protocol", getBlockRpcAddress()));
+    getBlockRpcServer().start();
+  }
+
+  public void stop() {
+    try {
+      LOG.info("Stopping the RPC server for Block Protocol");
+      getBlockRpcServer().stop();
+    } catch (Exception ex) {
+      LOG.error("Block Protocol RPC stop failed.", ex);
+    }
+    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+  }
+
+  public void join() throws InterruptedException {
+    LOG.trace("Join RPC server for Block Protocol");
+    getBlockRpcServer().join();
+  }
+
+  @Override
+  public Set<AllocatedBlock> getBlockLocations(Set<String> keys) throws
+      IOException {
+    Set<AllocatedBlock> locatedBlocks = new HashSet<>();
+    for (String key : keys) {
+      Pipeline pipeline = scm.getScmBlockManager().getBlock(key);
+      AllocatedBlock block = new AllocatedBlock.Builder().setKey(key)
+          .setPipeline(pipeline).build();
+      locatedBlocks.add(block);
+    }
+    return locatedBlocks;
+
+  }
+
+  @Override
+  public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
+      type, HddsProtos.ReplicationFactor factor, String owner) throws
+      IOException {
+    return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+  }
+
+  /**
+   * Delete blocks for a set of object keys.
+   *
+   * @param keyBlocksInfoList list of block keys with object keys to delete.
+   * @return deletion results.
+   */
+  @Override
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    LOG.info("SCM is informed by KSM to delete {} blocks", keyBlocksInfoList
+        .size());
+    List<DeleteBlockGroupResult> results = new ArrayList<>();
+    for (BlockGroup keyBlocks : keyBlocksInfoList) {
+      ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
+      try {
+        // We delete blocks in an atomic operation to prevent getting
+        // into a state where only some of the blocks are deleted,
+        // which would leave the key in an inconsistent state.
+        scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList());
+        resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+            .Result.success;
+      } catch (SCMException scmEx) {
+        LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
+        switch (scmEx.getResult()) {
+        case CHILL_MODE_EXCEPTION:
+          resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+              .Result.chillMode;
+          break;
+        case FAILED_TO_FIND_BLOCK:
+          resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+              .Result.errorNotFound;
+          break;
+        default:
+          resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+              .Result.unknownFailure;
+        }
+      } catch (IOException ex) {
+        LOG.warn("Fail to delete blocks for object key: {}", keyBlocks
+            .getGroupID(), ex);
+        resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+            .Result.unknownFailure;
+      }
+      List<DeleteBlockResult> blockResultList = new ArrayList<>();
+      for (String blockKey : keyBlocks.getBlockIDList()) {
+        blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
+      }
+      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+          blockResultList));
+    }
+    return results;
+  }
+
+  @Override
+  public ScmInfo getScmInfo() throws IOException {
+    ScmInfo.Builder builder =
+        new ScmInfo.Builder()
+            .setClusterId(scm.getScmStorage().getClusterID())
+            .setScmId(scm.getScmStorage().getScmId());
+    return builder.build();
+  }
+}
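For illustration, this is the call pattern a block client such as the OzoneManager would drive against the server above. A minimal sketch, assuming a ScmBlockLocationProtocol proxy named blockClient has already been created (proxy wiring is outside this patch) and using illustrative size/type/factor/owner values:

    // Ask SCM for a new block; the returned AllocatedBlock carries the
    // key and the pipeline (the list of datanodes) to write to.
    AllocatedBlock block = blockClient.allocateBlock(
        256L * 1024 * 1024,                      // requested size in bytes
        HddsProtos.ReplicationType.STAND_ALONE,  // replication type
        HddsProtos.ReplicationFactor.ONE,        // replication factor
        "ozone");                                // owner
    Pipeline pipeline = block.getPipeline();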

+ 314 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java

@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos
+    .StorageContainerLocationProtocolService.newReflectiveBlockingService;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CLIENT_ADDRESS_KEY;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
+    .startRpcServer;
+
+/**
+ * The RPC server that listens to requests from clients.
+ */
+public class SCMClientProtocolServer implements
+    StorageContainerLocationProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMClientProtocolServer.class);
+  private final RPC.Server clientRpcServer;
+  private final InetSocketAddress clientRpcAddress;
+  private final StorageContainerManager scm;
+  private final OzoneConfiguration conf;
+
+  public SCMClientProtocolServer(OzoneConfiguration conf,
+      StorageContainerManager scm) throws IOException {
+    this.scm = scm;
+    this.conf = conf;
+    final int handlerCount =
+        conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
+            OZONE_SCM_HANDLER_COUNT_DEFAULT);
+    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    // SCM Container Service RPC
+    BlockingService storageProtoPbService =
+        newReflectiveBlockingService(
+            new StorageContainerLocationProtocolServerSideTranslatorPB(this));
+
+    final InetSocketAddress scmAddress = HddsServerUtil
+        .getScmClientBindAddress(conf);
+    clientRpcServer =
+        startRpcServer(
+            conf,
+            scmAddress,
+            StorageContainerLocationProtocolPB.class,
+            storageProtoPbService,
+            handlerCount);
+    clientRpcAddress =
+        updateRPCListenAddress(conf, OZONE_SCM_CLIENT_ADDRESS_KEY,
+            scmAddress, clientRpcServer);
+
+  }
+
+  public RPC.Server getClientRpcServer() {
+    return clientRpcServer;
+  }
+
+  public InetSocketAddress getClientRpcAddress() {
+    return clientRpcAddress;
+  }
+
+  public void start() {
+    LOG.info(
+        StorageContainerManager.buildRpcServerStartMessage(
+            "RPC server for Client ", getClientRpcAddress()));
+    getClientRpcServer().start();
+  }
+
+  public void stop() {
+    try {
+      LOG.info("Stopping the RPC server for Client Protocol");
+      getClientRpcServer().stop();
+    } catch (Exception ex) {
+      LOG.error("Client Protocol RPC stop failed.", ex);
+    }
+    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+  }
+
+  public void join() throws InterruptedException {
+    LOG.trace("Join RPC server for Client Protocol");
+    getClientRpcServer().join();
+  }
+
+  @Override
+  public Pipeline allocateContainer(HddsProtos.ReplicationType
+      replicationType, HddsProtos.ReplicationFactor factor, String
+      containerName, String owner) throws IOException {
+    scm.checkAdminAccess();
+    return scm.getScmContainerManager()
+        .allocateContainer(replicationType, factor, containerName, owner)
+        .getPipeline();
+  }
+
+  @Override
+  public Pipeline getContainer(String containerName) throws IOException {
+    return scm.getScmContainerManager()
+        .getContainer(containerName).getPipeline();
+  }
+
+  @Override
+  public List<ContainerInfo> listContainer(String startName,
+      String prefixName, int count) throws IOException {
+    return scm.getScmContainerManager()
+        .listContainer(startName, prefixName, count);
+  }
+
+  @Override
+  public void deleteContainer(String containerName) throws IOException {
+    scm.checkAdminAccess();
+    scm.getScmContainerManager().deleteContainer(containerName);
+
+  }
+
+  @Override
+  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+      nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) throws
+      IOException {
+
+    if (queryScope == HddsProtos.QueryScope.POOL) {
+      throw new IllegalArgumentException("Not Supported yet");
+    }
+
+    List<DatanodeDetails> datanodes = queryNode(nodeStatuses);
+    HddsProtos.NodePool.Builder poolBuilder = HddsProtos.NodePool.newBuilder();
+
+    for (DatanodeDetails datanode : datanodes) {
+      HddsProtos.Node node =
+          HddsProtos.Node.newBuilder()
+              .setNodeID(datanode.getProtoBufMessage())
+              .addAllNodeStates(nodeStatuses)
+              .build();
+      poolBuilder.addNodes(node);
+    }
+
+    return poolBuilder.build();
+
+  }
+
+  @Override
+  public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
+      .ObjectStageChangeRequestProto.Type type, String name,
+      StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op
+          op, StorageContainerLocationProtocolProtos
+      .ObjectStageChangeRequestProto.Stage stage) throws IOException {
+
+    LOG.info("Object type {} name {} op {} new stage {}", type, name, op,
+        stage);
+    if (type == StorageContainerLocationProtocolProtos
+        .ObjectStageChangeRequestProto.Type.container) {
+      if (op == StorageContainerLocationProtocolProtos
+          .ObjectStageChangeRequestProto.Op.create) {
+        if (stage == StorageContainerLocationProtocolProtos
+            .ObjectStageChangeRequestProto.Stage.begin) {
+          scm.getScmContainerManager().updateContainerState(name, HddsProtos
+              .LifeCycleEvent.CREATE);
+        } else {
+          scm.getScmContainerManager().updateContainerState(name, HddsProtos
+              .LifeCycleEvent.CREATED);
+        }
+      } else {
+        if (op == StorageContainerLocationProtocolProtos
+            .ObjectStageChangeRequestProto.Op.close) {
+          if (stage == StorageContainerLocationProtocolProtos
+              .ObjectStageChangeRequestProto.Stage.begin) {
+            scm.getScmContainerManager().updateContainerState(name, HddsProtos
+                .LifeCycleEvent.FINALIZE);
+          } else {
+            scm.getScmContainerManager().updateContainerState(name, HddsProtos
+                .LifeCycleEvent.CLOSE);
+          }
+        }
+      }
+    } // else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
+    // TODO: pipeline state update will be addressed in future patch.
+    // }
+
+  }
+
+  @Override
+  public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
+      throws IOException {
+    // TODO: will be addressed in a future patch.
+    // This is needed only for debugging purposes, to make sure the
+    // cluster is working correctly.
+    return null;
+  }
+
+  @Override
+  public ScmInfo getScmInfo() throws IOException {
+    ScmInfo.Builder builder =
+        new ScmInfo.Builder()
+            .setClusterId(scm.getScmStorage().getClusterID())
+            .setScmId(scm.getScmStorage().getScmId());
+    return builder.build();
+  }
+
+  /**
+   * Queries a list of Nodes that match a set of statuses.
+   *
+   * <p>For example, if the nodeStatuses set contains HEALTHY and
+   * RAFT_MEMBER, then this call will return all healthy nodes which are
+   * members of a Raft pipeline.
+   *
+   * <p>Right now we don't support operators, so we assume an AND operation
+   * between the statuses.
+   *
+   * @param nodeStatuses - A set of NodeStates.
+   * @return List of Datanodes.
+   */
+  public List<DatanodeDetails> queryNode(EnumSet<HddsProtos.NodeState>
+      nodeStatuses) {
+    Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null");
+    Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " +
+        "in the query set");
+    List<DatanodeDetails> resultList = new LinkedList<>();
+    Set<DatanodeDetails> currentSet = new TreeSet<>();
+
+    for (HddsProtos.NodeState nodeState : nodeStatuses) {
+      Set<DatanodeDetails> nextSet = queryNodeState(nodeState);
+      if ((nextSet == null) || (nextSet.size() == 0)) {
+        // Right now we only support the AND operation, and the
+        // intersection with an empty set is empty.
+        return resultList;
+      }
+      // First time we have to add all the elements, next time we have to
+      // do an intersection operation on the set.
+      if (currentSet.size() == 0) {
+        currentSet.addAll(nextSet);
+      } else {
+        currentSet.retainAll(nextSet);
+      }
+    }
+
+    resultList.addAll(currentSet);
+    return resultList;
+  }
+
+  /**
+   * Query the System for Nodes.
+   *
+   * @param nodeState - NodeState that we are interested in matching.
+   * @return Set of Datanodes that match the NodeState.
+   */
+  private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) {
+    if (nodeState == HddsProtos.NodeState.RAFT_MEMBER
+        || nodeState == HddsProtos.NodeState.FREE_NODE) {
+      throw new IllegalStateException("Not implemented yet");
+    }
+    Set<DatanodeDetails> returnSet = new TreeSet<>();
+    List<DatanodeDetails> tmp = scm.getScmNodeManager().getNodes(nodeState);
+    if ((tmp != null) && (tmp.size() > 0)) {
+      returnSet.addAll(tmp);
+    }
+    return returnSet;
+  }
+}
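The queryNode implementation above intersects the per-state result sets, so multiple states act as an AND filter. A minimal sketch of the semantics, assuming server is an SCMClientProtocolServer instance:

    // A single-state query returns every node currently in that state.
    List<DatanodeDetails> healthy =
        server.queryNode(EnumSet.of(HddsProtos.NodeState.HEALTHY));

    // Because states are ANDed, asking for two states a node cannot hold
    // at once (e.g. HEALTHY and DEAD) yields an empty list, not a union.
    List<DatanodeDetails> none =
        server.queryNode(EnumSet.of(HddsProtos.NodeState.HEALTHY,
            HddsProtos.NodeState.DEAD));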

+ 350 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java

@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.sendContainerReport;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
+
+
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
+
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+
+/**
+ * Protocol Handler for Datanode Protocol.
+ */
+public class SCMDatanodeProtocolServer implements
+    StorageContainerDatanodeProtocol {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SCMDatanodeProtocolServer.class);
+
+  /**
+   * The RPC server that listens to requests from DataNodes.
+   */
+  private final RPC.Server datanodeRpcServer;
+
+  private final StorageContainerManager scm;
+  private final InetSocketAddress datanodeRpcAddress;
+
+  public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
+      StorageContainerManager scm)  throws IOException {
+
+    Preconditions.checkNotNull(scm, "SCM cannot be null");
+    this.scm = scm;
+    final int handlerCount =
+        conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
+            OZONE_SCM_HANDLER_COUNT_DEFAULT);
+
+    RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    BlockingService dnProtoPbService =
+        StorageContainerDatanodeProtocolProtos
+            .StorageContainerDatanodeProtocolService
+            .newReflectiveBlockingService(
+                new StorageContainerDatanodeProtocolServerSideTranslatorPB(
+                    this));
+
+    InetSocketAddress datanodeRpcAddr =
+        HddsServerUtil.getScmDataNodeBindAddress(conf);
+
+    datanodeRpcServer =
+        startRpcServer(
+            conf,
+            datanodeRpcAddr,
+            StorageContainerDatanodeProtocolPB.class,
+            dnProtoPbService,
+            handlerCount);
+
+    datanodeRpcAddress =
+        updateRPCListenAddress(
+            conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
+            datanodeRpcServer);
+  }
+
+  public InetSocketAddress getDatanodeRpcAddress() {
+    return datanodeRpcAddress;
+  }
+
+  public RPC.Server getDatanodeRpcServer() {
+    return datanodeRpcServer;
+  }
+
+  @Override
+  public SCMVersionResponseProto getVersion(SCMVersionRequestProto
+      versionRequest)
+      throws IOException {
+    return scm.getScmNodeManager().getVersion(versionRequest)
+        .getProtobufMessage();
+  }
+
+  @Override
+  public SCMHeartbeatResponseProto sendHeartbeat(
+      HddsProtos.DatanodeDetailsProto datanodeDetails,
+      StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport,
+      StorageContainerDatanodeProtocolProtos.ReportState reportState)
+      throws IOException {
+    List<SCMCommand> commands =
+        scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport,
+            reportState);
+    List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
+    for (SCMCommand cmd : commands) {
+      cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()));
+    }
+    return SCMHeartbeatResponseProto.newBuilder()
+        .addAllCommands(cmdResponses).build();
+  }
+
+  @Override
+  public SCMRegisteredCmdResponseProto register(
+      HddsProtos.DatanodeDetailsProto datanodeDetails, String[] scmAddresses)
+      throws IOException {
+    // TODO : Return the list of Nodes that forms the SCM HA.
+    return getRegisteredResponse(scm.getScmNodeManager()
+        .register(datanodeDetails), null);
+  }
+
+  @VisibleForTesting
+  public static SCMRegisteredCmdResponseProto getRegisteredResponse(
+        SCMCommand cmd,
+        StorageContainerDatanodeProtocolProtos.SCMNodeAddressList addressList) {
+    Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
+    RegisteredCommand rCmd = (RegisteredCommand) cmd;
+    SCMCmdType type = cmd.getType();
+    if (type != SCMCmdType.registeredCommand) {
+      throw new IllegalArgumentException(
+          "Registered command is not well formed. Internal Error.");
+    }
+    return SCMRegisteredCmdResponseProto.newBuilder()
+        // TODO : Fix this later when we have multiple SCM support.
+        // .setAddressList(addressList)
+        .setErrorCode(rCmd.getError())
+        .setClusterID(rCmd.getClusterID())
+        .setDatanodeUUID(rCmd.getDatanodeUUID())
+        .build();
+  }
+
+  @Override
+  public ContainerReportsResponseProto sendContainerReport(
+      ContainerReportsRequestProto reports)
+      throws IOException {
+    updateContainerReportMetrics(reports);
+
+    // TODO: should we process container reports asynchronously?
+    scm.getScmContainerManager().processContainerReports(reports);
+    return ContainerReportsResponseProto.newBuilder().build();
+  }
+
+  private void updateContainerReportMetrics(
+      ContainerReportsRequestProto reports) {
+    ContainerStat newStat = null;
+    // TODO: We should update the logic once incremental container report
+    // type is supported.
+    if (reports
+        .getType() == StorageContainerDatanodeProtocolProtos
+        .ContainerReportsRequestProto.reportType.fullReport) {
+      newStat = new ContainerStat();
+      for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
+          .getReportsList()) {
+        newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
+            info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
+            info.getReadCount(), info.getWriteCount()));
+      }
+
+      // update container metrics
+      StorageContainerManager.getMetrics().setLastContainerStat(newStat);
+    }
+
+    // Update container stat entry, this will trigger a removal operation if it
+    // exists in cache.
+    synchronized (scm.getContainerReportCache()) {
+      String datanodeUuid = reports.getDatanodeDetails().getUuid();
+      if (datanodeUuid != null && newStat != null) {
+        scm.getContainerReportCache().put(datanodeUuid, newStat);
+        // update global view container metrics
+        StorageContainerManager.getMetrics().incrContainerStat(newStat);
+      }
+    }
+  }
+
+
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto acks) throws IOException {
+    if (acks.getResultsCount() > 0) {
+      List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
+      for (DeleteBlockTransactionResult result : resultList) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
+              + "success={}", result.getTxID(), result.getSuccess());
+        }
+        if (result.getSuccess()) {
+          LOG.debug("Purging TXID={} from block deletion log",
+              result.getTxID());
+          scm.getScmBlockManager().getDeletedBlockLog()
+              .commitTransactions(Collections.singletonList(result.getTxID()));
+        } else {
+          LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+              + "TX in next interval", result.getTxID());
+        }
+      }
+    }
+    return ContainerBlocksDeletionACKResponseProto.newBuilder()
+        .getDefaultInstanceForType();
+  }
+
+  public void start() {
+    LOG.info(
+        StorageContainerManager.buildRpcServerStartMessage(
+            "RPC server for DataNodes", getDatanodeRpcAddress()));
+    getDatanodeRpcServer().start();
+  }
+
+  public void stop() {
+    try {
+      LOG.info("Stopping the RPC server for DataNodes");
+      datanodeRpcServer.stop();
+    } catch (Exception ex) {
+      LOG.error(" datanodeRpcServer stop failed.", ex);
+    }
+    IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+  }
+
+  public void join() throws InterruptedException {
+    LOG.trace("Join RPC server for DataNodes");
+    datanodeRpcServer.join();
+  }
+
+  /**
+   * Returns an SCMCommandResponseProto built from the given SCM command.
+   *
+   * @param cmd - Cmd
+   * @return SCMCommandResponseProto
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
+      getCommandResponse(
+      SCMCommand cmd, final String datanodeID) throws IOException {
+    SCMCmdType type = cmd.getType();
+    SCMCommandResponseProto.Builder builder =
+        SCMCommandResponseProto.newBuilder().setDatanodeUUID(datanodeID);
+    switch (type) {
+    case registeredCommand:
+      return builder
+          .setCmdType(registeredCommand)
+          .setRegisteredProto(SCMRegisteredCmdResponseProto
+              .getDefaultInstance())
+          .build();
+    case versionCommand:
+      return builder
+          .setCmdType(versionCommand)
+          .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
+          .build();
+    case sendContainerReport:
+      return builder
+          .setCmdType(sendContainerReport)
+          .setSendReport(SendContainerReportProto.getDefaultInstance())
+          .build();
+    case reregisterCommand:
+      return builder
+          .setCmdType(reregisterCommand)
+          .setReregisterProto(SCMReregisterCmdResponseProto
+              .getDefaultInstance())
+          .build();
+    case deleteBlocksCommand:
+      // Once SCM sends out the deletion message, increment the count.
+      // This is done here instead of when SCM receives the ACK, because
+      // the DN might not be able to respond with the ACK for some time.
+      // In case it times out, SCM needs to re-send the message a few more
+      // times.
+      List<Long> txs =
+          ((DeleteBlocksCommand) cmd)
+              .blocksTobeDeleted()
+              .stream()
+              .map(tx -> tx.getTxID())
+              .collect(Collectors.toList());
+      scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
+      return builder
+          .setCmdType(deleteBlocksCommand)
+          .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+          .build();
+    case closeContainerCommand:
+      return builder
+          .setCmdType(closeContainerCommand)
+          .setCloseContainerProto(((CloseContainerCommand) cmd).getProto())
+          .build();
+    default:
+      throw new IllegalArgumentException("Not implemented");
+    }
+  }
+}
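The heartbeat path above wraps each pending SCMCommand into an SCMCommandResponseProto. A sketch of how a caller consumes the response, assuming the datanodeDetails, nodeReport and reportState protos are prepared elsewhere:

    SCMHeartbeatResponseProto response = datanodeProtocolServer
        .sendHeartbeat(datanodeDetails, nodeReport, reportState);
    for (SCMCommandResponseProto cmdResponse : response.getCommandsList()) {
      switch (cmdResponse.getCmdType()) {
      case deleteBlocksCommand:
        // hand cmdResponse.getDeleteBlocksProto() to the deletion handler
        break;
      case closeContainerCommand:
        // close the container named in cmdResponse.getCloseContainerProto()
        break;
      default:
        break;
      }
    }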

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMMXBean.java → hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdds.scm;
+package org.apache.hadoop.hdds.scm.server;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
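SCMMXBean moves into the server package; StorageContainerManager (below) registers it over JMX in registerMXBean(). A sketch of reading its attributes in-process, assuming Hadoop's MBeans utility registers the bean under the conventional "Hadoop" JMX domain and that this runs inside a method declared to throw Exception:

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Wildcard query so the exact property layout of the ObjectName
    // (component=ServerRuntime etc.) does not have to be assumed.
    for (ObjectName name : mbs.queryNames(
        new ObjectName("Hadoop:service=StorageContainerManager,*"), null)) {
      // Attribute names mirror the MXBean getters, e.g. getClientRpcPort().
      System.out.println(name + " ClientRpcPort="
          + mbs.getAttribute(name, "ClientRpcPort"));
    }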

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMStorage.java → hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdds.scm;
+package org.apache.hadoop.hdds.scm.server;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;

+ 722 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -0,0 +1,722 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+/**
+ * StorageContainerManager is the main entry point for the service that
+ * provides information about which SCM nodes host containers.
+ *
+ * <p>DataNodes report to StorageContainerManager using heartbeat messages.
+ * SCM allocates containers and returns a pipeline.
+ *
+ * <p>Once a client gets a pipeline (a list of datanodes), it connects to
+ * the datanodes and creates a container, which can then be used to store
+ * data.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
+public final class StorageContainerManager extends ServiceRuntimeInfoImpl
+    implements SCMMXBean {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StorageContainerManager.class);
+  private static final String USAGE =
+      "Usage: \n ozone scm [genericOptions] "
+          + "[ "
+          + StartupOption.INIT.getName()
+          + " [ "
+          + StartupOption.CLUSTERID.getName()
+          + " <cid> ] ]\n "
+          + "ozone scm [genericOptions] [ "
+          + StartupOption.GENCLUSTERID.getName()
+          + " ]\n "
+          + "ozone scm [ "
+          + StartupOption.HELP.getName()
+          + " ]\n";
+  /**
+   * SCM metrics.
+   */
+  private static SCMMetrics metrics;
+
+  /*
+   * RPC Endpoints exposed by SCM.
+   */
+  private final SCMDatanodeProtocolServer datanodeProtocolServer;
+  private final SCMBlockProtocolServer blockProtocolServer;
+  private final SCMClientProtocolServer clientProtocolServer;
+
+  /*
+   * State Managers of SCM.
+   */
+  private final NodeManager scmNodeManager;
+  private final Mapping scmContainerManager;
+  private final BlockManager scmBlockManager;
+  private final SCMStorage scmStorage;
+  /*
+   * HTTP endpoint for JMX access.
+   */
+  private final StorageContainerManagerHttpServer httpServer;
+  /**
+   * SCM super user.
+   */
+  private final String scmUsername;
+  private final Collection<String> scmAdminUsernames;
+  /**
+   * SCM mxbean.
+   */
+  private ObjectName scmInfoBeanName;
+  /**
+   * Key = DatanodeUuid, value = ContainerStat.
+   */
+  private Cache<String, ContainerStat> containerReportCache;
+
+  /**
+   * Creates a new StorageContainerManager. Configuration will be updated
+   * with information on the actual listening addresses used for RPC servers.
+   *
+   * @param conf configuration
+   */
+  private StorageContainerManager(OzoneConfiguration conf) throws IOException {
+
+    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
+    StorageContainerManager.initMetrics();
+    initContainerReportCache(conf);
+
+    scmStorage = new SCMStorage(conf);
+    if (scmStorage.getState() != StorageState.INITIALIZED) {
+      throw new SCMException("SCM not initialized.", ResultCodes
+          .SCM_NOT_INITIALIZED);
+    }
+
+    scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
+    scmContainerManager = new ContainerMapping(conf, getScmNodeManager(),
+        cacheSize);
+
+    scmBlockManager =
+        new BlockManagerImpl(conf, getScmNodeManager(), scmContainerManager,
+            cacheSize);
+
+    scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
+        .OZONE_ADMINISTRATORS);
+    scmUsername = UserGroupInformation.getCurrentUser().getUserName();
+    if (!scmAdminUsernames.contains(scmUsername)) {
+      scmAdminUsernames.add(scmUsername);
+    }
+
+    datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this);
+    blockProtocolServer = new SCMBlockProtocolServer(conf, this);
+    clientProtocolServer = new SCMClientProtocolServer(conf, this);
+    httpServer = new StorageContainerManagerHttpServer(conf);
+
+    registerMXBean();
+  }
+
+  /**
+   * Builds a message for logging startup information about an RPC server.
+   *
+   * @param description RPC server description
+   * @param addr RPC server listening address
+   * @return server startup message
+   */
+  public static String buildRpcServerStartMessage(String description,
+      InetSocketAddress addr) {
+    return addr != null
+        ? String.format("%s is listening at %s", description, addr.toString())
+        : String.format("%s not started", description);
+  }
+
+  /**
+   * Starts an RPC server, if configured.
+   *
+   * @param conf configuration
+   * @param addr configured address of RPC server
+   * @param protocol RPC protocol provided by RPC server
+   * @param instance RPC protocol implementation instance
+   * @param handlerCount RPC server handler count
+   * @return RPC server
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  public static RPC.Server startRpcServer(
+      OzoneConfiguration conf,
+      InetSocketAddress addr,
+      Class<?> protocol,
+      BlockingService instance,
+      int handlerCount)
+      throws IOException {
+    RPC.Server rpcServer =
+        new RPC.Builder(conf)
+            .setProtocol(protocol)
+            .setInstance(instance)
+            .setBindAddress(addr.getHostString())
+            .setPort(addr.getPort())
+            .setNumHandlers(handlerCount)
+            .setVerbose(false)
+            .setSecretManager(null)
+            .build();
+
+    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+    return rpcServer;
+  }
+
+  /**
+   * Main entry point for starting StorageContainerManager.
+   *
+   * @param argv arguments
+   * @throws IOException if startup fails due to I/O error
+   */
+  public static void main(String[] argv) throws IOException {
+    if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
+      System.exit(0);
+    }
+    try {
+      OzoneConfiguration conf = new OzoneConfiguration();
+      GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
+      if (!hParser.isParseSuccessful()) {
+        System.err.println("USAGE: " + USAGE + "\n");
+        hParser.printGenericCommandUsage(System.err);
+        System.exit(1);
+      }
+      StringUtils.startupShutdownMessage(StorageContainerManager.class, argv,
+          LOG);
+      StorageContainerManager scm = createSCM(hParser.getRemainingArgs(), conf);
+      if (scm != null) {
+        scm.start();
+        scm.join();
+      }
+    } catch (Throwable t) {
+      LOG.error("Failed to start the StorageContainerManager.", t);
+      terminate(1, t);
+    }
+  }
+
+  private static void printUsage(PrintStream out) {
+    out.println(USAGE + "\n");
+  }
+
+  public static StorageContainerManager createSCM(String[] argv,
+      OzoneConfiguration conf)
+      throws IOException {
+    if (!HddsUtils.isHddsEnabled(conf)) {
+      System.err.println(
+          "SCM cannot be started in secure mode or when " + OZONE_ENABLED + "" +
+              " is set to false");
+      System.exit(1);
+    }
+    StartupOption startOpt = parseArguments(argv);
+    if (startOpt == null) {
+      printUsage(System.err);
+      terminate(1);
+      return null;
+    }
+    switch (startOpt) {
+    case INIT:
+      terminate(scmInit(conf) ? 0 : 1);
+      return null;
+    case GENCLUSTERID:
+      System.out.println("Generating new cluster id:");
+      System.out.println(StorageInfo.newClusterID());
+      terminate(0);
+      return null;
+    case HELP:
+      printUsage(System.err);
+      terminate(0);
+      return null;
+    default:
+      return new StorageContainerManager(conf);
+    }
+  }
+
+  /**
+   * Routine to set up the Version info for StorageContainerManager.
+   *
+   * @param conf OzoneConfiguration
+   * @return true if SCM initialization is successful, false otherwise.
+   * @throws IOException if init fails due to I/O error
+   */
+  public static boolean scmInit(OzoneConfiguration conf) throws IOException {
+    SCMStorage scmStorage = new SCMStorage(conf);
+    StorageState state = scmStorage.getState();
+    if (state != StorageState.INITIALIZED) {
+      try {
+        String clusterId = StartupOption.INIT.getClusterId();
+        if (clusterId != null && !clusterId.isEmpty()) {
+          scmStorage.setClusterId(clusterId);
+        }
+        scmStorage.initialize();
+        System.out.println(
+            "SCM initialization succeeded."
+                + "Current cluster id for sd="
+                + scmStorage.getStorageDir()
+                + ";cid="
+                + scmStorage.getClusterID());
+        return true;
+      } catch (IOException ioe) {
+        LOG.error("Could not initialize SCM version file", ioe);
+        return false;
+      }
+    } else {
+      System.out.println(
+          "SCM already initialized. Reusing existing"
+              + " cluster id for sd="
+              + scmStorage.getStorageDir()
+              + ";cid="
+              + scmStorage.getClusterID());
+      return true;
+    }
+  }
+
+  private static StartupOption parseArguments(String[] args) {
+    int argsLen = (args == null) ? 0 : args.length;
+    StartupOption startOpt = StartupOption.HELP;
+    if (argsLen == 0) {
+      startOpt = StartupOption.REGULAR;
+    }
+    for (int i = 0; i < argsLen; i++) {
+      String cmd = args[i];
+      if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.INIT;
+        if (argsLen > 3) {
+          return null;
+        }
+        for (i = i + 1; i < argsLen; i++) {
+          if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
+            i++;
+            if (i < argsLen && !args[i].isEmpty()) {
+              startOpt.setClusterId(args[i]);
+            } else {
+              // if no cluster id specified or is empty string, return null
+              LOG.error(
+                  "Must specify a valid cluster ID after the "
+                      + StartupOption.CLUSTERID.getName()
+                      + " flag");
+              return null;
+            }
+          } else {
+            return null;
+          }
+        }
+      } else {
+        if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
+          if (argsLen > 1) {
+            return null;
+          }
+          startOpt = StartupOption.GENCLUSTERID;
+        }
+      }
+    }
+    return startOpt;
+  }
+
+  /**
+   * Initialize SCM metrics.
+   */
+  public static void initMetrics() {
+    metrics = SCMMetrics.create();
+  }
+
+  /**
+   * Return SCM metrics instance.
+   */
+  public static SCMMetrics getMetrics() {
+    return metrics == null ? SCMMetrics.create() : metrics;
+  }
+
+  public SCMStorage getScmStorage() {
+    return scmStorage;
+  }
+
+  public SCMDatanodeProtocolServer getDatanodeProtocolServer() {
+    return datanodeProtocolServer;
+  }
+
+  public SCMBlockProtocolServer getBlockProtocolServer() {
+    return blockProtocolServer;
+  }
+
+  public SCMClientProtocolServer getClientProtocolServer() {
+    return clientProtocolServer;
+  }
+
+  /**
+   * Initializes the cache of container reports sent from datanodes.
+   *
+   * @param conf OzoneConfiguration
+   */
+  private void initContainerReportCache(OzoneConfiguration conf) {
+    containerReportCache =
+        CacheBuilder.newBuilder()
+            .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+            .maximumSize(Integer.MAX_VALUE)
+            .removalListener(
+                new RemovalListener<String, ContainerStat>() {
+                  @Override
+                  public void onRemoval(
+                      RemovalNotification<String, ContainerStat>
+                          removalNotification) {
+                    synchronized (containerReportCache) {
+                      ContainerStat stat = removalNotification.getValue();
+                      // remove invalid container report
+                      metrics.decrContainerStat(stat);
+                      LOG.debug(
+                          "Remove expired container stat entry for datanode: " +
+                              "{}.",
+                          removalNotification.getKey());
+                    }
+                  }
+                })
+            .build();
+  }
+
+  private void registerMXBean() {
+    Map<String, String> jmxProperties = new HashMap<>();
+    jmxProperties.put("component", "ServerRuntime");
+    this.scmInfoBeanName =
+        MBeans.register(
+            "StorageContainerManager", "StorageContainerManagerInfo",
+            jmxProperties, this);
+  }
+
+  private void unregisterMXBean() {
+    if (this.scmInfoBeanName != null) {
+      MBeans.unregister(this.scmInfoBeanName);
+      this.scmInfoBeanName = null;
+    }
+  }
+
+  @VisibleForTesting
+  public ContainerInfo getContainerInfo(String containerName) throws
+      IOException {
+    return scmContainerManager.getContainer(containerName);
+  }
+
+  /**
+   * Returns the listening address of the StorageContainerLocation protocol
+   * RPC server.
+   *
+   * @return listen address of the StorageContainerLocation RPC server
+   */
+  @VisibleForTesting
+  public InetSocketAddress getClientRpcAddress() {
+    return getClientProtocolServer().getClientRpcAddress();
+  }
+
+  @Override
+  public String getClientRpcPort() {
+    InetSocketAddress addr = getClientRpcAddress();
+    return addr == null ? "0" : Integer.toString(addr.getPort());
+  }
+
+  /**
+   * Returns the listening address of the datanode protocol RPC server.
+   *
+   * @return Address where datanodes are communicating.
+   */
+  public InetSocketAddress getDatanodeRpcAddress() {
+    return getDatanodeProtocolServer().getDatanodeRpcAddress();
+  }
+
+  @Override
+  public String getDatanodeRpcPort() {
+    InetSocketAddress addr = getDatanodeRpcAddress();
+    return addr == null ? "0" : Integer.toString(addr.getPort());
+  }
+
+  /**
+   * Start service.
+   */
+  public void start() throws IOException {
+    LOG.info(
+        buildRpcServerStartMessage(
+            "StorageContainerLocationProtocol RPC server",
+            getClientRpcAddress()));
+    DefaultMetricsSystem.initialize("StorageContainerManager");
+    getClientProtocolServer().start();
+
+    LOG.info(buildRpcServerStartMessage("ScmBlockLocationProtocol RPC " +
+        "server", getBlockProtocolServer().getBlockRpcAddress()));
+    getBlockProtocolServer().start();
+
+    LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocl RPC " +
+        "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
+    getDatanodeProtocolServer().start();
+
+    httpServer.start();
+    scmBlockManager.start();
+
+    setStartTime();
+  }
+
+  /**
+   * Stop service.
+   */
+  public void stop() {
+
+    try {
+      LOG.info("Stopping datanode service RPC server");
+      getDatanodeProtocolServer().stop();
+
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager datanode RPC stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping block service RPC server");
+      getBlockProtocolServer().stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager blockRpcServer stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
+      getClientProtocolServer().stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager clientRpcServer stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping Storage Container Manager HTTP server.");
+      httpServer.stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager HTTP server stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping Block Manager Service.");
+      scmBlockManager.stop();
+    } catch (Exception ex) {
+      LOG.error("SCM block manager service stop failed.", ex);
+    }
+
+    if (containerReportCache != null) {
+      containerReportCache.invalidateAll();
+      containerReportCache.cleanUp();
+    }
+
+    if (metrics != null) {
+      metrics.unRegister();
+    }
+
+    unregisterMXBean();
+    IOUtils.cleanupWithLogger(LOG, scmContainerManager);
+  }
+
+  /**
+   * Wait until service has completed shutdown.
+   */
+  public void join() {
+    try {
+      getBlockProtocolServer().join();
+      getClientProtocolServer().join();
+      getDatanodeProtocolServer().join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted during StorageContainerManager join.");
+    }
+  }
+
+  /**
+   * Returns the number of datanodes that are communicating with SCM.
+   *
+   * @param nodestate Healthy, Dead etc.
+   * @return int -- count
+   */
+  public int getNodeCount(NodeState nodestate) {
+    return scmNodeManager.getNodeCount(nodestate);
+  }
+
+  /**
+   * Returns SCM container manager.
+   */
+  @VisibleForTesting
+  public Mapping getScmContainerManager() {
+    return scmContainerManager;
+  }
+
+  /**
+   * Returns node manager.
+   *
+   * @return - Node Manager
+   */
+  @VisibleForTesting
+  public NodeManager getScmNodeManager() {
+    return scmNodeManager;
+  }
+
+  @VisibleForTesting
+  public BlockManager getScmBlockManager() {
+    return scmBlockManager;
+  }
+
+  @VisibleForTesting
+  public String getRpcRemoteUsername() {
+    UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
+    return user == null ? null : user.getUserName();
+  }
+
+  public void checkAdminAccess() throws IOException {
+    String remoteUser = getRpcRemoteUsername();
+    if (remoteUser != null) {
+      if (!scmAdminUsernames.contains(remoteUser)) {
+        throw new IOException(
+            "Access denied for user " + remoteUser + ". Superuser privilege " +
+                "is required.");
+      }
+    }
+  }
+
+  /**
+   * Invalidates the container stat entry for the given datanode.
+   *
+   * @param datanodeUuid UUID of the datanode.
+   */
+  public void removeContainerReport(String datanodeUuid) {
+    synchronized (containerReportCache) {
+      containerReportCache.invalidate(datanodeUuid);
+    }
+  }
+
+  /**
+   * Gets the container stat of the specified datanode.
+   *
+   * @param datanodeUuid UUID of the datanode.
+   * @return the cached ContainerStat, or null if none exists.
+   */
+  public ContainerStat getContainerReport(String datanodeUuid) {
+    ContainerStat stat = null;
+    synchronized (containerReportCache) {
+      stat = containerReportCache.getIfPresent(datanodeUuid);
+    }
+
+    return stat;
+  }
+
+  /**
+   * Returns a view of the container stat entries. Modifications made to
+   * the map will directly affect the cache.
+   *
+   * @return a live map view of the container report cache.
+   */
+  public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
+    return containerReportCache.asMap();
+  }
+
+  @Override
+  public Map<String, String> getContainerReport() {
+    Map<String, String> id2StatMap = new HashMap<>();
+    synchronized (containerReportCache) {
+      ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap();
+      for (Map.Entry<String, ContainerStat> entry : map.entrySet()) {
+        id2StatMap.put(entry.getKey(), entry.getValue().toJsonString());
+      }
+    }
+
+    return id2StatMap;
+  }
+
+  /**
+   * Startup options.
+   */
+  public enum StartupOption {
+    INIT("-init"),
+    CLUSTERID("-clusterid"),
+    GENCLUSTERID("-genclusterid"),
+    REGULAR("-regular"),
+    HELP("-help");
+
+    private final String name;
+    private String clusterId = null;
+
+    StartupOption(String arg) {
+      this.name = arg;
+    }
+
+    public String getClusterId() {
+      return clusterId;
+    }
+
+    public void setClusterId(String cid) {
+      if (cid != null && !cid.isEmpty()) {
+        clusterId = cid;
+      }
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+}
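Taken together, the class above owns the three RPC endpoints plus the HTTP server and delegates lifecycle to them. A sketch of the embedding pattern that tests such as MiniOzoneClusterImpl follow, assuming ozone is enabled in the configuration and the SCM storage directory was already initialized (see scmInit above):

    OzoneConfiguration conf = new OzoneConfiguration();
    StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
    scm.start();  // starts client, block and datanode RPC servers + HTTP
    // ... serve traffic ...
    scm.stop();   // stops endpoints, block manager, metrics and MXBean
    scm.join();   // wait for the RPC servers to shut down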

+ 2 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java → hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java

@@ -15,9 +15,10 @@
  * the License.
  */
 
-package org.apache.hadoop.hdds.scm;
+package org.apache.hadoop.hdds.scm.server;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.server.BaseHttpServer;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 

+ 22 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;

+ 5 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManagerHttpServer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.http.HttpConfig;
@@ -36,7 +37,9 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.util.Arrays;
@@ -95,7 +98,7 @@ public class TestStorageContainerManagerHttpServer {
     conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, policy.name());
     conf.set(ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, "localhost:0");
 
-    InetSocketAddress addr = InetSocketAddress.createUnresolved("localhost", 0);
+    InetSocketAddress.createUnresolved("localhost", 0);
     StorageContainerManagerHttpServer server = null;
     try {
       server = new StorageContainerManagerHttpServer(conf);
@@ -128,7 +131,7 @@ public class TestStorageContainerManagerHttpServer {
       URLConnection conn = connectionFactory.openConnection(url);
       conn.connect();
       conn.getContent();
-    } catch (Exception e) {
+    } catch (IOException e) {
       return false;
     }
     return true;

+ 1 - 1
hadoop-ozone/common/src/main/bin/ozone

@@ -108,7 +108,7 @@ function ozonecmd_case
     ;;
     scm)
       HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
-      HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.StorageContainerManager'
+      HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.server.StorageContainerManager'
       hadoop_debug "Appending HDFS_STORAGECONTAINERMANAGER_OPTS onto HADOOP_OPTS"
       HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_STORAGECONTAINERMANAGER_OPTS}"
     ;;

+ 19 - 10
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java

@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.junit.After;
@@ -70,7 +70,8 @@ public class TestContainerStateManager {
   public void testAllocateContainer() throws IOException {
     // Allocate a container and verify the container info
     String container1 = "container" + RandomStringUtils.randomNumeric(5);
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container1, containerOwner);
     ContainerInfo info = containerStateManager
         .getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
@@ -87,7 +88,8 @@ public class TestContainerStateManager {
 
     // Check there are two containers in ALLOCATED state after allocation
     String container2 = "container" + RandomStringUtils.randomNumeric(5);
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container2, containerOwner);
     int numContainers = containerStateManager
         .getMatchingContainerIDs(containerOwner,
@@ -101,7 +103,8 @@ public class TestContainerStateManager {
     // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
     String cname = "container" + RandomStringUtils.randomNumeric(5);
     for (int i = 0; i < 10; i++) {
-      scm.allocateContainer(xceiverClientManager.getType(),
+      scm.getClientProtocolServer().allocateContainer(
+          xceiverClientManager.getType(),
           xceiverClientManager.getFactor(), cname + i, containerOwner);
       if (i >= 5) {
         scm.getScmContainerManager()
@@ -128,7 +131,8 @@ public class TestContainerStateManager {
   @Test
   public void testGetMatchingContainer() throws IOException {
     String container1 = "container-01234";
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container1, containerOwner);
     scmContainerMapping.updateContainerState(container1,
         HddsProtos.LifeCycleEvent.CREATE);
@@ -136,7 +140,8 @@ public class TestContainerStateManager {
         HddsProtos.LifeCycleEvent.CREATED);
 
     String container2 = "container-56789";
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container2, containerOwner);
 
     ContainerInfo info = containerStateManager
@@ -177,7 +182,8 @@ public class TestContainerStateManager {
     // Allocate container1 and update its state from ALLOCATED -> CREATING ->
     // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
     String container1 = "container" + RandomStringUtils.randomNumeric(5);
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container1, containerOwner);
     containers = containerStateManager.getMatchingContainerIDs(containerOwner,
         xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -229,7 +235,8 @@ public class TestContainerStateManager {
    // Allocate container2 and update its state from ALLOCATED -> CREATING ->
     // DELETING
     String container2 = "container" + RandomStringUtils.randomNumeric(5);
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container2, containerOwner);
     scmContainerMapping.updateContainerState(container2,
         HddsProtos.LifeCycleEvent.CREATE);
@@ -243,7 +250,8 @@ public class TestContainerStateManager {
    // Allocate container3 and update its state from ALLOCATED -> CREATING ->
     // OPEN -> CLOSING -> CLOSED
     String container3 = "container" + RandomStringUtils.randomNumeric(5);
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container3, containerOwner);
     scmContainerMapping.updateContainerState(container3,
         HddsProtos.LifeCycleEvent.CREATE);
@@ -262,7 +270,8 @@ public class TestContainerStateManager {
   @Test
   public void testUpdatingAllocatedBytes() throws Exception {
     String container1 = "container" + RandomStringUtils.randomNumeric(5);
-    scm.allocateContainer(xceiverClientManager.getType(),
+    scm.getClientProtocolServer().allocateContainer(
+        xceiverClientManager.getType(),
         xceiverClientManager.getFactor(), container1, containerOwner);
     scmContainerMapping.updateContainerState(container1,
         HddsProtos.LifeCycleEvent.CREATE);
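
Reviewer note: this file shows the central calling-convention change of the patch. Client-facing RPCs that used to hang directly off StorageContainerManager are now reached through a dedicated client-protocol endpoint. A before/after sketch, where type, factor, name and owner stand in for the xceiverClientManager values used in the test:

    // Before HDDS-13: the SCM object itself served client RPCs.
    // scm.allocateContainer(type, factor, name, owner);

    // After HDDS-13: client RPCs live on a dedicated endpoint object.
    scm.getClientProtocolServer()
        .allocateContainer(type, factor, name, owner);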

+ 17 - 17
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
 import org.apache.hadoop.hdds.scm.protocolPB
@@ -37,6 +37,17 @@ import java.util.concurrent.TimeoutException;
  */
 public interface MiniOzoneCluster {
 
+  /**
+   * Returns the Builder to construct MiniOzoneCluster.
+   *
+   * @param conf OzoneConfiguration
+   *
+   * @return MiniOzoneCluster builder
+   */
+  static Builder newBuilder(OzoneConfiguration conf) {
+    return new MiniOzoneClusterImpl.Builder(conf);
+  }
+
   /**
    * Returns the configuration object associated with the MiniOzoneCluster.
    *
@@ -119,8 +130,8 @@ public interface MiniOzoneCluster {
    * @return StorageContainerLocation Client
    * @throws IOException
    */
-  StorageContainerLocationProtocolClientSideTranslatorPB getStorageContainerLocationClient()
-      throws IOException;
+  StorageContainerLocationProtocolClientSideTranslatorPB
+      getStorageContainerLocationClient() throws IOException;
 
   /**
    * Restarts StorageContainerManager instance.
@@ -155,20 +166,10 @@ public interface MiniOzoneCluster {
    */
   void shutdown();
 
-  /**
-   * Returns the Builder to construct MiniOzoneCluster.
-   *
-   * @param conf OzoneConfiguration
-   *
-   * @return MiniOzoneCluster builder
-   */
-  static Builder newBuilder(OzoneConfiguration conf) {
-    return new MiniOzoneClusterImpl.Builder(conf);
-  }
-
   /**
    * Builder class for MiniOzoneCluster.
    */
+  @SuppressWarnings("CheckStyle")
   abstract class Builder {
 
     protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
@@ -261,7 +262,6 @@ public interface MiniOzoneCluster {
       return this;
     }
 
-
     /**
      * Sets the number of HeartBeat Interval of Datanodes, the value should be
      * in MilliSeconds.
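
Reviewer note: newBuilder only moved from the bottom of the interface to the top; its body is unchanged. Typical usage, assuming the Builder exposes a build() method (not visible in this hunk):

    OzoneConfiguration conf = new OzoneConfiguration();
    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
        .build();  // build() is assumed from the existing Builder contract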

+ 5 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java

@@ -35,14 +35,14 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
-import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.ozone.ksm.KSMStorage;
 import org.apache.hadoop.ozone.web.client.OzoneRestClient;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -179,8 +179,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
    * @throws IOException if there is an I/O error
    */
   @Override
-  public StorageContainerLocationProtocolClientSideTranslatorPB getStorageContainerLocationClient()
-      throws IOException {
+  public StorageContainerLocationProtocolClientSideTranslatorPB
+      getStorageContainerLocationClient() throws IOException {
     long version = RPC.getProtocolVersion(
         StorageContainerLocationProtocolPB.class);
     InetSocketAddress address = scm.getClientRpcAddress();
@@ -226,7 +226,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
 
       File baseDir = new File(GenericTestUtils.getTempPath(
           MiniOzoneClusterImpl.class.getSimpleName() + "-" +
-              scm.getScmInfo().getClusterId()));
+              scm.getClientProtocolServer().getScmInfo().getClusterId()));
       FileUtils.deleteDirectory(baseDir);
 
       if (ksm != null) {

+ 10 - 10
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java

@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -32,8 +32,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.StorageContainerManager.StartupOption;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -113,7 +113,7 @@ public class TestStorageContainerManager {
           .thenReturn(fakeUser);
 
       try {
-        mockScm.deleteContainer("container1");
+        mockScm.getClientProtocolServer().deleteContainer("container1");
         fail("Operation should fail, expecting an IOException here.");
       } catch (Exception e) {
         if (expectPermissionDenied) {
@@ -127,8 +127,8 @@ public class TestStorageContainerManager {
       }
 
       try {
-        Pipeline pipeLine2 = mockScm.allocateContainer(
-            xceiverClientManager.getType(),
+        Pipeline pipeLine2 = mockScm.getClientProtocolServer()
+            .allocateContainer(xceiverClientManager.getType(),
             HddsProtos.ReplicationFactor.ONE, "container2", "OZONE");
         if (expectPermissionDenied) {
           fail("Operation should fail, expecting an IOException here.");
@@ -140,8 +140,8 @@ public class TestStorageContainerManager {
       }
 
       try {
-        Pipeline pipeLine3 = mockScm.allocateContainer(
-            xceiverClientManager.getType(),
+        Pipeline pipeLine3 = mockScm.getClientProtocolServer()
+            .allocateContainer(xceiverClientManager.getType(),
             HddsProtos.ReplicationFactor.ONE, "container3", "OZONE");
 
         if (expectPermissionDenied) {
@@ -155,7 +155,7 @@ public class TestStorageContainerManager {
       }
 
       try {
-        mockScm.getContainer("container4");
+        mockScm.getClientProtocolServer().getContainer("container4");
         fail("Operation should fail, expecting an IOException here.");
       } catch (Exception e) {
         if (expectPermissionDenied) {
@@ -436,7 +436,7 @@ public class TestStorageContainerManager {
     scmStore.initialize();
     StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
     //Reads the SCM Info from SCM instance
-    ScmInfo scmInfo = scm.getScmInfo();
+    ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
     Assert.assertEquals(clusterId, scmInfo.getClusterId());
     Assert.assertEquals(scmId, scmInfo.getScmId());
   }
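
Reviewer note: ScmInfo is now served through the client-protocol endpoint rather than the SCM object itself. The updated lookup path, as exercised by the test above:

    StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
    ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
    // clusterId/scmId should match what scmStore.initialize() persisted.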

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java

@@ -157,7 +157,7 @@ public class TestStorageContainerManagerHelper {
   private MetadataStore getContainerMetadata(String containerName)
       throws IOException {
     Pipeline pipeline = cluster.getStorageContainerManager()
-        .getContainer(containerName);
+        .getClientProtocolServer().getContainer(containerName);
     DatanodeDetails leadDN = pipeline.getLeader();
     OzoneContainer containerServer =
         getContainerServerByDatanodeUuid(leadDN.getUuidString());

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.junit.AfterClass;

+ 3 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -29,7 +29,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.ServicePort;
@@ -646,7 +646,8 @@ public class TestKeySpaceManager {
     keys.add(keyArgs.getResourceName());
     exception.expect(IOException.class);
     exception.expectMessage("Specified block key does not exist");
-    cluster.getStorageContainerManager().getBlockLocations(keys);
+    cluster.getStorageContainerManager().getBlockProtocolServer()
+        .getBlockLocations(keys);
 
     // Delete the key again to test deleting non-existing key.
     exception.expect(IOException.class);
@@ -818,9 +819,6 @@ public class TestKeySpaceManager {
     listKeyArgs = new ListArgs(bucketArgs, null, 100, null);
     result = storageHandler.listKeys(listKeyArgs);
     Assert.assertEquals(numKeys, result.getKeyList().size());
-    List<KeyInfo> allKeys = result.getKeyList().stream()
-        .filter(item -> item.getSize() == 4096)
-        .collect(Collectors.toList());
 
     // List keys with prefix "aKey".
     listKeyArgs = new ListArgs(bucketArgs, "aKey", 100, null);
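
Reviewer note: block lookups follow the same split and now go through the block-protocol endpoint. A sketch, with keys standing in for the list of resource names built in the test:

    // Throws IOException ("Specified block key does not exist") for unknown keys.
    cluster.getStorageContainerManager()
        .getBlockProtocolServer()
        .getBlockLocations(keys);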

+ 8 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java

@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.scm;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -124,7 +124,7 @@ public class TestSCMCli {
   public void testCreateContainer() throws Exception {
     String containerName =  "containerTestCreate";
     try {
-      scm.getContainer(containerName);
+      scm.getClientProtocolServer().getContainer(containerName);
       fail("should not be able to get the container");
     } catch (IOException ioe) {
       assertTrue(ioe.getMessage().contains(
@@ -132,14 +132,16 @@ public class TestSCMCli {
     }
     String[] args = {"-container", "-create", "-c", containerName};
     assertEquals(ResultCode.SUCCESS, cli.run(args));
-    Pipeline container = scm.getContainer(containerName);
+    Pipeline container = scm.getClientProtocolServer()
+        .getContainer(containerName);
     assertNotNull(container);
     assertEquals(containerName, container.getContainerName());
   }
 
   private boolean containerExist(String containerName) {
     try {
-      Pipeline scmPipeline = scm.getContainer(containerName);
+      Pipeline scmPipeline = scm.getClientProtocolServer()
+          .getContainer(containerName);
       return scmPipeline != null
           && containerName.equals(scmPipeline.getContainerName());
     } catch (IOException e) {
@@ -447,7 +449,8 @@ public class TestSCMCli {
     String containerName =  "containerTestClose";
     String[] args = {"-container", "-create", "-c", containerName};
     assertEquals(ResultCode.SUCCESS, cli.run(args));
-    Pipeline container = scm.getContainer(containerName);
+    Pipeline container = scm.getClientProtocolServer()
+        .getContainer(containerName);
     assertNotNull(container);
     assertEquals(containerName, container.getContainerName());
 

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.scm;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;

+ 8 - 8
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java

@@ -26,7 +26,7 @@ import java.util.UUID;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -80,7 +80,7 @@ public class TestSCMMetrics {
       ContainerReportsRequestProto request = createContainerReport(numReport,
           stat, null);
       String fstDatanodeUuid = request.getDatanodeDetails().getUuid();
-      scmManager.sendContainerReport(request);
+      scmManager.getDatanodeProtocolServer().sendContainerReport(request);
 
       // verify container stat metrics
       MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -103,7 +103,7 @@ public class TestSCMMetrics {
       // add one new report
       request = createContainerReport(1, stat, null);
       String sndDatanodeUuid = request.getDatanodeDetails().getUuid();
-      scmManager.sendContainerReport(request);
+      scmManager.getDatanodeProtocolServer().sendContainerReport(request);
 
       scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
       assertEquals(size * (numReport + 1),
@@ -125,12 +125,12 @@ public class TestSCMMetrics {
       // Re-send reports but with different value for validating
       // the aggregation.
       stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
-      scmManager.sendContainerReport(createContainerReport(1, stat,
-          fstDatanodeUuid));
+      scmManager.getDatanodeProtocolServer().sendContainerReport(
+          createContainerReport(1, stat, fstDatanodeUuid));
 
       stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
-      scmManager.sendContainerReport(createContainerReport(1, stat,
-          sndDatanodeUuid));
+      scmManager.getDatanodeProtocolServer().sendContainerReport(
+          createContainerReport(1, stat, sndDatanodeUuid));
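
Reviewer note: datanode-facing traffic is split out the same way; container reports are submitted through the datanode-protocol endpoint everywhere in this test:

    ContainerReportsRequestProto request =
        createContainerReport(numReport, stat, datanodeUuid);
    scmManager.getDatanodeProtocolServer().sendContainerReport(request);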
 
       // the global container metrics value should be updated
       scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -175,7 +175,7 @@ public class TestSCMMetrics {
           .getDatanodeDetails().getUuidString();
       ContainerReportsRequestProto request = createContainerReport(numReport,
           stat, datanodeUuid);
-      scmManager.sendContainerReport(request);
+      scmManager.getDatanodeProtocolServer().sendContainerReport(request);
 
       MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
       assertEquals(size * numReport,