Jelajahi Sumber

HDFS-12787. Ozone: SCM: Aggregate the metrics from all the container reports. Contributed by Yiqun Lin.

Xiaoyu Yao 7 tahun lalu
induk
melakukan
6dca9fcb03

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.scm;
 
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.jmx.ServiceRuntimeInfo;
 
@@ -39,4 +41,10 @@ public interface SCMMXBean extends ServiceRuntimeInfo {
    * @return SCM client RPC server port
    */
   String getClientRpcPort();
+
+  /**
+   * Get container report info that includes container IO stats of nodes.
+   * @return mapping from datanode UUID to container report JSON string
+   */
+  Map<String, String> getContainerReport();
 }

+ 122 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -19,6 +19,10 @@ package org.apache.hadoop.ozone.scm;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -104,6 +108,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.Collections;
 import java.util.stream.Collectors;
 
@@ -204,6 +210,9 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
 
   /** SCM metrics. */
   private static SCMMetrics metrics;
+  /** Key = DatanodeUuid, value = ContainerStat. */
+  private Cache<String, ContainerStat> containerReportCache;
+
 
   private static final String USAGE =
       "Usage: \n hdfs scm [ " + StartupOption.INIT.getName() + " [ "
@@ -225,13 +234,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
 
     StorageContainerManager.initMetrics();
+    initContainerReportCache(conf);
+
     scmStorage = new SCMStorage(conf);
     String clusterId = scmStorage.getClusterID();
     if (clusterId == null) {
       throw new SCMException("clusterId not found",
           ResultCodes.SCM_NOT_INITIALIZED);
     }
-    scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID());
+    scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
     scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
     scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
         scmContainerManager, cacheSize);
@@ -297,6 +308,31 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
     registerMXBean();
   }
 
+  /**
+   * Initialize the cache of container reports sent from datanodes.
+   *
+   * @param conf the Ozone configuration used to configure the cache
+   */
+  private void initContainerReportCache(OzoneConfiguration conf) {
+    containerReportCache = CacheBuilder.newBuilder()
+        .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+        .maximumSize(Integer.MAX_VALUE)
+        .removalListener(new RemovalListener<String, ContainerStat>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<String, ContainerStat> removalNotification) {
+            synchronized (containerReportCache) {
+              ContainerStat stat = removalNotification.getValue();
+              // remove invalid container report
+              metrics.decrContainerStat(stat);
+              LOG.debug(
+                  "Remove expired container stat entry for datanode: {}.",
+                  removalNotification.getKey());
+            }
+          }
+        }).build();
+  }
+
   /**
    * Builds a message for logging startup information about an RPC server.
    *
@@ -836,7 +872,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
       LOG.error("SCM block manager service stop failed.", ex);
     }
 
-    metrics.unRegister();
+    if (containerReportCache != null) {
+      containerReportCache.invalidateAll();
+      containerReportCache.cleanUp();
+    }
+
+    if (metrics != null) {
+      metrics.unRegister();
+    }
+
     unregisterMXBean();
     IOUtils.cleanupWithLogger(LOG, scmContainerManager);
     IOUtils.cleanupWithLogger(LOG, scmBlockManager);
@@ -917,33 +961,44 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   @Override
   public ContainerReportsResponseProto sendContainerReport(
       ContainerReportsRequestProto reports) throws IOException {
+    updateContainerReportMetrics(reports);
+
+    // should we process container reports async?
+    scmContainerManager.processContainerReports(
+        DatanodeID.getFromProtoBuf(reports.getDatanodeID()),
+        reports.getType(), reports.getReportsList());
+    return ContainerReportsResponseProto.newBuilder().build();
+  }
+
+  private void updateContainerReportMetrics(
+      ContainerReportsRequestProto reports) {
+    ContainerStat newStat = null;
     // TODO: We should update the logic once incremental container report
     // type is supported.
-    if (reports.getType() ==
-        ContainerReportsRequestProto.reportType.fullReport) {
-      ContainerStat stat = new ContainerStat();
+    if (reports
+        .getType() == ContainerReportsRequestProto.reportType.fullReport) {
+      newStat = new ContainerStat();
       for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
           .getReportsList()) {
-        stat.add(new ContainerStat(info.getSize(), info.getUsed(),
+        newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
             info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
             info.getReadCount(), info.getWriteCount()));
       }
 
       // update container metrics
-      metrics.setLastContainerReportSize(stat.getSize().get());
-      metrics.setLastContainerReportUsed(stat.getUsed().get());
-      metrics.setLastContainerReportKeyCount(stat.getKeyCount().get());
-      metrics.setLastContainerReportReadBytes(stat.getReadBytes().get());
-      metrics.setLastContainerReportWriteBytes(stat.getWriteBytes().get());
-      metrics.setLastContainerReportReadCount(stat.getReadCount().get());
-      metrics.setLastContainerReportWriteCount(stat.getWriteCount().get());
+      metrics.setLastContainerStat(newStat);
     }
 
-    // should we process container reports async?
-    scmContainerManager.processContainerReports(
-        DatanodeID.getFromProtoBuf(reports.getDatanodeID()),
-        reports.getType(), reports.getReportsList());
-    return ContainerReportsResponseProto.newBuilder().build();
+    // Update container stat entry, this will trigger a removal operation if it
+    // exists in cache.
+    synchronized (containerReportCache) {
+      String datanodeUuid = reports.getDatanodeID().getDatanodeUuid();
+      if (datanodeUuid != null && newStat != null) {
+        containerReportCache.put(datanodeUuid, newStat);
+        // update global view container metrics
+        metrics.incrContainerStat(newStat);
+      }
+    }
   }
 
   /**
@@ -1124,4 +1179,53 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   public static SCMMetrics getMetrics() {
     return metrics == null ? SCMMetrics.create() : metrics;
   }
+
+  /**
+   * Invalidate the container stat entry for the given datanode.
+   *
+   * @param datanodeUuid UUID of the datanode whose entry is removed
+   */
+  public void removeContainerReport(String datanodeUuid) {
+    synchronized (containerReportCache) {
+      containerReportCache.invalidate(datanodeUuid);
+    }
+  }
+
+  /**
+   * Get the container stat of the specified datanode.
+   *
+   * @param datanodeUuid UUID of the datanode to look up
+   * @return the cached container stat, or null if none exists
+   */
+  public ContainerStat getContainerReport(String datanodeUuid) {
+    ContainerStat stat = null;
+    synchronized (containerReportCache) {
+      stat = containerReportCache.getIfPresent(datanodeUuid);
+    }
+
+    return stat;
+  }
+
+  /**
+   * Returns a view of the container stat entries. Modifications made to the
+   * map will directly affect the cache.
+   *
+   * @return a live map view from datanode UUID to container stat
+   */
+  public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
+    return containerReportCache.asMap();
+  }
+
+  @Override
+  public Map<String, String> getContainerReport() {
+    Map<String, String> id2StatMap = new HashMap<>();
+    synchronized (containerReportCache) {
+      ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap();
+      for (Map.Entry<String, ContainerStat> entry : map.entrySet()) {
+        id2StatMap.put(entry.getKey(), entry.getValue().toJsonString());
+      }
+    }
+
+    return id2StatMap;
+  }
 }

+ 40 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java

@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.ozone.scm.container.placement.metrics;
 
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.web.utils.JsonUtils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
 /**
@@ -26,36 +31,43 @@ public class ContainerStat {
   /**
    * The maximum container size.
    */
+  @JsonProperty("Size")
   private LongMetric size;
 
   /**
    * The number of bytes used by the container.
    */
+  @JsonProperty("Used")
   private LongMetric used;
 
   /**
    * The number of keys in the container.
    */
+  @JsonProperty("KeyCount")
   private LongMetric keyCount;
 
   /**
    * The number of bytes read from the container.
    */
+  @JsonProperty("ReadBytes")
   private LongMetric readBytes;
 
   /**
    * The number of bytes write into the container.
    */
+  @JsonProperty("WriteBytes")
   private LongMetric writeBytes;
 
   /**
    * The number of times the container is read.
    */
+  @JsonProperty("ReadCount")
   private LongMetric readCount;
 
   /**
-   * The number of times the container is written into .
+   * The number of times the container is written into.
    */
+  @JsonProperty("WriteCount")
   private LongMetric writeCount;
 
   public ContainerStat() {
@@ -117,6 +129,10 @@ public class ContainerStat {
   }
 
   public void add(ContainerStat stat) {
+    if (stat == null) {
+      return;
+    }
+
     this.size.add(stat.getSize().get());
     this.used.add(stat.getUsed().get());
     this.keyCount.add(stat.getKeyCount().get());
@@ -125,4 +141,26 @@ public class ContainerStat {
     this.readCount.add(stat.getReadCount().get());
     this.writeCount.add(stat.getWriteCount().get());
   }
-}
+
+  public void subtract(ContainerStat stat) {
+    if (stat == null) {
+      return;
+    }
+
+    this.size.subtract(stat.getSize().get());
+    this.used.subtract(stat.getUsed().get());
+    this.keyCount.subtract(stat.getKeyCount().get());
+    this.readBytes.subtract(stat.getReadBytes().get());
+    this.writeBytes.subtract(stat.getWriteBytes().get());
+    this.readCount.subtract(stat.getReadCount().get());
+    this.writeCount.subtract(stat.getWriteCount().get());
+  }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.toJsonString(this);
+    } catch (IOException ignored) {
+      return null;
+    }
+  }
+}

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java

@@ -16,9 +16,13 @@
  */
 package org.apache.hadoop.ozone.scm.container.placement.metrics;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
 /**
  * An helper class for all metrics based on Longs.
  */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
 public class LongMetric implements DatanodeMetric<Long, Long> {
   private Long value;
 

+ 67 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 /**
@@ -43,6 +44,14 @@ public class SCMMetrics {
   @Metric private MutableGaugeLong lastContainerReportReadCount;
   @Metric private MutableGaugeLong lastContainerReportWriteCount;
 
+  @Metric private MutableCounterLong containerReportSize;
+  @Metric private MutableCounterLong containerReportUsed;
+  @Metric private MutableCounterLong containerReportKeyCount;
+  @Metric private MutableCounterLong containerReportReadBytes;
+  @Metric private MutableCounterLong containerReportWriteBytes;
+  @Metric private MutableCounterLong containerReportReadCount;
+  @Metric private MutableCounterLong containerReportWriteCount;
+
   public SCMMetrics() {
   }
 
@@ -80,6 +89,64 @@ public class SCMMetrics {
     this.lastContainerReportWriteCount.set(writeCount);
   }
 
+  public void incrContainerReportSize(long size) {
+    this.containerReportSize.incr(size);
+  }
+
+  public void incrContainerReportUsed(long used) {
+    this.containerReportUsed.incr(used);
+  }
+
+  public void incrContainerReportKeyCount(long keyCount) {
+    this.containerReportKeyCount.incr(keyCount);
+  }
+
+  public void incrContainerReportReadBytes(long readBytes) {
+    this.containerReportReadBytes.incr(readBytes);
+  }
+
+  public void incrContainerReportWriteBytes(long writeBytes) {
+    this.containerReportWriteBytes.incr(writeBytes);
+  }
+
+  public void incrContainerReportReadCount(long readCount) {
+    this.containerReportReadCount.incr(readCount);
+  }
+
+  public void incrContainerReportWriteCount(long writeCount) {
+    this.containerReportWriteCount.incr(writeCount);
+  }
+
+  public void setLastContainerStat(ContainerStat newStat) {
+    this.lastContainerReportSize.set(newStat.getSize().get());
+    this.lastContainerReportUsed.set(newStat.getUsed().get());
+    this.lastContainerReportKeyCount.set(newStat.getKeyCount().get());
+    this.lastContainerReportReadBytes.set(newStat.getReadBytes().get());
+    this.lastContainerReportWriteBytes.set(newStat.getWriteBytes().get());
+    this.lastContainerReportReadCount.set(newStat.getReadCount().get());
+    this.lastContainerReportWriteCount.set(newStat.getWriteCount().get());
+  }
+
+  public void incrContainerStat(ContainerStat deltaStat) {
+    this.containerReportSize.incr(deltaStat.getSize().get());
+    this.containerReportUsed.incr(deltaStat.getUsed().get());
+    this.containerReportKeyCount.incr(deltaStat.getKeyCount().get());
+    this.containerReportReadBytes.incr(deltaStat.getReadBytes().get());
+    this.containerReportWriteBytes.incr(deltaStat.getWriteBytes().get());
+    this.containerReportReadCount.incr(deltaStat.getReadCount().get());
+    this.containerReportWriteCount.incr(deltaStat.getWriteCount().get());
+  }
+
+  public void decrContainerStat(ContainerStat deltaStat) {
+    this.containerReportSize.incr(-1 * deltaStat.getSize().get());
+    this.containerReportUsed.incr(-1 * deltaStat.getUsed().get());
+    this.containerReportKeyCount.incr(-1 * deltaStat.getKeyCount().get());
+    this.containerReportReadBytes.incr(-1 * deltaStat.getReadBytes().get());
+    this.containerReportWriteBytes.incr(-1 * deltaStat.getWriteBytes().get());
+    this.containerReportReadCount.incr(-1 * deltaStat.getReadCount().get());
+    this.containerReportWriteCount.incr(-1 * deltaStat.getWriteCount().get());
+  }
+
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);

+ 16 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -43,7 +43,7 @@ import org.apache.hadoop.ozone.protocol
     .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.protocol
     .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
-
+import org.apache.hadoop.ozone.scm.StorageContainerManager;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
@@ -121,7 +121,7 @@ public class SCMNodeManager
   private final AtomicInteger staleNodeCount;
   private final AtomicInteger deadNodeCount;
   private final AtomicInteger totalNodes;
-  private final long staleNodeIntervalMs;
+  private long staleNodeIntervalMs;
   private final long deadNodeIntervalMs;
   private final long heartbeatCheckerIntervalMs;
   private final long datanodeHBIntervalSeconds;
@@ -150,12 +150,13 @@ public class SCMNodeManager
 
   // Node pool manager.
   private final SCMNodePoolManager nodePoolManager;
+  private final StorageContainerManager scmManager;
 
   /**
    * Constructs SCM machine Manager.
    */
-  public SCMNodeManager(OzoneConfiguration conf, String clusterID)
-      throws IOException {
+  public SCMNodeManager(OzoneConfiguration conf, String clusterID,
+      StorageContainerManager scmManager) throws IOException {
     heartbeatQueue = new ConcurrentLinkedQueue<>();
     healthyNodes = new ConcurrentHashMap<>();
     deadNodes = new ConcurrentHashMap<>();
@@ -197,6 +198,7 @@ public class SCMNodeManager
     registerMXBean();
 
     this.nodePoolManager = new SCMNodePoolManager(conf);
+    this.scmManager = scmManager;
   }
 
   private void registerMXBean() {
@@ -551,6 +553,11 @@ public class SCMNodeManager
     healthyNodeCount.decrementAndGet();
     staleNodes.put(entry.getKey(), entry.getValue());
     staleNodeCount.incrementAndGet();
+
+    if (scmManager != null) {
+      // remove stale node's container report
+      scmManager.removeContainerReport(entry.getKey());
+    }
   }
 
   /**
@@ -863,4 +870,9 @@ public class SCMNodeManager
   public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
     this.commandQueue.addCommand(id, command);
   }
+
+  @VisibleForTesting
+  public void setStaleNodeIntervalMs(long interval) {
+    this.staleNodeIntervalMs = interval;
+  }
 }

+ 14 - 7
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md

@@ -110,13 +110,20 @@ Following are the counters for containers:
 
 | Name | Description |
 |:---- |:---- |
-| `LastContainerReportSize` | Total size in bytes of all containers |
-| `LastContainerReportUsed` | Total number of bytes used by all containers |
-| `LastContainerReportKeyCount` | Total number of keys in all containers |
-| `LastContainerReportReadBytes` | Total number of bytes have been read from all containers |
-| `LastContainerReportWriteBytes` | Total number of bytes have been written into all containers |
-| `LastContainerReportReadCount` | Total number of times containers have been read from |
-| `LastContainerReportWriteCount` | Total number of times containers have been written to |
+| `LastContainerReportSize` | Total size in bytes of all containers in latest container report that SCM received from datanode |
+| `LastContainerReportUsed` | Total number of bytes used by all containers in latest container report that SCM received from datanode |
+| `LastContainerReportKeyCount` | Total number of keys in all containers in latest container report that SCM received from datanode |
+| `LastContainerReportReadBytes` | Total number of bytes that have been read from all containers in the latest container report that SCM received from a datanode |
+| `LastContainerReportWriteBytes` | Total number of bytes that have been written into all containers in the latest container report that SCM received from a datanode |
+| `LastContainerReportReadCount` | Total number of times containers have been read from in latest container report that SCM received from datanode |
+| `LastContainerReportWriteCount` | Total number of times containers have been written to in latest container report that SCM received from datanode |
+| `ContainerReportSize` | Total size in bytes of all containers over whole cluster |
+| `ContainerReportUsed` | Total number of bytes used by all containers over whole cluster |
+| `ContainerReportKeyCount` | Total number of keys in all containers over whole cluster |
+| `ContainerReportReadBytes` | Total number of bytes that have been read from all containers over the whole cluster |
+| `ContainerReportWriteBytes` | Total number of bytes that have been written into all containers over the whole cluster |
+| `ContainerReportReadCount` | Total number of times containers have been read from over whole cluster |
+| `ContainerReportWriteCount` | Total number of times containers have been written to over whole cluster |
 
 ### Key Space Metrics
 

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
 import javax.management.openmbean.CompositeData;
@@ -91,6 +93,24 @@ public class TestSCMMXBean {
     String clientRpcPort = (String)mbs.getAttribute(bean,
         "ClientRpcPort");
     assertEquals(scm.getClientRpcPort(), clientRpcPort);
+
+    ConcurrentMap<String, ContainerStat> map = scm.getContainerReportCache();
+    ContainerStat stat = new ContainerStat(1, 2, 3, 4, 5, 6, 7);
+    map.put("nodeID", stat);
+    TabularData data = (TabularData) mbs.getAttribute(
+        bean, "ContainerReport");
+
+    // verify report info
+    assertEquals(1, data.values().size());
+    for (Object obj : data.values()) {
+      assertTrue(obj instanceof CompositeData);
+      CompositeData d = (CompositeData) obj;
+      Iterator<?> it = d.values().iterator();
+      String key = it.next().toString();
+      String value = it.next().toString();
+      assertEquals("nodeID", key);
+      assertEquals(stat.toJsonString(), value);
+    }
   }
 
   @Test

+ 140 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.scm;
 
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -25,9 +26,10 @@ import java.util.UUID;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
@@ -35,13 +37,23 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics;
+import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 /**
  * This class tests the metrics of Storage Container Manager.
  */
 public class TestSCMMetrics {
-  private static MiniOzoneCluster cluster = null;
+  /**
+   * Set the timeout for each test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(90000);
+
+  private static MiniOzoneClassicCluster cluster = null;
 
   @Test
   public void testContainerMetrics() throws Exception {
@@ -64,7 +76,11 @@ public class TestSCMMetrics {
       ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
           writeBytes, readCount, writeCount);
       StorageContainerManager scmManager = cluster.getStorageContainerManager();
-      scmManager.sendContainerReport(createContainerReport(numReport, stat));
+
+      ContainerReportsRequestProto request = createContainerReport(numReport,
+          stat, null);
+      String fstDatanodeID = request.getDatanodeID().getDatanodeUuid();
+      scmManager.sendContainerReport(request);
 
       // verify container stat metrics
       MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -83,6 +99,117 @@ public class TestSCMMetrics {
           getLongGauge("LastContainerReportReadCount", scmMetrics));
       assertEquals(writeCount * numReport,
           getLongGauge("LastContainerReportWriteCount", scmMetrics));
+
+      // add one new report
+      request = createContainerReport(1, stat, null);
+      String sndDatanodeID = request.getDatanodeID().getDatanodeUuid();
+      scmManager.sendContainerReport(request);
+
+      scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
+      assertEquals(size * (numReport + 1),
+          getLongCounter("ContainerReportSize", scmMetrics));
+      assertEquals(used * (numReport + 1),
+          getLongCounter("ContainerReportUsed", scmMetrics));
+      assertEquals(readBytes * (numReport + 1),
+          getLongCounter("ContainerReportReadBytes", scmMetrics));
+      assertEquals(writeBytes * (numReport + 1),
+          getLongCounter("ContainerReportWriteBytes", scmMetrics));
+
+      assertEquals(keyCount * (numReport + 1),
+          getLongCounter("ContainerReportKeyCount", scmMetrics));
+      assertEquals(readCount * (numReport + 1),
+          getLongCounter("ContainerReportReadCount", scmMetrics));
+      assertEquals(writeCount * (numReport + 1),
+          getLongCounter("ContainerReportWriteCount", scmMetrics));
+
+      // Re-send reports but with different value for validating
+      // the aggregation.
+      stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
+      scmManager.sendContainerReport(createContainerReport(1, stat,
+          fstDatanodeID));
+
+      stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
+      scmManager.sendContainerReport(createContainerReport(1, stat,
+          sndDatanodeID));
+
+      // the global container metrics value should be updated
+      scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
+      assertEquals(101, getLongCounter("ContainerReportSize", scmMetrics));
+      assertEquals(51, getLongCounter("ContainerReportUsed", scmMetrics));
+      assertEquals(51, getLongCounter("ContainerReportReadBytes", scmMetrics));
+      assertEquals(61, getLongCounter("ContainerReportWriteBytes", scmMetrics));
+
+      assertEquals(4, getLongCounter("ContainerReportKeyCount", scmMetrics));
+      assertEquals(6, getLongCounter("ContainerReportReadCount", scmMetrics));
+      assertEquals(7, getLongCounter("ContainerReportWriteCount", scmMetrics));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testStaleNodeContainerReport() throws Exception {
+    int nodeCount = 2;
+    int numReport = 2;
+    long size = OzoneConsts.GB * 5;
+    long used = OzoneConsts.GB * 2;
+    long readBytes = OzoneConsts.GB * 1;
+    long writeBytes = OzoneConsts.GB * 2;
+    int keyCount = 1000;
+    int readCount = 100;
+    int writeCount = 50;
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    try {
+      cluster = new MiniOzoneClassicCluster.Builder(conf)
+          .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+          .numDataNodes(nodeCount).build();
+
+      ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
+          writeBytes, readCount, writeCount);
+      StorageContainerManager scmManager = cluster.getStorageContainerManager();
+
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      String datanodeUuid = dataNode.getDatanodeId().getDatanodeUuid();
+      ContainerReportsRequestProto request = createContainerReport(numReport,
+          stat, datanodeUuid);
+      scmManager.sendContainerReport(request);
+
+      MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
+      assertEquals(size * numReport,
+          getLongCounter("ContainerReportSize", scmMetrics));
+      assertEquals(used * numReport,
+          getLongCounter("ContainerReportUsed", scmMetrics));
+      assertEquals(readBytes * numReport,
+          getLongCounter("ContainerReportReadBytes", scmMetrics));
+      assertEquals(writeBytes * numReport,
+          getLongCounter("ContainerReportWriteBytes", scmMetrics));
+
+      assertEquals(keyCount * numReport,
+          getLongCounter("ContainerReportKeyCount", scmMetrics));
+      assertEquals(readCount * numReport,
+          getLongCounter("ContainerReportReadCount", scmMetrics));
+      assertEquals(writeCount * numReport,
+          getLongCounter("ContainerReportWriteCount", scmMetrics));
+
+      // reset stale interval time to move node from healthy to stale
+      SCMNodeManager nodeManager = (SCMNodeManager) cluster
+          .getStorageContainerManager().getScmNodeManager();
+      nodeManager.setStaleNodeIntervalMs(100);
+
+      // verify the metrics when node becomes stale
+      GenericTestUtils.waitFor(() -> {
+        MetricsRecordBuilder metrics = getMetrics(SCMMetrics.SOURCE_NAME);
+        return 0 == getLongCounter("ContainerReportSize", metrics)
+            && 0 == getLongCounter("ContainerReportUsed", metrics)
+            && 0 == getLongCounter("ContainerReportReadBytes", metrics)
+            && 0 == getLongCounter("ContainerReportWriteBytes", metrics)
+            && 0 == getLongCounter("ContainerReportKeyCount", metrics)
+            && 0 == getLongCounter("ContainerReportReadCount", metrics)
+            && 0 == getLongCounter("ContainerReportWriteCount", metrics);
+      }, 1000, 60000);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -91,7 +218,7 @@ public class TestSCMMetrics {
   }
 
   private ContainerReportsRequestProto createContainerReport(int numReport,
-      ContainerStat stat) {
+      ContainerStat stat, String datanodeUuid) {
     StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
         reportsBuilder = StorageContainerDatanodeProtocolProtos
         .ContainerReportsRequestProto.newBuilder();
@@ -108,8 +235,15 @@ public class TestSCMMetrics {
       report.setWriteBytes(stat.getWriteBytes().get());
       reportsBuilder.addReports(report.getProtoBufMessage());
     }
-    reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
-        .getProtoBufMessage());
+
+    DatanodeID datanodeID;
+    if (datanodeUuid == null) {
+      datanodeID = SCMTestUtils.getDatanodeID();
+    } else {
+      datanodeID = new DatanodeID("null", "null", datanodeUuid, 0, 0, 0, 0);
+    }
+
+    reportsBuilder.setDatanodeID(datanodeID.getProtoBufMessage());
     reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
         .ContainerReportsRequestProto.reportType.fullReport);
     return reportsBuilder.build();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java

@@ -93,7 +93,7 @@ public class TestContainerPlacement {
   SCMNodeManager createNodeManager(OzoneConfiguration config)
       throws IOException {
     SCMNodeManager nodeManager = new SCMNodeManager(config,
-        UUID.randomUUID().toString());
+        UUID.randomUUID().toString(), null);
     assertFalse("Node manager should be in chill mode",
         nodeManager.isOutOfChillMode());
     return nodeManager;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

@@ -125,7 +125,7 @@ public class TestNodeManager {
   SCMNodeManager createNodeManager(OzoneConfiguration config)
       throws IOException {
     SCMNodeManager nodeManager = new SCMNodeManager(config,
-        UUID.randomUUID().toString());
+        UUID.randomUUID().toString(), null);
     assertFalse("Node manager should be in chill mode",
         nodeManager.isOutOfChillMode());
     return nodeManager;