Browse Source

HDFS-12521. Ozone: SCM should read all Container info into memory when booting up. Contributed by Lokesh Jain.

Anu Engineer 7 years ago
parent
commit
d6a4eaeebc
15 changed files with 410 additions and 133 deletions
  1. 4 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
  2. 4 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
  3. 5 4
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
  4. 7 5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
  5. 1 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
  6. 5 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
  7. 9 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
  8. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ListContainerHandler.java
  9. 7 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
  10. 61 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
  11. 4 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
  12. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
  13. 14 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
  14. 39 74
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
  15. 245 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerStateManager.java

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
@@ -100,6 +101,7 @@ public class ContainerOperationClient implements ScmClient {
         createPipeline(client, pipeline);
       }
       // TODO : Container Client State needs to be updated.
+      // TODO : Return ContainerInfo instead of Pipeline
       createContainer(containerId, client, pipeline);
       return pipeline;
     } finally {
@@ -201,6 +203,7 @@ public class ContainerOperationClient implements ScmClient {
         createPipeline(client, pipeline);
       }
 
+      // TODO : Return ContainerInfo instead of Pipeline
       // connect to pipeline leader and allocate container on leader datanode.
       client = xceiverClientManager.acquireClient(pipeline);
       createContainer(containerId, client, pipeline);
@@ -273,7 +276,7 @@ public class ContainerOperationClient implements ScmClient {
    * {@inheritDoc}
    */
   @Override
-  public List<Pipeline> listContainer(String startName,
+  public List<ContainerInfo> listContainer(String startName,
       String prefixName, int count)
       throws IOException {
     return storageContainerLocationClient.listContainer(

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.scm.client;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 
@@ -71,7 +72,7 @@ public interface ScmClient {
   void deleteContainer(Pipeline pipeline, boolean force) throws IOException;
 
   /**
-   * Lists a range of containers and get the pipelines info.
+   * Lists a range of containers and get their info.
    *
    * @param startName start name, if null, start searching at the head.
    * @param prefixName prefix name, if null, then filter is disabled.
@@ -82,8 +83,8 @@ public interface ScmClient {
    * @return a list of pipeline.
    * @throws IOException
    */
-  List<Pipeline> listContainer(String startName, String prefixName, int count)
-      throws IOException;
+  List<ContainerInfo> listContainer(String startName, String prefixName,
+      int count) throws IOException;
 
   /**
    * Read meta data from an existing container.

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java

@@ -22,6 +22,7 @@ import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 
@@ -50,7 +51,7 @@ public interface StorageContainerLocationProtocol {
   Pipeline getContainer(String containerName) throws IOException;
 
   /**
-   * Ask SCM a list of pipelines with a range of container names
+   * Ask SCM a list of containers with a range of container names
    * and the limit of count.
    * Search container names between start name(exclusive), and
    * use prefix name to filter the result. the max size of the
@@ -62,11 +63,11 @@ public interface StorageContainerLocationProtocol {
    *              Usually the count will be replace with a very big
    *              value instead of being unlimited in case the db is very big)
    *
-   * @return a list of pipeline.
+   * @return a list of container.
    * @throws IOException
    */
-  List<Pipeline> listContainer(String startName, String prefixName, int count)
-      throws IOException;
+  List<ContainerInfo> listContainer(String startName, String prefixName,
+      int count) throws IOException;
 
   /**
    * Deletes a container in SCM.

+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
@@ -132,7 +133,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
    * {@inheritDoc}
    */
   @Override
-  public List<Pipeline> listContainer(String startName, String prefixName,
+  public List<ContainerInfo> listContainer(String startName, String prefixName,
       int count) throws IOException {
     ListContainerRequestProto.Builder builder = ListContainerRequestProto
         .newBuilder();
@@ -148,11 +149,12 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
     try {
       ListContainerResponseProto response =
           rpcProxy.listContainer(NULL_RPC_CONTROLLER, request);
-      List<Pipeline> pipelineList = new ArrayList<>();
-      for (OzoneProtos.Pipeline pipelineProto : response.getPipelineList()) {
-        pipelineList.add(Pipeline.getFromProtoBuf(pipelineProto));
+      List<ContainerInfo> containerList = new ArrayList<>();
+      for (OzoneProtos.SCMContainerInfo containerInfoProto : response
+          .getContainersList()) {
+        containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
       }
-      return pipelineList;
+      return containerList;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto

@@ -80,7 +80,7 @@ message ListContainerRequestProto {
 }
 
 message ListContainerResponseProto {
-  repeated hadoop.hdfs.ozone.Pipeline pipeline = 1;
+  repeated hadoop.hdfs.ozone.SCMContainerInfo containers = 1;
 }
 
 message DeleteContainerRequestProto {

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
@@ -118,12 +119,12 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       }
 
       count = request.getCount();
-      List<Pipeline> pipelineList = impl.listContainer(startName,
-          prefixName, count);
+      List<ContainerInfo> containerList =
+          impl.listContainer(startName, prefixName, count);
       ListContainerResponseProto.Builder builder =
           ListContainerResponseProto.newBuilder();
-      for (Pipeline pipeline : pipelineList) {
-        builder.addPipeline(pipeline.getProtobufMessage());
+      for (ContainerInfo container : containerList) {
+        builder.addContainers(container.getProtobuf());
       }
       return builder.build();
     } catch (IOException e) {

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -414,7 +414,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
    * {@inheritDoc}
    */
   @Override
-  public List<Pipeline> listContainer(String startName,
+  public List<ContainerInfo> listContainer(String startName,
       String prefixName, int count) throws IOException {
     return scmContainerManager.listContainer(startName, prefixName, count);
   }
@@ -828,6 +828,14 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
     return scmNodeManager.getNodeCount(nodestate);
   }
 
+  /**
+   * Returns SCM container manager.
+   */
+  @VisibleForTesting
+  public Mapping getScmContainerManager() {
+    return scmContainerManager;
+  }
+
   /**
    * Returns node manager.
    * @return - Node Manager

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ListContainerHandler.java

@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.ozone.scm.cli.OzoneCommandHandler;
 import org.apache.hadoop.ozone.web.utils.JsonUtils;
 import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
@@ -82,12 +83,12 @@ public class ListContainerHandler extends OzoneCommandHandler {
       }
     }
 
-    List<Pipeline> pipelineList =
+    List<ContainerInfo> containerList =
         getScmClient().listContainer(startName, prefixName, count);
 
     // Output data list
-    for (Pipeline pipeline : pipelineList) {
-      outputContainerPipeline(pipeline);
+    for (ContainerInfo container : containerList) {
+      outputContainerPipeline(container.getPipeline());
     }
   }
 

+ 7 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
@@ -107,8 +106,8 @@ public class ContainerMapping implements Mapping {
     this.lock = new ReentrantLock();
 
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
-    this.containerStateManager = new ContainerStateManager(conf, +this
-        .cacheSize * OzoneConsts.MB);
+    this.containerStateManager =
+        new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB);
     LOG.trace("Container State Manager created.");
 
     long containerCreationLeaseTimeout = conf.getLong(
@@ -144,10 +143,9 @@ public class ContainerMapping implements Mapping {
 
   /** {@inheritDoc} */
   @Override
-  public List<Pipeline> listContainer(String startName, String prefixName,
-      int count)
-      throws IOException {
-    List<Pipeline> pipelineList = new ArrayList<>();
+  public List<ContainerInfo> listContainer(String startName,
+      String prefixName, int count) throws IOException {
+    List<ContainerInfo> containerList = new ArrayList<>();
     lock.lock();
     try {
       if (containerStore.isEmpty()) {
@@ -160,7 +158,6 @@ public class ContainerMapping implements Mapping {
           containerStore.getSequentialRangeKVs(startKey, count, prefixFilter);
 
       // Transform the values into the pipelines.
-      // TODO: return list of ContainerInfo instead of pipelines.
       // TODO: filter by container state
       for (Map.Entry<byte[], byte[]> entry : range) {
         ContainerInfo containerInfo =
@@ -168,12 +165,12 @@ public class ContainerMapping implements Mapping {
                 OzoneProtos.SCMContainerInfo.PARSER.parseFrom(
                     entry.getValue()));
         Preconditions.checkNotNull(containerInfo);
-        pipelineList.add(containerInfo.getPipeline());
+        containerList.add(containerInfo);
       }
     } finally {
       lock.unlock();
     }
-    return pipelineList;
+    return containerList;
   }
 
   /**

+ 61 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java

@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.scm.container;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -44,9 +45,11 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.PriorityQueue;
+import java.util.List;
+import java.util.Arrays;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@@ -126,7 +129,7 @@ public class ContainerStateManager {
 
   // A map that maintains the ContainerKey to Containers of that type ordered
   // by last access time.
-  private final Lock writeLock;
+  private final ReadWriteLock lock;
   private final Queue<BlockContainerInfo> containerCloseQueue;
   private Map<ContainerKey, PriorityQueue<BlockContainerInfo>> containers;
 
@@ -136,8 +139,8 @@ public class ContainerStateManager {
    * <p>
    * TODO : Add Container Tags so we know which containers are owned by SCM.
    */
-  public ContainerStateManager(Configuration configuration, final long
-      cacheSize) throws IOException {
+  public ContainerStateManager(Configuration configuration,
+      Mapping containerMapping, final long cacheSize) throws IOException {
     this.cacheSize = cacheSize;
 
     // Initialize the container state machine.
@@ -160,9 +163,10 @@ public class ContainerStateManager {
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
 
-    writeLock = new ReentrantLock();
+    lock = new ReentrantReadWriteLock();
     containers = new HashMap<>();
-    initializeContainerMaps(containers);
+    initializeContainerMaps();
+    loadExistingContainers(containerMapping);
     containerCloseQueue = new ConcurrentLinkedQueue<BlockContainerInfo>();
   }
 
@@ -179,22 +183,46 @@ public class ContainerStateManager {
    * of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED}  container
    * states
    */
-  private void initializeContainerMaps(Map containerMaps) {
+  private void initializeContainerMaps() {
     // Called only from Ctor path, hence no lock is held.
-    Preconditions.checkNotNull(containerMaps);
+    Preconditions.checkNotNull(containers);
     for (OzoneProtos.Owner owner : OzoneProtos.Owner.values()) {
       for (ReplicationType type : ReplicationType.values()) {
         for (ReplicationFactor factor : ReplicationFactor.values()) {
           for (LifeCycleState state : LifeCycleState.values()) {
             ContainerKey key = new ContainerKey(owner, type, factor, state);
             PriorityQueue<BlockContainerInfo> queue = new PriorityQueue<>();
-            containerMaps.put(key, queue);
+            containers.put(key, queue);
           }
         }
       }
     }
   }
 
+  /**
+   * Load containers from the container store into the containerMaps.
+   *
+   * @param containerMapping -- Mapping object containing container store.
+   */
+  private void loadExistingContainers(Mapping containerMapping) {
+    try {
+      List<ContainerInfo> containerList =
+          containerMapping.listContainer(null, null, Integer.MAX_VALUE);
+      for (ContainerInfo container : containerList) {
+        ContainerKey key = new ContainerKey(container.getOwner(),
+            container.getPipeline().getType(),
+            container.getPipeline().getFactor(), container.getState());
+        BlockContainerInfo blockContainerInfo =
+            new BlockContainerInfo(container, 0);
+        ((PriorityQueue) containers.get(key)).add(blockContainerInfo);
+      }
+    } catch (IOException e) {
+      if (!e.getMessage().equals("No container exists in current db")) {
+        LOG.info("Could not list the containers", e);
+      }
+    }
+  }
+
   // 1. Client -> SCM: Begin_create
   // 2. Client -> Datanode: create
   // 3. Client -> SCM: complete    {SCM:Creating ->OK}
@@ -271,7 +299,7 @@ public class ContainerStateManager {
     Preconditions.checkNotNull(info);
     BlockContainerInfo blockInfo = new BlockContainerInfo(info, 0);
     blockInfo.setLastUsed(Time.monotonicNow());
-    writeLock.lock();
+    lock.writeLock().lock();
     try {
       ContainerKey key = new ContainerKey(owner, type, replicationFactor,
           blockInfo.getState());
@@ -280,7 +308,7 @@ public class ContainerStateManager {
       queue.add(blockInfo);
       LOG.trace("New container allocated: {}", blockInfo);
     } finally {
-      writeLock.unlock();
+      lock.writeLock().unlock();
     }
     return info;
   }
@@ -321,7 +349,7 @@ public class ContainerStateManager {
 
     ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(),
         pipeline.getFactor(), newState);
-    writeLock.lock();
+    lock.writeLock().lock();
     try {
 
       PriorityQueue<BlockContainerInfo> currentQueue = containers.get(oldKey);
@@ -349,7 +377,7 @@ public class ContainerStateManager {
 
       return newState;
     } finally {
-      writeLock.unlock();
+      lock.writeLock().unlock();
     }
   }
 
@@ -367,7 +395,7 @@ public class ContainerStateManager {
       Owner owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) {
     ContainerKey key = new ContainerKey(owner, type, factor, state);
-    writeLock.lock();
+    lock.writeLock().lock();
     try {
       PriorityQueue<BlockContainerInfo> queue = containers.get(key);
       if (queue.size() == 0) {
@@ -382,7 +410,7 @@ public class ContainerStateManager {
 
       while (iter.hasNext()) {
         BlockContainerInfo info = iter.next();
-        if (info.getAllocated() < this.containerSize + size) {
+        if (info.getAllocated() + size <= this.containerSize) {
 
           queue.remove(info);
           info.addAllocated(size);
@@ -399,7 +427,23 @@ public class ContainerStateManager {
       }
 
     } finally {
-      writeLock.unlock();
+      lock.writeLock().unlock();
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public List<BlockContainerInfo> getMatchingContainers(Owner owner,
+      ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
+    ContainerKey key = new ContainerKey(owner, type, factor, state);
+    lock.readLock().lock();
+    try {
+      return Arrays.asList((BlockContainerInfo[]) containers.get(key)
+          .toArray(new BlockContainerInfo[0]));
+    } catch (Exception e) {
+      LOG.error("Could not get matching containers", e);
+    } finally {
+      lock.readLock().unlock();
     }
     return null;
   }

+ 4 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.scm.container;
 
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -40,7 +39,7 @@ public interface Mapping extends Closeable {
   ContainerInfo getContainer(String containerName) throws IOException;
 
   /**
-   * Returns pipelines under certain conditions.
+   * Returns containers under certain conditions.
    * Search container names from start name(exclusive),
    * and use prefix name to filter the result. The max
    * size of the searching range cannot exceed the
@@ -52,11 +51,11 @@ public interface Mapping extends Closeable {
    *              Usually the count will be replace with a very big
    *              value instead of being unlimited in case the db is very big)
    *
-   * @return a list of pipeline.
+   * @return a list of container.
    * @throws IOException
    */
-  List<Pipeline> listContainer(String startName, String prefixName, int count)
-      throws IOException;
+  List<ContainerInfo> listContainer(String startName, String prefixName,
+      int count) throws IOException;
 
   /**
    * Allocates a new container for a given keyName and replication factor.

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java

@@ -74,6 +74,7 @@ public class StandaloneManagerImpl implements PipelineManager {
     String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3);
     pipeline.setContainerName(containerName);
     pipeline.setPipelineName(pipelineName);
+    pipeline.setFactor(replicationFactor);
     LOG.info("Creating new standalone pipeline: {}", pipeline.toString());
     return pipeline;
   }

+ 14 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java

@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.cblock.util;
 
+import org.apache.hadoop.cblock.meta.ContainerDescriptor;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
@@ -68,7 +70,7 @@ public class MockStorageClient implements ScmClient {
   }
 
   /**
-   * This is a mock class, so returns the pipelines of start container
+   * This is a mock class, so returns the container infos of start container
    * and end container.
    *
    * @param startName start container name.
@@ -78,11 +80,18 @@ public class MockStorageClient implements ScmClient {
    * @throws IOException
    */
   @Override
-  public List<Pipeline> listContainer(String startName,
+  public List<ContainerInfo> listContainer(String startName,
       String prefixName, int count) throws IOException {
-    List<Pipeline> dataList = new ArrayList<>();
-    dataList.add(getContainer(startName));
-    return dataList;
+    List<ContainerInfo> containerList = new ArrayList<>();
+    ContainerDescriptor containerDescriptor =
+        ContainerLookUpService.lookUp(startName);
+    ContainerInfo container = new ContainerInfo.Builder()
+        .setContainerName(containerDescriptor.getContainerID())
+        .setPipeline(containerDescriptor.getPipeline())
+        .setState(OzoneProtos.LifeCycleState.ALLOCATED)
+        .build();
+    containerList.add(container);
+    return containerList;
   }
 
   /**

+ 39 - 74
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.scm.cli.ResultCode;
 import org.apache.hadoop.ozone.scm.cli.SCMCLI;
@@ -34,20 +33,16 @@ import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.Ignore;
 import org.junit.rules.Timeout;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.List;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.CLOSED;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
@@ -56,11 +51,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import static org.junit.Assert.assertFalse;
 /**
  * This class tests the CLI of SCM.
  */
-@Ignore("Ignoring to fix configurable pipeline, Will bring this back.")
 public class TestSCMCli {
   private static SCMCLI cli;
 
@@ -70,7 +64,7 @@ public class TestSCMCli {
       storageContainerLocationClient;
 
   private static StorageContainerManager scm;
-  private static ContainerManager containerManager;
+  private static ScmClient containerOperationClient;
 
   private static ByteArrayOutputStream outContent;
   private static PrintStream outStream;
@@ -86,18 +80,17 @@ public class TestSCMCli {
     conf = new OzoneConfiguration();
     cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    xceiverClientManager = new XceiverClientManager(conf);
     storageContainerLocationClient =
         cluster.createStorageContainerLocationClient();
-    ScmClient client = new ContainerOperationClient(
+    containerOperationClient = new ContainerOperationClient(
         storageContainerLocationClient, new XceiverClientManager(conf));
     outContent = new ByteArrayOutputStream();
     outStream = new PrintStream(outContent);
     errContent = new ByteArrayOutputStream();
     errStream = new PrintStream(errContent);
-    cli = new SCMCLI(client, outStream, errStream);
+    cli = new SCMCLI(containerOperationClient, outStream, errStream);
     scm = cluster.getStorageContainerManager();
-    containerManager = cluster.getDataNodes().get(0)
-        .getOzoneContainerManager().getContainerManager();
   }
 
   private int runCommandAndGetOutput(String[] cmd,
@@ -163,12 +156,12 @@ public class TestSCMCli {
     // ****************************************
     // Create an non-empty container
     containerName = "non-empty-container";
-    pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-        OzoneProtos.ReplicationFactor.ONE,
-        containerName);
-    containerData = new ContainerData(containerName, conf);
-    containerManager.createContainer(pipeline, containerData);
-    ContainerData cdata = containerManager.readContainer(containerName);
+    pipeline = containerOperationClient
+        .createContainer(xceiverClientManager.getType(),
+            OzoneProtos.ReplicationFactor.ONE, containerName);
+
+    ContainerData cdata = ContainerData
+        .getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
     KeyUtils.getDB(cdata, conf).put(containerName.getBytes(),
         "someKey".getBytes());
     Assert.assertTrue(containerExist(containerName));
@@ -184,7 +177,7 @@ public class TestSCMCli {
     Assert.assertTrue(containerExist(containerName));
 
     // Close the container
-    containerManager.closeContainer(containerName);
+    containerOperationClient.closeContainer(pipeline);
 
     // Gracefully delete a container should fail because it is not empty.
     testErr = new ByteArrayOutputStream();
@@ -198,31 +191,29 @@ public class TestSCMCli {
     delCmd = new String[] {"-container", "-delete", "-c", containerName, "-f"};
     exitCode = runCommandAndGetOutput(delCmd, out, null);
     assertEquals("Expected success, found:", ResultCode.SUCCESS, exitCode);
-    Assert.assertFalse(containerExist(containerName));
+    assertFalse(containerExist(containerName));
 
     // ****************************************
     // 2. Test to delete an empty container.
     // ****************************************
     // Create an empty container
     containerName = "empty-container";
-    pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerName);
-    containerData = new ContainerData(containerName, conf);
-    containerManager.createContainer(pipeline, containerData);
-    containerManager.closeContainer(containerName);
+    pipeline = containerOperationClient
+        .createContainer(xceiverClientManager.getType(),
+            OzoneProtos.ReplicationFactor.ONE, containerName);
+    containerOperationClient.closeContainer(pipeline);
     Assert.assertTrue(containerExist(containerName));
 
     // Successfully delete an empty container.
     delCmd = new String[] {"-container", "-delete", "-c", containerName};
     exitCode = runCommandAndGetOutput(delCmd, out, null);
     assertEquals(ResultCode.SUCCESS, exitCode);
-    Assert.assertFalse(containerExist(containerName));
+    assertFalse(containerExist(containerName));
 
     // After the container is deleted,
     // a same name container can now be recreated.
-    pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerName);
-    containerManager.createContainer(pipeline, containerData);
+    containerOperationClient.createContainer(xceiverClientManager.getType(),
+        OzoneProtos.ReplicationFactor.ONE, containerName);
     Assert.assertTrue(containerExist(containerName));
 
     // ****************************************
@@ -269,10 +260,11 @@ public class TestSCMCli {
 
     // Create an empty container.
     cname = "ContainerTestInfo1";
-    Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), cname);
-    ContainerData data = new ContainerData(cname, conf);
-    containerManager.createContainer(pipeline, data);
+    Pipeline pipeline = containerOperationClient
+        .createContainer(xceiverClientManager.getType(),
+            OzoneProtos.ReplicationFactor.ONE, cname);
+    ContainerData data = ContainerData
+        .getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
 
     info = new String[]{"-container", "-info", "-c", cname};
     ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -290,12 +282,12 @@ public class TestSCMCli {
 
     // Create an non-empty container
     cname = "ContainerTestInfo2";
-    pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), cname);
-    data = new ContainerData(cname, conf);
-    containerManager.createContainer(pipeline, data);
-    KeyUtils.getDB(data, conf).put(cname.getBytes(),
-        "someKey".getBytes());
+    pipeline = containerOperationClient
+        .createContainer(xceiverClientManager.getType(),
+            OzoneProtos.ReplicationFactor.ONE, cname);
+    data = ContainerData
+        .getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
+    KeyUtils.getDB(data, conf).put(cname.getBytes(), "someKey".getBytes());
 
     info = new String[]{"-container", "-info", "-c", cname};
     exitCode = runCommandAndGetOutput(info, out, null);
@@ -309,46 +301,20 @@ public class TestSCMCli {
 
     out.reset();
 
-    // Create a container with some meta data.
-    cname = "ContainerTestInfo3";
-    pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), cname);
-    data = new ContainerData(cname, conf);
-    data.addMetadata("VOLUME", "shire");
-    data.addMetadata("owner", "bilbo");
-    containerManager.createContainer(pipeline, data);
-    KeyUtils.getDB(data, conf).put(cname.getBytes(),
-        "someKey".getBytes());
-
-    List<String> metaList = data.getAllMetadata().entrySet().stream()
-        .map(entry -> entry.getKey() + ":" + entry.getValue())
-        .collect(Collectors.toList());
-    String metadataStr = StringUtils.join(", ", metaList);
-
-    info = new String[]{"-container", "-info", "-c", cname};
-    exitCode = runCommandAndGetOutput(info, out, null);
-    assertEquals(ResultCode.SUCCESS, exitCode);
-
-    openStatus = data.isOpen() ? "OPEN" : "CLOSED";
-    expected = String.format(formatStr, cname, openStatus,
-        data.getDBPath(), data.getContainerPath(), metadataStr,
-        datanodeID.getHostName(), datanodeID.getHostName());
-    assertEquals(expected, out.toString());
-
-    out.reset();
 
     // Close last container and test info again.
-    containerManager.closeContainer(cname);
+    containerOperationClient.closeContainer(pipeline);
 
-    info = new String[]{"-container", "-info", "-c", cname};
+    info = new String[] {"-container", "-info", "-c", cname};
     exitCode = runCommandAndGetOutput(info, out, null);
     assertEquals(ResultCode.SUCCESS, exitCode);
-    data = containerManager.readContainer(cname);
+    data = ContainerData
+        .getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
 
     openStatus = data.isOpen() ? "OPEN" : "CLOSED";
     expected = String.format(formatStrWithHash, cname, openStatus,
         data.getHash(), data.getDBPath(), data.getContainerPath(),
-        metadataStr, datanodeID.getHostName(), datanodeID.getHostName());
+        "", datanodeID.getHostName(), datanodeID.getHostName());
     assertEquals(expected, out.toString());
   }
 
@@ -376,10 +342,8 @@ public class TestSCMCli {
     String prefix = "ContainerForTesting";
     for (int index = 0; index < 20; index++) {
       String containerName = String.format("%s%02d", prefix, index);
-      Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(),
-          xceiverClientManager.getFactor(), containerName);
-      ContainerData data = new ContainerData(containerName, conf);
-      containerManager.createContainer(pipeline, data);
+      containerOperationClient.createContainer(xceiverClientManager.getType(),
+          OzoneProtos.ReplicationFactor.ONE, containerName);
     }
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -517,6 +481,7 @@ public class TestSCMCli {
     String expected1 =
         "usage: hdfs scm -container <commands> <options>\n" +
         "where <commands> can be one of the following\n" +
+        " -close    Close container\n" +
         " -create   Create container\n" +
         " -delete   Delete container\n" +
         " -info     Info container\n" +

+ 245 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerStateManager.java

@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.scm.StorageContainerManager;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
+import org.junit.*;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+/**
+ * Tests for ContainerStateManager.
+ */
+public class TestContainerStateManager {
+
+  private OzoneConfiguration conf;
+  private MiniOzoneCluster cluster;
+  private XceiverClientManager xceiverClientManager;
+  private StorageContainerManager scm;
+  private Mapping scmContainerMapping;
+  private ContainerStateManager stateManager;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setup() throws IOException {
+    conf = new OzoneConfiguration();
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    xceiverClientManager = new XceiverClientManager(conf);
+    scm = cluster.getStorageContainerManager();
+    scmContainerMapping = scm.getScmContainerManager();
+    stateManager = scmContainerMapping.getStateManager();
+  }
+
+  @After
+  public void cleanUp() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster.close();
+    }
+  }
+
+  @Test
+  public void testAllocateContainer() throws IOException {
+    // Allocate a container and verify the container info
+    String container1 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container1);
+    BlockContainerInfo info = stateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(container1, info.getContainerName());
+    Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocated());
+    Assert.assertEquals(OzoneProtos.Owner.OZONE, info.getOwner());
+    Assert.assertEquals(xceiverClientManager.getType(),
+        info.getPipeline().getType());
+    Assert.assertEquals(xceiverClientManager.getFactor(),
+        info.getPipeline().getFactor());
+    Assert.assertEquals(OzoneProtos.LifeCycleState.ALLOCATED, info.getState());
+
+    // Check there are two containers in ALLOCATED state after allocation
+    String container2 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container2);
+    int numContainers = stateManager
+        .getMatchingContainers(OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertEquals(2, numContainers);
+  }
+
+  @Test
+  public void testContainerStateManagerRestart() throws IOException {
+    // Allocate 5 containers in ALLOCATED state and 5 in CREATING state
+    String cname = "container" + RandomStringUtils.randomNumeric(5);
+    for (int i = 0; i < 10; i++) {
+      scm.allocateContainer(xceiverClientManager.getType(),
+          xceiverClientManager.getFactor(), cname + i);
+      if (i >= 5) {
+        scm.getScmContainerManager().updateContainerState(cname + i,
+            OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+      }
+    }
+
+    // New instance of ContainerStateManager should load all the containers in
+    // container store.
+    ContainerStateManager stateManager =
+        new ContainerStateManager(conf, scmContainerMapping,
+            128 * OzoneConsts.MB);
+    int containers = stateManager
+        .getMatchingContainers(OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertEquals(5, containers);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.CREATING).size();
+    Assert.assertEquals(5, containers);
+  }
+
+  @Test
+  public void testGetMatchingContainer() throws IOException {
+    String container1 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container1);
+    scmContainerMapping.updateContainerState(container1,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+    scmContainerMapping.updateContainerState(container1,
+        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+
+    String container2 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container2);
+
+    BlockContainerInfo info = stateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.OPEN);
+    Assert.assertEquals(container1, info.getContainerName());
+
+    info = stateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.OPEN);
+    Assert.assertEquals(null, info);
+
+    info = stateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(container2, info.getContainerName());
+
+    scmContainerMapping.updateContainerState(container2,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+    scmContainerMapping.updateContainerState(container2,
+        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+    info = stateManager
+        .getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.OPEN);
+    Assert.assertEquals(container2, info.getContainerName());
+  }
+
+  @Test
+  public void testUpdateContainerState() throws IOException {
+    int containers = stateManager
+        .getMatchingContainers(OzoneProtos.Owner.OZONE,
+            xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+            OzoneProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertEquals(0, containers);
+
+    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+    // OPEN -> DELETING -> DELETED
+    String container1 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container1);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.ALLOCATED).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping.updateContainerState(container1,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.CREATING).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping.updateContainerState(container1,
+        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.OPEN).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1, OzoneProtos.LifeCycleEvent.DELETE);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.DELETING).size();
+    Assert.assertEquals(1, containers);
+
+    scmContainerMapping
+        .updateContainerState(container1, OzoneProtos.LifeCycleEvent.CLEANUP);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.DELETED).size();
+    Assert.assertEquals(1, containers);
+
+    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+    // DELETING
+    String container2 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container2);
+    scmContainerMapping.updateContainerState(container2,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+    scmContainerMapping
+        .updateContainerState(container2, OzoneProtos.LifeCycleEvent.TIMEOUT);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.DELETING).size();
+    Assert.assertEquals(1, containers);
+
+    // Allocate container1 and update its state from ALLOCATED -> CREATING ->
+    // OPEN ->  CLOSED
+    String container3 = "container" + RandomStringUtils.randomNumeric(5);
+    scm.allocateContainer(xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(), container3);
+    scmContainerMapping.updateContainerState(container3,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+    scmContainerMapping.updateContainerState(container3,
+        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+    scmContainerMapping
+        .updateContainerState(container3, OzoneProtos.LifeCycleEvent.CLOSE);
+    containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
+        xceiverClientManager.getType(), xceiverClientManager.getFactor(),
+        OzoneProtos.LifeCycleState.CLOSED).size();
+    Assert.assertEquals(1, containers);
+
+  }
+}