
HDFS-11699. Ozone:SCM: Add support for close containers in SCM. Contributed by Anu Engineer.

Nanda kumar, 7 years ago
Parent commit 1094af072c

+ 147 - 49
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java

@@ -27,9 +27,11 @@ import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.scm.container.closer.ContainerCloser;
 import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -57,6 +59,9 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
 import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
     .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This
@@ -77,6 +82,8 @@ public class ContainerMapping implements Mapping {
   private final LeaseManager<ContainerInfo> containerLeaseManager;
   private final ContainerSupervisor containerSupervisor;
   private final float containerCloseThreshold;
+  private final ContainerCloser closer;
+  private final long size;
 
   /**
    * Constructs a mapping class that creates mapping between container names
@@ -98,6 +105,7 @@ public class ContainerMapping implements Mapping {
       cacheSizeMB) throws IOException {
     this.nodeManager = nodeManager;
     this.cacheSize = cacheSizeMB;
+    this.closer = new ContainerCloser(nodeManager, conf);
 
     File metaDir = OzoneUtils.getOzoneMetaDirPath(conf);
 
@@ -113,6 +121,10 @@ public class ContainerMapping implements Mapping {
     this.lock = new ReentrantLock();
 
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
+
+    // To be replaced with a getStorageSize call once that code is committed.
+    size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB,
+        OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
     this.containerStateManager =
         new ContainerStateManager(conf, this);
     this.containerSupervisor =
@@ -342,6 +354,7 @@ public class ContainerMapping implements Mapping {
 
   /**
    * Returns the container State Manager.
+   *
    * @return ContainerStateManager
    */
   @Override
@@ -351,6 +364,18 @@ public class ContainerMapping implements Mapping {
 
   /**
    * Process container report from Datanode.
+   * <p>
+   * Processing follows a very simple logic for the time being.
+   * <p>
+   * 1. Datanodes report the current state -- denoted by the datanodeState.
+   * <p>
+   * 2. We read the older SCM state from the database -- denoted by
+   * the knownState.
+   * <p>
+   * 3. We copy the usage etc. from the datanodeState to the newState and
+   * write that newState to the DB. This allows SCM to boot up again, read
+   * the state of the world from the DB, and then reconcile that state with
+   * container reports as they arrive.
    *
    * @param reports Container report
    */
@@ -360,63 +385,37 @@ public class ContainerMapping implements Mapping {
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
         containerInfos = reports.getReportsList();
     containerSupervisor.handleContainerReport(reports);
-    for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo :
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
         containerInfos) {
-      byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray();
+      byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray();
       lock.lock();
       try {
         byte[] containerBytes = containerStore.get(dbKey);
         if (containerBytes != null) {
-          OzoneProtos.SCMContainerInfo oldInfo =
+          OzoneProtos.SCMContainerInfo knownState =
               OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
 
-          OzoneProtos.SCMContainerInfo.Builder builder =
-              OzoneProtos.SCMContainerInfo.newBuilder();
-          builder.setContainerName(oldInfo.getContainerName());
-          builder.setPipeline(oldInfo.getPipeline());
-          // If used size is greater than allocated size, we will be updating
-          // allocated size with used size. This update is done as a fallback
-          // mechanism in case SCM crashes without properly updating allocated
-          // size. Correct allocated value will be updated by
-          // ContainerStateManager during SCM shutdown.
-          long usedSize = containerInfo.getUsed();
-          long allocated = oldInfo.getAllocatedBytes() > usedSize ?
-              oldInfo.getAllocatedBytes() : usedSize;
-          builder.setAllocatedBytes(allocated);
-          builder.setUsedBytes(containerInfo.getUsed());
-          builder.setNumberOfKeys(containerInfo.getKeyCount());
-          builder.setState(oldInfo.getState());
-          builder.setStateEnterTime(oldInfo.getStateEnterTime());
-          builder.setContainerID(oldInfo.getContainerID());
-          if (oldInfo.getOwner() != null) {
-            builder.setOwner(oldInfo.getOwner());
-          }
-          OzoneProtos.SCMContainerInfo newContainerInfo = builder.build();
-          containerStore.put(dbKey, newContainerInfo.toByteArray());
-          float containerUsedPercentage = 1.0f *
-              containerInfo.getUsed() / containerInfo.getSize();
-          // TODO: Handling of containers which are already in close queue.
-          if (containerUsedPercentage >= containerCloseThreshold) {
-            // TODO: The container has to be moved to close container queue.
-            // For now, we are just updating the container state to CLOSING.
-            // Close container implementation can decide on how to maintain
-            // list of containers to be closed, this is the place where we
-            // have to add the containers to that list.
-            OzoneProtos.LifeCycleState state = updateContainerState(
-                ContainerInfo.fromProtobuf(newContainerInfo).getContainerName(),
-                OzoneProtos.LifeCycleEvent.FINALIZE);
-            if (state != OzoneProtos.LifeCycleState.CLOSING) {
-              LOG.error("Failed to close container {}, reason : Not able to " +
-                  "update container state, current container state: {}.",
-                  containerInfo.getContainerName(), state);
-            }
+          OzoneProtos.SCMContainerInfo newState =
+              reconcileState(datanodeState, knownState);
+
+          // FIX ME: This can be optimized; we write to the DB twice, where a
+          // single write would work well.
+          //
+          // We need to write this to the DB again since the close path only
+          // writes the updated state.
+          containerStore.put(dbKey, newState.toByteArray());
+
+          // If the container is closed, the state is already written to the
+          // SCM DB. TODO: See if we can write only once to the DB.
+          if (closeContainerIfNeeded(newState)) {
+            LOG.info("Closing the Container: {}", newState.getContainerName());
           }
         } else {
           // Container not found in our container db.
           LOG.error("Error while processing container report from datanode :" +
-              " {}, for container: {}, reason: container doesn't exist in" +
-              "container database.", reports.getDatanodeID(),
-              containerInfo.getContainerName());
+                  " {}, for container: {}, reason: container doesn't exist " +
+                  "in container database.", reports.getDatanodeID(),
+              datanodeState.getContainerName());
         }
       } finally {
         lock.unlock();
@@ -424,11 +423,110 @@ public class ContainerMapping implements Mapping {
     }
   }
 
+  /**
+   * Reconciles the state from Datanode with the state in SCM.
+   *
+   * @param datanodeState - State from the Datanode.
+   * @param knownState - State inside SCM.
+   * @return new SCM State for this container.
+   */
+  private OzoneProtos.SCMContainerInfo reconcileState(
+      StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
+      OzoneProtos.SCMContainerInfo knownState) {
+    OzoneProtos.SCMContainerInfo.Builder builder =
+        OzoneProtos.SCMContainerInfo.newBuilder();
+    builder.setContainerName(knownState.getContainerName());
+    builder.setPipeline(knownState.getPipeline());
+    // If used size is greater than allocated size, we will be updating
+    // allocated size with used size. This update is done as a fallback
+    // mechanism in case SCM crashes without properly updating allocated
+    // size. Correct allocated value will be updated by
+    // ContainerStateManager during SCM shutdown.
+    long usedSize = datanodeState.getUsed();
+    long allocated = knownState.getAllocatedBytes() > usedSize ?
+        knownState.getAllocatedBytes() : usedSize;
+    builder.setAllocatedBytes(allocated);
+    builder.setUsedBytes(usedSize);
+    builder.setNumberOfKeys(datanodeState.getKeyCount());
+    builder.setState(knownState.getState());
+    builder.setStateEnterTime(knownState.getStateEnterTime());
+    builder.setContainerID(knownState.getContainerID());
+    if (knownState.getOwner() != null) {
+      builder.setOwner(knownState.getOwner());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Queues the close container command to the datanodes and writes the new
+   * state to the container DB.
+   * <p>
+   * TODO: Remove these two ContainerInfo definitions. It is brain-dead to
+   * have one protobuf in one file and another definition in another file.
+   *
+   * @param newState - This is the state we maintain in SCM.
+   * @return true if a close command was queued, false otherwise.
+   * @throws IOException on failure.
+   */
+  private boolean closeContainerIfNeeded(OzoneProtos.SCMContainerInfo newState)
+      throws IOException {
+    float containerUsedPercentage = 1.0f *
+        newState.getUsedBytes() / this.size;
+
+    ContainerInfo scmInfo = getContainer(newState.getContainerName());
+    if (containerUsedPercentage >= containerCloseThreshold
+        && !isClosed(scmInfo)) {
+      // We will call the closer until the container gets to the closed
+      // state; that is, SCM will make this call repeatedly until we reach
+      // the closed state.
+      closer.close(newState);
+
+      if (shouldClose(scmInfo)) {
+        // This event moves the container from the Open to the Closing state;
+        // this transition is internal to SCM. Closing is the desired state
+        // that SCM wants this container to reach. We will know that a
+        // container has reached the closed state from container reports.
+        // This state change should be invoked once and only once.
+        OzoneProtos.LifeCycleState state = updateContainerState(
+            scmInfo.getContainerName(),
+            OzoneProtos.LifeCycleEvent.FINALIZE);
+        if (state != OzoneProtos.LifeCycleState.CLOSING) {
+          LOG.error("Failed to close container {}, reason : Not able to " +
+              "update container state, current container state: {}.",
+              newState.getContainerName(), state);
+          return false;
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns true if the container is in the open state and can still be
+   * closed; containers that are already in the closing, closed, deleting or
+   * deleted states return false.
+   *
+   * @param info - ContainerInfo.
+   * @return true if the container is in the open state, false otherwise.
+   */
+  private boolean shouldClose(ContainerInfo info) {
+    return info.getState() == OzoneProtos.LifeCycleState.OPEN;
+  }
+
+  private boolean isClosed(ContainerInfo info) {
+    return info.getState() == OzoneProtos.LifeCycleState.CLOSED;
+  }
+
+  @VisibleForTesting
+  public ContainerCloser getCloser() {
+    return closer;
+  }
+
   /**
    * Closes this stream and releases any system resources associated with it.
    * If the stream is
    * already closed then invoking this method has no effect.
-   *
+   * <p>
    * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
    * fail require careful
    * attention. It is strongly advised to relinquish the underlying resources
@@ -457,7 +555,7 @@ public class ContainerMapping implements Mapping {
    * containerStateManager, when closing ContainerMapping, we need to update
    * this in the container store.
    *
-   * @throws IOException  on failure.
+   * @throws IOException on failure.
    */
   @VisibleForTesting
   public void flushContainerInfo() throws IOException {
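
A quick way to see the close decision above: it reduces to a ratio check, queueing a close once the reported used bytes cross containerCloseThreshold of the configured container size. A minimal standalone sketch of that arithmetic, using an assumed 5 GB container size and a 0.9 threshold (illustrative values, not necessarily the configured defaults):

public final class CloseThresholdSketch {
  // Assumed values for illustration; the real ones come from
  // OZONE_SCM_CONTAINER_SIZE_GB and the container close threshold setting.
  private static final long CONTAINER_SIZE = 5L * 1024 * 1024 * 1024;
  private static final float CLOSE_THRESHOLD = 0.9f;

  // Same computation as closeContainerIfNeeded(): used fraction vs threshold.
  static boolean shouldQueueClose(long usedBytes) {
    float usedFraction = 1.0f * usedBytes / CONTAINER_SIZE;
    return usedFraction >= CLOSE_THRESHOLD;
  }

  public static void main(String[] args) {
    System.out.println(shouldQueueClose(1L * 1024 * 1024 * 1024)); // false, ~20% used
    System.out.println(shouldQueueClose((long) (CONTAINER_SIZE * 0.91f))); // true, 91% used
  }
}

This mirrors the assertions in TestContainerCloser below: a 1 GB report does not trigger a close, while a report at 91% of the container size does.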

+ 192 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java

@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.closer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * A class that manages the closing of containers. This allows the transition
+ * from an open but full container to a closed container, to which no further
+ * data is written.
+ */
+public class ContainerCloser {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerCloser.class);
+  private static final long MULTIPLIER = 3L;
+  private static final int CLEANUP_WATER_MARK = 1000;
+  private final NodeManager nodeManager;
+  private final Map<String, Long> commandIssued;
+  private final Configuration configuration;
+  private final AtomicInteger mapCount;
+  private final long reportInterval;
+  private final AtomicInteger threadRunCount;
+  private final AtomicBoolean isRunning;
+
+  /**
+   * Constructs the ContainerCloser class.
+   *
+   * @param nodeManager - NodeManager
+   * @param conf -   Configuration
+   */
+  public ContainerCloser(NodeManager nodeManager, Configuration conf) {
+    Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(conf);
+    this.nodeManager = nodeManager;
+    this.configuration = conf;
+    this.commandIssued = new ConcurrentHashMap<>();
+    this.mapCount = new AtomicInteger(0);
+    this.threadRunCount = new AtomicInteger(0);
+    this.isRunning = new AtomicBoolean(false);
+    this.reportInterval = this.configuration.getTimeDuration(
+        OZONE_CONTAINER_REPORT_INTERVAL,
+        OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    Preconditions.checkState(this.reportInterval > 0,
+        "report interval has to be greater than 0");
+  }
+
+  @VisibleForTesting
+  public static int getCleanupWaterMark() {
+    return CLEANUP_WATER_MARK;
+  }
+
+  /**
+   * Sends a Container Close command to the datanodes where this container
+   * lives.
+   *
+   * @param info - ContainerInfo.
+   */
+  public void close(OzoneProtos.SCMContainerInfo info) {
+
+    if (commandIssued.containsKey(info.getContainerName())) {
+      // We check whether we issued a close command within the last
+      // 3 * reportInterval seconds.
+      long commandQueueTime = commandIssued.get(info.getContainerName());
+      long currentTime = TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow());
+      if (currentTime > commandQueueTime + (MULTIPLIER * reportInterval)) {
+        commandIssued.remove(info.getContainerName());
+        mapCount.decrementAndGet();
+      } else {
+        // Ignore this request, since we just issued a close command. We
+        // should wait instead of sending a command to the datanode again.
+        return;
+      }
+    }
+
+    // If we reached here, it means that we have not issued a command to the
+    // datanode in the last (3 * report interval). We are presuming that is
+    // enough time to close the container. Let us go ahead and queue a close
+    // command to all the datanodes that participate in the container.
+    //
+    // Three important things to note here:
+    //
+    // 1. It is ok to send this command multiple times to a datanode. Close
+    // container is an idempotent command, if the container is already closed
+    // then we have no issues.
+    //
+    // 2. The container close command is issued to all datanodes. But
+    // depending on the pipeline type, some of the datanodes might ignore it.
+    //
+    // 3. SCM learns that a container is closed from container reports, but
+    // it is possible that datanodes still get close commands, since this
+    // map can drop an entry after the close report is sent to SCM. In that
+    // case too, the datanode will ignore the command.
+
+    OzoneProtos.Pipeline pipeline = info.getPipeline();
+    for (HdfsProtos.DatanodeIDProto datanodeID :
+        pipeline.getPipelineChannel().getMembersList()) {
+      nodeManager.addDatanodeCommand(DatanodeID.getFromProtoBuf(datanodeID),
+          new CloseContainerCommand(info.getContainerName()));
+    }
+    if (!commandIssued.containsKey(info.getContainerName())) {
+      commandIssued.put(info.getContainerName(),
+          TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow()));
+      mapCount.incrementAndGet();
+    }
+    // run the hash map cleaner thread if needed, non-blocking call.
+    runCleanerThreadIfNeeded();
+  }
+
+  private void runCleanerThreadIfNeeded() {
+    // Let us check if we should run a cleaner thread; we avoid map.size()
+    // since it runs a loop in the case of a ConcurrentHashMap.
+    if (mapCount.get() > CLEANUP_WATER_MARK &&
+        isRunning.compareAndSet(false, true)) {
+      Runnable entryCleaner = () -> {
+        LOG.debug("Starting close container Hash map cleaner.");
+        try {
+          for (Map.Entry<String, Long> entry : commandIssued.entrySet()) {
+            long commandQueueTime = entry.getValue();
+            // Remove only entries whose suppression window has expired;
+            // this mirrors the expiry check in close().
+            if (commandQueueTime + (MULTIPLIER * reportInterval) <
+                TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())) {
+
+              // It is possible for this remove to fail due to race conditions.
+              // No big deal; we will clean up next time.
+              commandIssued.remove(entry.getKey());
+              mapCount.decrementAndGet();
+            }
+          }
+          isRunning.compareAndSet(true, false);
+          LOG.debug("Finished running the close container hash map cleaner.");
+        } catch (Exception ex) {
+          LOG.error("Unable to finish cleaning the closed containers map.", ex);
+        }
+      };
+
+      // Launch the cleaner thread only when needed, instead of keeping a
+      // daemon thread sleeping all the time. We set the thread to daemon
+      // mode so that it does not block a clean JVM exit.
+      Thread cleanerThread = new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("Closed Container Cleaner Thread - %d")
+          .build().newThread(entryCleaner);
+      threadRunCount.incrementAndGet();
+      cleanerThread.start();
+    }
+  }
+
+  @VisibleForTesting
+  public int getThreadRunCount() {
+    return threadRunCount.get();
+  }
+
+  @VisibleForTesting
+  public int getCloseCount() {
+    return mapCount.get();
+  }
+}
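
The heart of ContainerCloser is the suppression window: a close command for a given container is re-issued only after 3 * reportInterval seconds have elapsed. A condensed sketch of just that bookkeeping, with hypothetical names (the real class also builds CloseContainerCommand messages and queues them through the NodeManager):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class SuppressionWindowSketch {
  private static final long MULTIPLIER = 3L;
  private final long reportIntervalSeconds;
  private final Map<String, Long> commandIssued = new ConcurrentHashMap<>();

  SuppressionWindowSketch(long reportIntervalSeconds) {
    this.reportIntervalSeconds = reportIntervalSeconds;
  }

  // Returns true if a close command should be sent now for this container.
  boolean tryIssue(String containerName, long nowSeconds) {
    Long queuedAt = commandIssued.get(containerName);
    if (queuedAt != null) {
      if (nowSeconds <= queuedAt + MULTIPLIER * reportIntervalSeconds) {
        // Still inside the window: a command went out recently, suppress it.
        return false;
      }
      // Window expired; forget the old entry and allow a re-issue.
      commandIssued.remove(containerName);
    }
    commandIssued.put(containerName, nowSeconds);
    return true;
  }
}

testRepeatedClose below exercises exactly this behavior: two back-to-back reports yield one queued command, and a third report after the window has elapsed yields a second one.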

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+/**
+ * This package has classes that close a container, that is, move a container
+ * from the open state to the closed state.
+ */
+package org.apache.hadoop.ozone.scm.container.closer;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java

@@ -148,5 +148,5 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @param id
    * @param command
    */
-  default void addDatanodeCommand(DatanodeID id, SCMCommand command) {}
+  void addDatanodeCommand(DatanodeID id, SCMCommand command);
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml

@@ -1106,6 +1106,14 @@
       for more info.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.max.nodepool.processing.threads</name>
+    <value>1</value>
+    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
+    <description>
+      Number of node pools to process in parallel.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.names</name>
     <value/>
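
For tests or programmatic overrides, the same knob can be set on a Configuration object; a hypothetical usage (the property name is as added above, the value is just an example):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Process four node pools in parallel instead of the default of one.
conf.setInt("ozone.scm.max.nodepool.processing.threads", 4);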

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.NodePoolManager;
 import org.mockito.Mockito;
+import org.assertj.core.util.Preconditions;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -73,6 +74,7 @@ public class MockNodeManager implements NodeManager {
   private final Map<String, SCMNodeStat> nodeMetricMap;
   private final SCMNodeStat aggregateStat;
   private boolean chillmode;
+  private final Map<DatanodeID, List<SCMCommand>> commandMap;
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
@@ -87,6 +89,7 @@ public class MockNodeManager implements NodeManager {
       }
     }
     chillmode = false;
+    this.commandMap = new HashMap<>();
   }
 
   /**
@@ -297,6 +300,31 @@ public class MockNodeManager implements NodeManager {
     return null;
   }
 
+  @Override
+  public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
+    if (commandMap.containsKey(id)) {
+      List<SCMCommand> commandList = commandMap.get(id);
+      Preconditions.checkNotNull(commandList);
+      commandList.add(command);
+    } else {
+      List<SCMCommand> commandList = new LinkedList<>();
+      commandList.add(command);
+      commandMap.put(id, commandList);
+    }
+  }
+
+  // Returns the number of commands queued for this datanode in the node manager.
+  public int getCommandCount(DatanodeID id) {
+    List<SCMCommand> list = commandMap.get(id);
+    return (list == null) ? 0 : list.size();
+  }
+
+  public void clearCommandQueue(DatanodeID id) {
+    if (commandMap.containsKey(id)) {
+      commandMap.put(id, new LinkedList<>());
+    }
+  }
+
   /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.

+ 221 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/closer/TestContainerCloser.java

@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.closer;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.scm.container.ContainerMapping;
+import org.apache.hadoop.ozone.scm.container.MockNodeManager;
+import org.apache.hadoop.ozone.scm.container.TestContainerMapping;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent.CREATE;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent.CREATED;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
+
+/**
+ * Test class for Closing Container.
+ */
+public class TestContainerCloser {
+
+  private static final long GIGABYTE = 1024L * 1024L * 1024L;
+  private static Configuration configuration;
+  private static MockNodeManager nodeManager;
+  private static ContainerMapping mapping;
+  private static long size;
+  private static File testDir;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    configuration = SCMTestUtils.getConf();
+    size = configuration.getLong(OZONE_SCM_CONTAINER_SIZE_GB,
+        OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
+    configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL,
+        1, TimeUnit.SECONDS);
+    testDir = GenericTestUtils
+        .getTestDir(TestContainerMapping.class.getSimpleName());
+    configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
+        testDir.getAbsolutePath());
+    nodeManager = new MockNodeManager(true, 10);
+    mapping = new ContainerMapping(configuration, nodeManager, 128);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (mapping != null) {
+      mapping.close();
+    }
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @Test
+  public void testClose() throws IOException {
+    String containerName = "container-" + RandomStringUtils.randomNumeric(5);
+
+    ContainerInfo info = mapping.allocateContainer(
+        OzoneProtos.ReplicationType.STAND_ALONE,
+        OzoneProtos.ReplicationFactor.ONE, containerName, "ozone");
+
+    // Execute these state transitions so that we can close the container.
+    mapping.updateContainerState(containerName, CREATE);
+    mapping.updateContainerState(containerName, CREATED);
+    long currentCount = mapping.getCloser().getCloseCount();
+    long runCount = mapping.getCloser().getThreadRunCount();
+
+    DatanodeID datanodeID = info.getPipeline().getLeader();
+    // Send a container report with used set to 1 GB. This should not close.
+    sendContainerReport(info, 1 * GIGABYTE);
+
+    // With only one container the cleaner thread should not run.
+    Assert.assertEquals(0, mapping.getCloser().getThreadRunCount());
+
+    // With only 1 GB, the container should not be queued for closing.
+    Assert.assertEquals(0, mapping.getCloser().getCloseCount());
+
+    // Assert that the Close command was not queued for this Datanode.
+    Assert.assertEquals(0, nodeManager.getCommandCount(datanodeID));
+
+    long newUsed = (long) (size * 0.91f);
+    sendContainerReport(info, newUsed);
+
+    // With only one container the cleaner thread should not run.
+    Assert.assertEquals(runCount, mapping.getCloser().getThreadRunCount());
+
+    // And the close count will be one.
+    Assert.assertEquals(1,
+        mapping.getCloser().getCloseCount() - currentCount);
+
+    // Assert that the Close command was Queued for this Datanode.
+    Assert.assertEquals(1, nodeManager.getCommandCount(datanodeID));
+  }
+
+  @Test
+  public void testRepeatedClose() throws IOException,
+      InterruptedException {
+    // This test asserts that if we queue more than one report, the second
+    // report is discarded by the system when it lands within the
+    // 3 * report interval window.
+
+    configuration.setTimeDuration(OZONE_CONTAINER_REPORT_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    String containerName = "container-" + RandomStringUtils.randomNumeric(5);
+
+    ContainerInfo info = mapping.allocateContainer(
+        OzoneProtos.ReplicationType.STAND_ALONE,
+        OzoneProtos.ReplicationFactor.ONE, containerName, "ozone");
+
+    // Execute these state transitions so that we can close the container.
+    mapping.updateContainerState(containerName, CREATE);
+
+    long currentCount = mapping.getCloser().getCloseCount();
+    long runCount = mapping.getCloser().getThreadRunCount();
+
+
+    DatanodeID datanodeID = info.getPipeline().getLeader();
+
+    // Send this report twice and assert we have only one command in the queue.
+    sendContainerReport(info, 5 * GIGABYTE);
+    sendContainerReport(info, 5 * GIGABYTE);
+
+    // Assert that the Close command was Queued for this Datanode.
+    Assert.assertEquals(1,
+        nodeManager.getCommandCount(datanodeID));
+    // And close count will be one.
+    Assert.assertEquals(1,
+        mapping.getCloser().getCloseCount() - currentCount);
+    Thread.sleep(TimeUnit.SECONDS.toMillis(4));
+
+    // Send another report and the system will queue one more close command.
+    sendContainerReport(info, 5 * GIGABYTE);
+    Assert.assertEquals(2,
+        nodeManager.getCommandCount(datanodeID));
+    // But the close count will still be one, since from the point of view
+    // of the closer we are closing only one container, even if we have sent
+    // multiple close commands to the datanode.
+    Assert.assertEquals(1, mapping.getCloser().getCloseCount()
+        - currentCount);
+  }
+
+  @Test
+  public void testCleanupThreadRuns() throws IOException,
+      InterruptedException {
+    // This test asserts that the cleanup thread runs once we have closed
+    // more containers than the cleanup water mark.
+
+    long runCount = mapping.getCloser().getThreadRunCount();
+
+    for (int x = 0; x < ContainerCloser.getCleanupWaterMark() + 10; x++) {
+      String containerName = "container-" + RandomStringUtils.randomNumeric(7);
+      ContainerInfo info = mapping.allocateContainer(
+          OzoneProtos.ReplicationType.STAND_ALONE,
+          OzoneProtos.ReplicationFactor.ONE, containerName, "ozone");
+      mapping.updateContainerState(containerName, CREATE);
+      mapping.updateContainerState(containerName, CREATED);
+      sendContainerReport(info, 5 * GIGABYTE);
+    }
+
+    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
+    // Assert that cleanup thread ran at least once.
+    Assert.assertTrue(mapping.getCloser().getThreadRunCount() - runCount > 0);
+  }
+
+  private void sendContainerReport(ContainerInfo info, long used) throws
+      IOException {
+    ContainerReportsRequestProto.Builder
+        reports = ContainerReportsRequestProto.newBuilder();
+    reports.setType(ContainerReportsRequestProto.reportType.fullReport);
+
+    StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
+        StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
+    ciBuilder.setContainerName(info.getContainerName())
+        .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+        .setSize(size)
+        .setUsed(used)
+        .setKeyCount(100000000L)
+        .setReadCount(100000000L)
+        .setWriteCount(100000000L)
+        .setReadBytes(2000000000L)
+        .setWriteBytes(2000000000L)
+        .setContainerID(1L);
+    reports.setDatanodeID(
+        DFSTestUtil.getLocalDatanodeID().getProtoBufMessage());
+    reports.addReports(ciBuilder);
+    mapping.processContainerReports(reports.build());
+  }
+}
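
Assuming the usual Hadoop surefire setup, the new test class can be run in isolation from hadoop-hdfs-project/hadoop-hdfs with: mvn test -Dtest=TestContainerCloser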