HDFS-11447. Ozone: SCM: Send node report to SCM with heartbeat. Contributed by Xiaoyu Yao.

Anu Engineer, 8 years ago
Parent commit: 56f011fd02
25 changed files with 941 additions and 76 deletions
  1. + 26 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
  2. + 187 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
  3. + 63 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java
  4. + 14 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java
  5. + 8 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
  6. + 20 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java
  7. + 1 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
  8. + 19 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  9. + 1 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
  10. + 19 - 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
  11. + 11 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  12. + 4 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
  13. + 4 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
  14. + 8 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
  15. + 2 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
  16. + 6 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
  17. + 98 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java
  18. + 12 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
  19. + 106 - 21
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
  20. + 94 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java
  21. + 17 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
  22. + 4 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
  23. + 2 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
  24. + 25 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
  25. + 190 - 31
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

+ 26 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java

@@ -24,6 +24,10 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -120,7 +124,7 @@ public class ContainerManagerImpl implements ContainerManager {
         dataDirs.add(location);
       }
       this.locationManager =
-          new ContainerLocationManagerImpl(containerDirs, dataDirs);
+          new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
 
     } finally {
       readUnlock();
@@ -395,9 +399,10 @@ public class ContainerManagerImpl implements ContainerManager {
    * @throws IOException
    */
   @Override
-  public void shutdown() {
+  public void shutdown() throws IOException {
     Preconditions.checkState(this.hasWriteLock());
     this.containerMap.clear();
+    this.locationManager.shutdown();
   }
 
 
@@ -497,6 +502,25 @@ public class ContainerManagerImpl implements ContainerManager {
     return this.keyManager;
   }
 
+  /**
+   * Get the node report.
+   * @return node report.
+   */
+  @Override
+  public SCMNodeReport getNodeReport() throws IOException {
+    StorageLocationReport[] reports = locationManager.getLocationReport();
+    SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+    for (int i = 0; i < reports.length; i++) {
+      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId())
+          .setCapacity(reports[i].getCapacity())
+          .setScmUsed(reports[i].getScmUsed())
+          .setRemaining(reports[i].getRemaining())
+          .build());
+    }
+    return nrb.build();
+  }
+
   /**
    * Filter out only container files from the container metadata dir.
    */

+ 187 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java

@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
+
+import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
+
+/**
+ * Class that wraps the space usage of the Datanode Container Storage Location
+ * by SCM containers.
+ */
+public class ContainerStorageLocation {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStorageLocation.class);
+
+  private static final String DU_CACHE_FILE = "scmUsed";
+  private volatile boolean scmUsedSaved = false;
+
+  private final StorageLocation dataLocation;
+  private final String storageUuId;
+  private final DF usage;
+  private final GetSpaceUsed scmUsage;
+  private final File scmUsedFile;
+
+  public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
+      throws IOException {
+    this.dataLocation = dataLoc;
+    this.storageUuId = DatanodeStorage.generateUuid();
+    File dataDir = new File(dataLoc.getNormalizedUri().getPath());
+    scmUsedFile = new File(dataDir, DU_CACHE_FILE);
+    // get overall disk usage
+    this.usage = new DF(dataDir, conf);
+    // get SCM specific usage
+    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
+        .setConf(conf)
+        .setInitialUsed(loadScmUsed())
+        .build();
+
+    // Ensure scm usage is saved during shutdown.
+    ShutdownHookManager.get().addShutdownHook(
+        new Runnable() {
+          @Override
+          public void run() {
+            if (!scmUsedSaved) {
+              saveScmUsed();
+            }
+          }
+        }, SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  public URI getNormalizedUri() {
+    return dataLocation.getNormalizedUri();
+  }
+
+  public String getStorageUuId() {
+    return storageUuId;
+  }
+  public long getCapacity() {
+    long capacity = usage.getCapacity();
+    return (capacity > 0) ? capacity : 0;
+  }
+
+  public long getAvailable() throws IOException {
+    long remaining = getCapacity() - getScmUsed();
+    long available = usage.getAvailable();
+    if (remaining > available) {
+      remaining = available;
+    }
+    return (remaining > 0) ? remaining : 0;
+  }
+
+  public long getScmUsed() throws IOException{
+    return scmUsage.getUsed();
+  }
+
+  public void shutdown() {
+    saveScmUsed();
+    scmUsedSaved = true;
+
+    if (scmUsage instanceof CachingGetSpaceUsed) {
+      IOUtils.cleanup(null, ((CachingGetSpaceUsed) scmUsage));
+    }
+  }
+
+  /**
+   * Read in the cached DU value and return it if it is less than 600 seconds
+   * old (DU update interval). Slight imprecision of scmUsed is not critical
+   * and skipping DU can significantly shorten the startup time.
+   * If the cached value is not available or too old, -1 is returned.
+   */
+  long loadScmUsed() {
+    long cachedScmUsed;
+    long mtime;
+    Scanner sc;
+
+    try {
+      sc = new Scanner(scmUsedFile, "UTF-8");
+    } catch (FileNotFoundException fnfe) {
+      return -1;
+    }
+
+    try {
+      // Get the recorded scmUsed from the file.
+      if (sc.hasNextLong()) {
+        cachedScmUsed = sc.nextLong();
+      } else {
+        return -1;
+      }
+      // Get the recorded mtime from the file.
+      if (sc.hasNextLong()) {
+        mtime = sc.nextLong();
+      } else {
+        return -1;
+      }
+
+      // Return the cached value if mtime is okay.
+      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+        LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
+            cachedScmUsed);
+        return cachedScmUsed;
+      }
+      return -1;
+    } finally {
+      sc.close();
+    }
+  }
+
+  /**
+   * Write the current scmUsed to the cache file.
+   */
+  void saveScmUsed() {
+    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
+      LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
+    }
+    OutputStreamWriter out = null;
+    try {
+      long used = getScmUsed();
+      if (used > 0) {
+        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
+            StandardCharsets.UTF_8);
+        // mtime is written last, so that truncated writes won't be valid.
+        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.flush();
+        out.close();
+        out = null;
+      }
+    } catch (IOException ioe) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error and continue.
+      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+}
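
For orientation, here is a minimal sketch of how a datanode-side caller might exercise this class. The demo class, main wrapper, and "/data/ozone" path are hypothetical illustrations; only ContainerStorageLocation and its collaborators come from the diff above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
    import org.apache.hadoop.ozone.container.common.impl.ContainerStorageLocation;

    public class StorageLocationDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "/data/ozone" is an illustrative path, not one mandated by the patch.
        StorageLocation dataLoc = StorageLocation.parse("/data/ozone");
        ContainerStorageLocation loc = new ContainerStorageLocation(dataLoc, conf);

        long capacity = loc.getCapacity();   // whole-disk capacity via DF
        long scmUsed = loc.getScmUsed();     // SCM-specific usage via CachingGetSpaceUsed
        long available = loc.getAvailable(); // min(capacity - scmUsed, disk free), clamped at 0
        System.out.printf("capacity=%d scmUsed=%d available=%d%n",
            capacity, scmUsed, available);

        loc.shutdown(); // persists "<scmUsed> <mtime>" to the scmUsed cache file
      }
    }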

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+/**
+ * Storage location stats of datanodes that provide back store for containers.
+ *
+ */
+public class StorageLocationReport {
+  public static final StorageLocationReport[] EMPTY_ARRAY = {};
+
+  private final String id;
+  private final boolean failed;
+  private final long capacity;
+  private final long scmUsed;
+  private final long remaining;
+
+  public StorageLocationReport(String id, boolean failed,
+      long capacity, long scmUsed, long remaining) {
+    this.id = id;
+    this.failed = failed;
+    this.capacity = capacity;
+    this.scmUsed = scmUsed;
+    this.remaining = remaining;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public boolean isFailed() {
+    return failed;
+  }
+
+  public long getCapacity() {
+    return capacity;
+  }
+
+  public long getScmUsed() {
+    return scmUsed;
+  }
+
+  public long getRemaining() {
+    return remaining;
+  }
+
+}

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerLocationManager.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.container.common.interfaces;
 
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
+
 import java.io.IOException;
 import java.nio.file.Path;
 
@@ -41,4 +43,16 @@ public interface ContainerLocationManager {
    */
   Path getDataPath(String containerName) throws IOException;
 
+  /**
+   * Returns an array of storage location usage report.
+   * @return storage location usage report.
+   */
+  StorageLocationReport[] getLocationReport() throws IOException;
+
+  /**
+   * Supports clean shutdown of container.
+   *
+   * @throws IOException
+   */
+  void shutdown() throws IOException;
 }

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
@@ -93,7 +95,7 @@ public interface ContainerManager extends RwLock {
    *
    * @throws IOException
    */
-  void shutdown();
+  void shutdown() throws IOException;
 
   /**
    * Sets the Chunk Manager.
@@ -123,4 +125,9 @@ public interface ContainerManager extends RwLock {
    */
   KeyManager getKeyManager();
 
+  /**
+   * Get the Node Report of container storage usage.
+   * @return node report.
+   */
+  SCMNodeReport getNodeReport() throws IOException;
 }

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/package-info.java

@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+/**
+ This package contains common ozone container interfaces.
+ */

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -82,6 +82,7 @@ public class DatanodeStateMachine implements Closeable {
       try {
         LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
         nextHB = Time.monotonicNow() + heartbeatFrequency;
+        context.setReportState(container.getNodeReport());
         context.execute(executorService, heartbeatFrequency,
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -43,6 +44,7 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
   private DatanodeStateMachine.DatanodeStates state;
+  private SCMNodeReport nrState;
 
   /**
    * Constructs a StateContext.
@@ -59,6 +61,7 @@ public class StateContext {
     commandQueue = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
+    nrState = SCMNodeReport.getDefaultInstance();
   }
 
   /**
@@ -111,6 +114,22 @@ public class StateContext {
     this.state = state;
   }
 
+  /**
+   * Returns the node report of the datanode state context.
+   * @return the node report.
+   */
+  public SCMNodeReport getNodeReport() {
+    return nrState;
+  }
+
+  /**
+   * Sets the storage location report of the datanode state context.
+   * @param nrReport - node report
+   */
+  public void setReportState(SCMNodeReport nrReport) {
+    this.nrState = nrReport;
+  }
+
   /**
    * Returns the next task to get executed by the datanode state machine.
    * @return A callable that will be executed by the
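
Together with the DatanodeStateMachine and HeartbeatEndpointTask changes in this commit, the report moves through this context in a simple producer/consumer pattern. A condensed view, with both lines taken from the diffs in this commit:

    // Producer: once per state-machine cycle (DatanodeStateMachine.start()).
    context.setReportState(container.getNodeReport());

    // Consumer: when the heartbeat fires (HeartbeatEndpointTask.call()).
    rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID, context.getNodeReport());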

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java

@@ -231,6 +231,7 @@ public class RunningDatanodeState implements DatanodeState {
           .setConfig(conf)
           .setEndpointStateMachine(endpoint)
           .setNodeID(getContainerNodeID())
+          .setContext(context)
           .build();
     case SHUTDOWN:
       break;

+ 19 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
 import org.slf4j.Logger;
@@ -41,6 +42,7 @@ public class HeartbeatEndpointTask
   private final EndpointStateMachine rpcEndpoint;
   private final Configuration conf;
   private ContainerNodeIDProto containerNodeIDProto;
+  private StateContext context;
 
   /**
    * Constructs a SCM heart beat.
@@ -48,9 +50,10 @@ public class HeartbeatEndpointTask
    * @param conf Config.
    */
   public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
-      Configuration conf) {
+      Configuration conf, StateContext context) {
     this.rpcEndpoint = rpcEndpoint;
     this.conf = conf;
+    this.context = context;
   }
 
   /**
@@ -85,8 +88,9 @@ public class HeartbeatEndpointTask
       Preconditions.checkState(this.containerNodeIDProto != null);
       DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
           .containerNodeIDProto.getDatanodeID());
-      // TODO : Add the command to command processor queue.
-      rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID);
+
+      rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID,
+          this.context.getNodeReport());
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
       rpcEndpoint.logIfNeeded(ex
@@ -112,6 +116,7 @@ public class HeartbeatEndpointTask
     private EndpointStateMachine endPointStateMachine;
     private Configuration conf;
     private ContainerNodeIDProto containerNodeIDProto;
+    private StateContext context;
 
     /**
      * Constructs the builder class.
@@ -152,6 +157,16 @@ public class HeartbeatEndpointTask
       return this;
     }
 
+    /**
+     * Sets the context.
+     * @param stateContext - State context.
+     * @return this.
+     */
+    public Builder setContext(StateContext stateContext) {
+      this.context = stateContext;
+      return this;
+    }
+
     public HeartbeatEndpointTask build() {
       if (endPointStateMachine == null) {
         LOG.error("No endpoint specified.");
@@ -172,10 +187,9 @@ public class HeartbeatEndpointTask
       }
 
       HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
-          .endPointStateMachine, this.conf);
+          .endPointStateMachine, this.conf, this.context);
       task.setContainerNodeIDProto(containerNodeIDProto);
       return task;
     }
-
   }
 }

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +140,8 @@ public class OzoneContainer {
       this.keyManager.shutdown();
       this.manager.shutdown();
       LOG.info("container services shutdown complete.");
+    } catch (IOException ex) {
+      LOG.warn("container service shutdown error:", ex);
     } finally {
       this.manager.writeUnlock();
     }
@@ -155,4 +159,11 @@ public class OzoneContainer {
       pathList.add(location);
     }
   }
+
+  /**
+   * Returns node report of container storage usage.
+   */
+  public SCMNodeReport getNodeReport() throws IOException {
+    return this.manager.getNodeReport();
+  }
 }

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 
 import java.io.IOException;
 
@@ -41,11 +42,12 @@ public interface StorageContainerDatanodeProtocol {
   /**
    * Used by data node to send a Heartbeat.
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - node report state
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
-  SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
-      throws IOException;
+  SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport) throws IOException;
 
   /**
    * Register Datanode.

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 
 import java.util.List;
 
@@ -54,8 +55,10 @@ public interface StorageContainerNodeProtocol {
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - node report.
    * @return SCMheartbeat response list
    */
+  List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport);
 
-  List<SCMCommand> sendHeartbeat(DatanodeID datanodeID);
 }

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -113,15 +115,17 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
    * Send by datanode to SCM.
    *
    * @param datanodeID - DatanodeID
+   * @param nodeReport - node report
    * @throws IOException
    */
 
   @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
-      throws IOException {
-    SCMHeartbeatRequestProto.Builder req =
-        SCMHeartbeatRequestProto.newBuilder();
+  public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport) throws IOException {
+    SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
+        .newBuilder();
     req.setDatanodeID(datanodeID.getProtoBufMessage());
+    req.setNodeReport(nodeReport);
     final SCMHeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

@@ -78,9 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       SCMHeartbeatRequestProto request) throws ServiceException {
     try {
       return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
-          .getDatanodeID()));
+          .getDatanodeID()), request.getNodeReport());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
-}
+}

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -393,9 +395,10 @@ public class StorageContainerManager
    * @throws IOException
    */
   @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
-      throws IOException {
-    List<SCMCommand> commands = getScmNodeManager().sendHeartbeat(datanodeID);
+  public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport) throws IOException {
+    List<SCMCommand> commands =
+        getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
     List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();
     for (SCMCommand cmd : commands) {
       cmdReponses.add(getCommandResponse(cmd));

+ 98 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * This class represents the item in SCM heartbeat queue.
+ */
+public class HeartbeatQueueItem {
+  private DatanodeID datanodeID;
+  private long recvTimestamp;
+  private SCMNodeReport nodeReport;
+
+  /**
+   *
+   * @param datanodeID - datanode ID of the heartbeat.
+   * @param recvTimestamp - heartbeat receive timestamp.
+   * @param nodeReport - node report associated with the heartbeat if any.
+   */
+  HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp,
+                     SCMNodeReport nodeReport) {
+    this.datanodeID = datanodeID;
+    this.recvTimestamp = recvTimestamp;
+    this.nodeReport = nodeReport;
+  }
+
+  /**
+   * @return datanode ID.
+   */
+  public DatanodeID getDatanodeID() {
+    return datanodeID;
+  }
+
+  /**
+   * @return node report.
+   */
+  public SCMNodeReport getNodeReport() {
+    return nodeReport;
+  }
+
+  /**
+   * @return heartbeat receive timestamp.
+   */
+  public long getRecvTimestamp() {
+    return recvTimestamp;
+  }
+
+  /**
+   * Builder for HeartbeatQueueItem.
+   */
+  public static class Builder {
+    private DatanodeID datanodeID;
+    private SCMNodeReport nodeReport;
+    private long recvTimestamp = monotonicNow();
+
+    public Builder setDatanodeID(DatanodeID datanodeId) {
+      this.datanodeID = datanodeId;
+      return this;
+    }
+
+    public Builder setNodeReport(SCMNodeReport scmNodeReport) {
+      this.nodeReport = scmNodeReport;
+      return this;
+    }
+
+    @VisibleForTesting
+    public Builder setRecvTimestamp(long recvTime) {
+      this.recvTimestamp = recvTime;
+      return this;
+    }
+
+    public HeartbeatQueueItem build() {
+      return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport);
+    }
+  }
+}
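
A quick note on the builder: recvTimestamp is captured via monotonicNow() when the Builder is instantiated, and setRecvTimestamp is exposed only for tests. A small sketch of the intended call shape; the wrapping helper method is hypothetical:

    static HeartbeatQueueItem toQueueItem(DatanodeID datanodeID,
        SCMNodeReport nodeReport) {
      return new HeartbeatQueueItem.Builder()
          .setDatanodeID(datanodeID)
          .setNodeReport(nodeReport) // may be null when no report accompanies the HB
          .build();                  // recvTimestamp was captured at Builder creation
    }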

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java

@@ -135,4 +135,16 @@ public interface NodeManager extends StorageContainerNodeProtocol, Closeable,
     DEAD
   }
 
+  /**
+   * Returns the aggregated node stats.
+   * @return the aggregated node stats.
+   */
+  SCMNodeStat getStats();
+
+  /**
+   * Return a list of node stats.
+   * @return a list of individual node stats (live/stale but not dead).
+   */
+  List<SCMNodeStat> getNodeStats();
+
 }

+ 106 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -34,8 +35,13 @@ import org.apache.hadoop.ozone.protocol.proto
     .ErrorCode;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol
+    .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol
+    .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 
 import org.apache.hadoop.ozone.scm.VersionInfo;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +73,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
  * <p>
  * Each heartbeat that SCMNodeManager receives is  put into heartbeatQueue. The
  * worker thread wakes up and grabs that heartbeat from the queue. The worker
- * thread will lookup the healthynodes map and update the timestamp if the entry
+ * thread will lookup the healthynodes map and set the timestamp if the entry
  * is there. if not it will look up stale and deadnodes map.
  * <p>
  * The getNode(byState) functions make copy of node maps and then creates a list
@@ -85,14 +91,20 @@ public class SCMNodeManager
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(SCMNodeManager.class);
+
   /**
    * Key = NodeID, value = timestamp.
    */
   private final Map<String, Long> healthyNodes;
   private final Map<String, Long> staleNodes;
   private final Map<String, Long> deadNodes;
-  private final Queue<DatanodeID> heartbeatQueue;
+  private final Queue<HeartbeatQueueItem> heartbeatQueue;
   private final Map<String, DatanodeID> nodes;
+  // Individual live node stats
+  private final Map<String, SCMNodeStat> nodeStats;
+  // Aggregated node stats
+  private SCMNodeStat scmStat;
+  // TODO: expose nodeStats and scmStat as metrics
   private final AtomicInteger healthyNodeCount;
   private final AtomicInteger staleNodeCount;
   private final AtomicInteger deadNodeCount;
@@ -121,6 +133,8 @@ public class SCMNodeManager
     deadNodes = new ConcurrentHashMap<>();
     staleNodes = new ConcurrentHashMap<>();
     nodes = new HashMap<>();
+    nodeStats = new HashedMap();
+    scmStat = new SCMNodeStat();
 
     healthyNodeCount = new AtomicInteger(0);
     staleNodeCount = new AtomicInteger(0);
@@ -158,7 +172,7 @@ public class SCMNodeManager
    */
   @Override
   public void removeNode(DatanodeID node) throws UnregisteredNodeException {
-    // TODO : Fix me.
+    // TODO : Fix me when adding the SCM CLI.
 
   }
 
@@ -371,9 +385,9 @@ public class SCMNodeManager
     // Process the whole queue.
     while (!heartbeatQueue.isEmpty() &&
         (lastHBProcessedCount < maxHBToProcessPerLoop)) {
-      DatanodeID datanodeID = heartbeatQueue.poll();
+      HeartbeatQueueItem hbItem = heartbeatQueue.poll();
       synchronized (this) {
-        handleHeartbeat(datanodeID);
+        handleHeartbeat(hbItem);
       }
       // we are shutting down or something give up processing the rest of
       // HBs. This will terminate the HB processing thread.
@@ -439,7 +453,8 @@ public class SCMNodeManager
     // 4. And the most important reason, heartbeats are not blocked even if
     // this thread does not run, they will go into the processing queue.
 
-    if (!Thread.currentThread().isInterrupted()) {
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
       executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
           .MILLISECONDS);
     } else {
@@ -489,40 +504,85 @@ public class SCMNodeManager
     staleNodeCount.decrementAndGet();
     deadNodes.put(entry.getKey(), entry.getValue());
     deadNodeCount.incrementAndGet();
+
+    // Update SCM node stats
+    SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey());
+    scmStat.subtract(deadNodeStat);
+    nodeStats.remove(entry.getKey());
   }
 
   /**
    * Handles a single heartbeat from a datanode.
    *
-   * @param datanodeID - datanode ID.
+   * @param hbItem - heartbeat item from a datanode.
    */
-  private void handleHeartbeat(DatanodeID datanodeID) {
+  private void handleHeartbeat(HeartbeatQueueItem hbItem) {
     lastHBProcessedCount++;
 
+    String datanodeID = hbItem.getDatanodeID().getDatanodeUuid();
+    SCMNodeReport nodeReport = hbItem.getNodeReport();
+    long recvTimestamp = hbItem.getRecvTimestamp();
+    long processTimestamp = Time.monotonicNow();
+    if (LOG.isTraceEnabled()) {
+      //TODO: add average queue time of heartbeat request as metrics
+      LOG.trace("Processing Heartbeat from datanode {}: queueing time {}",
+          datanodeID, processTimestamp - recvTimestamp);
+    }
+
     // If this node is already in the list of known and healthy nodes
-    // just update the last timestamp and return.
-    if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) {
-      healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+    // just set the last timestamp and return.
+    if (healthyNodes.containsKey(datanodeID)) {
+      healthyNodes.put(datanodeID, processTimestamp);
+      updateNodeStat(datanodeID, nodeReport);
       return;
     }
 
     // A stale node has heartbeat us we need to remove the node from stale
     // list and move to healthy list.
-    if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) {
-      staleNodes.remove(datanodeID.getDatanodeUuid());
-      healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+    if (staleNodes.containsKey(datanodeID)) {
+      staleNodes.remove(datanodeID);
+      healthyNodes.put(datanodeID, processTimestamp);
       healthyNodeCount.incrementAndGet();
       staleNodeCount.decrementAndGet();
+      updateNodeStat(datanodeID, nodeReport);
       return;
     }
 
     // A dead node has heartbeat us, we need to remove that node from dead
     // node list and move it to the healthy list.
-    if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) {
-      deadNodes.remove(datanodeID.getDatanodeUuid());
-      healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+    if (deadNodes.containsKey(datanodeID)) {
+      deadNodes.remove(datanodeID);
+      healthyNodes.put(datanodeID, processTimestamp);
       deadNodeCount.decrementAndGet();
       healthyNodeCount.incrementAndGet();
+      updateNodeStat(datanodeID, nodeReport);
+      return;
+    }
+    LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID);
+  }
+
+  private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
+    SCMNodeStat stat = nodeStats.get(datanodeID);
+    if (stat == null) {
+      LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
+          "dead datanode {}", datanodeID);
+      stat = new SCMNodeStat();
+    }
+
+    if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
+      long totalCapacity = 0;
+      long totalRemaining = 0;
+      long totalScmUsed = 0;
+      List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
+      for (SCMStorageReport report : storageReports) {
+        totalCapacity += report.getCapacity();
+        totalRemaining += report.getRemaining();
+        totalScmUsed += report.getScmUsed();
+      }
+      scmStat.subtract(stat);
+      stat.set(totalCapacity, totalScmUsed, totalRemaining);
+      nodeStats.put(datanodeID, stat);
+      scmStat.add(stat);
     }
   }
 
@@ -591,6 +651,7 @@ public class SCMNodeManager
     totalNodes.incrementAndGet();
     healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
     healthyNodeCount.incrementAndGet();
+    nodeStats.put(datanodeID.getDatanodeUuid(), new SCMNodeStat());
     LOG.info("Data node with ID: {} Registered.",
         datanodeID.getDatanodeUuid());
     return RegisteredCommand.newBuilder()
@@ -625,23 +686,47 @@ public class SCMNodeManager
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - node report.
    * @return SCMheartbeat response.
    * @throws IOException
    */
   @Override
-  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
+  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport) {
 
     // Checking for NULL to make sure that we don't get
     // an exception from ConcurrentList.
     // This could be a problem in tests, if this function is invoked via
     // protobuf, transport layer will guarantee that this is not null.
     if (datanodeID != null) {
-      heartbeatQueue.add(datanodeID);
-
+      heartbeatQueue.add(
+          new HeartbeatQueueItem.Builder()
+              .setDatanodeID(datanodeID)
+              .setNodeReport(nodeReport)
+              .build());
     } else {
       LOG.error("Datanode ID in heartbeat is null");
     }
 
     return commandQueue.getCommand(datanodeID);
   }
-}
+
+  /**
+   * Returns the aggregated node stats.
+   * @return the aggregated node stats.
+   */
+  @Override
+  public SCMNodeStat getStats() {
+    return new SCMNodeStat(this.scmStat);
+  }
+
+  /**
+   * Return a list of node stats.
+   * @return a list of individual node stats (live/stale but not dead).
+   */
+  @Override
+  public List<SCMNodeStat> getNodeStats(){
+    return nodeStats.entrySet().stream().map(
+        entry -> nodeStats.get(entry.getKey())).collect(Collectors.toList());
+  }
+}
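
Putting the SCM-side pieces together: sendHeartbeat() only enqueues a HeartbeatQueueItem, and the checker thread later drains the queue through handleHeartbeat() and updateNodeStat(), after which the new stat accessors reflect the report. A hedged sketch of the observable effect, assuming datanodeID is already registered and nodeReport is populated:

    // Enqueue; processing happens asynchronously on the HB checker thread.
    List<SCMCommand> commands = nodeManager.sendHeartbeat(datanodeID, nodeReport);

    // Once the queue has been drained:
    SCMNodeStat cluster = nodeManager.getStats();           // aggregated (defensive copy)
    List<SCMNodeStat> perNode = nodeManager.getNodeStats(); // live/stale nodes only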

+ 94 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class represents the SCM node stat.
+ */
+public class SCMNodeStat {
+  private long capacity;
+  private long scmUsed;
+  private long remaining;
+
+  public SCMNodeStat() {
+  }
+
+  public SCMNodeStat(SCMNodeStat other) {
+    set(other.capacity, other.scmUsed, other.remaining);
+  }
+
+  /**
+   * @return the total configured capacity of the node.
+   */
+  public long getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * @return the total SCM used space on the node.
+   */
+  public long getScmUsed() {
+    return scmUsed;
+  }
+
+  /**
+   * @return the total remaining space available on the node.
+   */
+  public long getRemaining() {
+    return remaining;
+  }
+
+  @VisibleForTesting
+  public void set(long total, long used, long remain) {
+    this.capacity = total;
+    this.scmUsed = used;
+    this.remaining = remain;
+  }
+
+  public SCMNodeStat add(SCMNodeStat stat) {
+    this.capacity += stat.getCapacity();
+    this.scmUsed += stat.getScmUsed();
+    this.remaining += stat.getRemaining();
+    return this;
+  }
+
+  public SCMNodeStat subtract(SCMNodeStat stat) {
+    this.capacity -= stat.getCapacity();
+    this.scmUsed -= stat.getScmUsed();
+    this.remaining -= stat.getRemaining();
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object to) {
+    return this == to ||
+        (to instanceof SCMNodeStat &&
+            capacity == ((SCMNodeStat) to).getCapacity() &&
+            scmUsed == ((SCMNodeStat) to).getScmUsed() &&
+            remaining == ((SCMNodeStat) to).getRemaining());
+  }
+
+  @Override
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return 42; // any arbitrary constant will do
+  }
+}
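
The aggregate stat in SCMNodeManager is maintained with exactly this add/subtract pair: a node's previous contribution is subtracted before its refreshed stat is added. A toy illustration of that arithmetic; set() is @VisibleForTesting, so this is test-style usage, and the numbers are made up:

    SCMNodeStat aggregate = new SCMNodeStat();

    SCMNodeStat previous = new SCMNodeStat();
    previous.set(2000L, 100L, 1900L);   // capacity, scmUsed, remaining
    aggregate.add(previous);            // node joins the aggregate

    SCMNodeStat refreshed = new SCMNodeStat();
    refreshed.set(2000L, 150L, 1850L);  // next heartbeat reports more usage
    aggregate.subtract(previous).add(refreshed); // swap old contribution for new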

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -47,6 +47,23 @@ import "DatanodeContainerProtocol.proto";
 */
 message SCMHeartbeatRequestProto {
   required DatanodeIDProto datanodeID = 1;
+  optional SCMNodeReport nodeReport= 2;
+}
+
+/**
+* This message is sent along with the heartbeat to report datanode
+* storage utilization to SCM.
+*/
+message SCMNodeReport {
+  repeated SCMStorageReport storageReport= 1;
+}
+
+message SCMStorageReport {
+  required string storageUuid = 1;
+  optional uint64 capacity = 2 [default = 0];
+  optional uint64 scmUsed = 3 [default = 0];
+  optional uint64 remaining = 4 [default = 0];
+  optional StorageTypeProto storageType = 5 [default = DISK];
 }
 
 message SCMRegisterRequestProto {
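
On the wire, the datanode populates these new messages roughly as below, mirroring ContainerManagerImpl.getNodeReport() and the client-side translator in this commit; the UUID and sizes are illustrative, and java.util.UUID plus the generated proto classes are assumed to be imported:

    SCMStorageReport storageReport = SCMStorageReport.newBuilder()
        .setStorageUuid(UUID.randomUUID().toString())
        .setCapacity(2000L)
        .setScmUsed(100L)
        .setRemaining(1900L)
        .build();

    SCMNodeReport nodeReport = SCMNodeReport.newBuilder()
        .addStorageReport(storageReport)
        .build();

    // Attached to the heartbeat request sent to SCM:
    SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
        .setDatanodeID(datanodeID.getProtoBufMessage())
        .setNodeReport(nodeReport)
        .build();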

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java

@@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.NullCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 
 import java.io.IOException;
@@ -104,12 +106,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
    * Used by data node to send a Heartbeat.
    *
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - node report.
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
   @Override
   public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
-      sendHeartbeat(DatanodeID datanodeID)
+      sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport)
       throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -237,7 +237,7 @@ public class TestEndPoint {
              SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
                  serverAddress, 1000)) {
       SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
-          .sendHeartbeat(dataNode);
+          .sendHeartbeat(dataNode, null);
       Assert.assertNotNull(responseProto);
       Assert.assertEquals(1, responseProto.getCommandsCount());
       Assert.assertNotNull(responseProto.getCommandsList().get(0));
@@ -257,7 +257,7 @@ public class TestEndPoint {
         .build();
     rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
     HeartbeatEndpointTask endpointTask =
-        new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+        new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf(), null);
     endpointTask.setContainerNodeIDProto(containerNodeID);
     endpointTask.call();
     Assert.assertNotNull(endpointTask.getContainerNodeIDProto());

+ 25 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java

@@ -23,7 +23,11 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+
 import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -172,6 +176,24 @@ public class MockNodeManager implements NodeManager {
     return false;
   }
 
+  /**
+   * Returns the aggregated node stats.
+   * @return the aggregated node stats.
+   */
+  @Override
+  public SCMNodeStat getStats() {
+    return null;
+  }
+
+  /**
+   * Return a list of node stats.
+   * @return a list of individual node stats (live/stale but not dead).
+   */
+  @Override
+  public List<SCMNodeStat> getNodeStats() {
+    return null;
+  }
+
   /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
@@ -233,10 +255,12 @@ public class MockNodeManager implements NodeManager {
    * Send heartbeat to indicate the datanode is alive and doing well.
    *
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - node report.
    * @return SCMheartbeat response list
    */
   @Override
-  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
+  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport) {
     return null;
   }
 }

+ 190 - 31
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

@@ -22,6 +22,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -103,7 +107,7 @@ public class TestNodeManager {
       // Send some heartbeats from different nodes.
       for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
         DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
-        nodeManager.sendHeartbeat(datanodeID);
+        nodeManager.sendHeartbeat(datanodeID, null);
       }
 
       // Wait for 4 seconds max.
@@ -149,7 +153,7 @@ public class TestNodeManager {
 
       // Need 100 nodes to come out of chill mode, only one node is sending HB.
       nodeManager.setMinimumChillModeNodes(100);
-      nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager));
+      nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null);
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
           4 * 1000);
       assertFalse("Not enough heartbeat, Node manager should have been in " +
@@ -175,7 +179,7 @@ public class TestNodeManager {
 
       // Send 10 heartbeat from same node, and assert we never leave chill mode.
       for (int x = 0; x < 10; x++) {
-        nodeManager.sendHeartbeat(datanodeID);
+        nodeManager.sendHeartbeat(datanodeID, null);
       }
 
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@@ -204,7 +208,7 @@ public class TestNodeManager {
     nodeManager.close();
 
     // These should never be processed.
-    nodeManager.sendHeartbeat(datanodeID);
+    nodeManager.sendHeartbeat(datanodeID, null);
 
     // Let us just wait for 2 seconds to prove that HBs are not processed.
     Thread.sleep(2 * 1000);
@@ -231,7 +235,7 @@ public class TestNodeManager {
 
       for (int x = 0; x < count; x++) {
         DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
-        nodeManager.sendHeartbeat(datanodeID);
+        nodeManager.sendHeartbeat(datanodeID, null);
       }
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
           4 * 1000);
@@ -317,14 +321,18 @@ public class TestNodeManager {
       DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager);
 
       // Heartbeat once
-      nodeManager.sendHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(staleNode, null);
 
       // Heartbeat all other nodes.
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       // Wait for 2 more seconds, 3 seconds is the stale window for this test
       Thread.sleep(2 * 1000);
@@ -367,19 +375,25 @@ public class TestNodeManager {
       DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager);
 
       // Heartbeat once
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(deadNode, null);
 
       // Heartbeat all other nodes.
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
 
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
       Thread.sleep(3 * 1000);
 
       // heartbeat good nodes again.
-      nodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : nodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       //  6 seconds is the dead window for this test , so we wait a total of
       // 7 seconds to make sure that the node moves into dead state.
@@ -408,7 +422,7 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       GenericTestUtils.LogCapturer logCapturer =
           GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
-      nodeManager.sendHeartbeat(null);
+      nodeManager.sendHeartbeat(null, null);
       logCapturer.stopCapturing();
       assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
           "heartbeat is null"));
@@ -484,9 +498,9 @@ public class TestNodeManager {
           SCMTestUtils.getDatanodeID(nodeManager, "StaleNode");
       DatanodeID deadNode =
           SCMTestUtils.getDatanodeID(nodeManager, "DeadNode");
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
+      nodeManager.sendHeartbeat(deadNode, null);
 
       // Sleep so that heartbeat processing thread gets to run.
       Thread.sleep(500);
@@ -512,12 +526,12 @@ public class TestNodeManager {
        * the 3 second windows.
        */
 
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
+      nodeManager.sendHeartbeat(deadNode, null);
 
       Thread.sleep(1500);
-      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
       Thread.sleep(2 * 1000);
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
 
@@ -537,10 +551,10 @@ public class TestNodeManager {
        * staleNode to move to stale state and deadNode to move to dead state.
        */
 
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
       Thread.sleep(1500);
-      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
       Thread.sleep(2 * 1000);
 
       // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -570,9 +584,9 @@ public class TestNodeManager {
        * Cluster State : let us heartbeat all the nodes and verify that we get
        * back all the nodes in healthy state.
        */
-      nodeManager.sendHeartbeat(healthyNode);
-      nodeManager.sendHeartbeat(staleNode);
-      nodeManager.sendHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode, null);
+      nodeManager.sendHeartbeat(staleNode, null);
+      nodeManager.sendHeartbeat(deadNode, null);
       Thread.sleep(500);
       //Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
@@ -591,8 +605,9 @@ public class TestNodeManager {
   private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
                                 int sleepDuration) throws InterruptedException {
     while (!Thread.currentThread().isInterrupted()) {
-      list.forEach(manager::sendHeartbeat);
-      Thread.sleep(sleepDuration);
+      for (DatanodeID dn : list) {
+        manager.sendHeartbeat(dn, null);
+      }
+      Thread.sleep(sleepDuration);
     }
   }
 
@@ -676,7 +691,10 @@ public class TestNodeManager {
 
      // No separate thread; heartbeat the node manager just once so that
      // these nodes will eventually be marked as dead.
-      deadNodeList.forEach(nodeManager::sendHeartbeat);
+      for (DatanodeID dn : deadNodeList) {
+        nodeManager.sendHeartbeat(dn, null);
+      }
 
       Thread thread1 = new Thread(healthyNodeTask);
       thread1.setDaemon(true);
@@ -828,7 +846,7 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       nodeManager.setMinimumChillModeNodes(10);
       DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
-      nodeManager.sendHeartbeat(datanodeID);
+      nodeManager.sendHeartbeat(datanodeID, null);
       String status = nodeManager.getChillModeStatus();
       Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
           "mode. Waiting on nodes to report in."));
@@ -858,7 +876,7 @@ public class TestNodeManager {
       // Assert that node manager force enter cannot be overridden by nodes HBs.
       for(int x= 0; x < 20; x++) {
         DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager);
-        nodeManager.sendHeartbeat(datanode);
+        nodeManager.sendHeartbeat(datanode, null);
       }
 
       Thread.sleep(500);
@@ -873,6 +891,147 @@ public class TestNodeManager {
           CoreMatchers.containsString("Out of chill mode."));
       assertFalse(nodeManager.isInManualChillMode());
     }
+  }
+
+  /**
+   * Test multiple nodes sending an initial heartbeat with their node reports.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmStatsFromNodeReport() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int nodeCount = 10;
+    final long capacity = 2000;
+    final long used = 100;
+    final long remaining = capacity - used;
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      for (int x = 0; x < nodeCount; x++) {
+        DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
+
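+        // Every node reports the same capacity and usage, so the aggregate
+        // SCM stats should be an exact multiple of the per-node values.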
+        SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+        srb.setStorageUuid(UUID.randomUUID().toString());
+        srb.setCapacity(capacity).setScmUsed(used)
+            .setRemaining(remaining).build();
+        nodeManager.sendHeartbeat(datanodeID,
+            nrb.addStorageReport(srb).build());
+      }
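+      // Let the heartbeat processing thread drain the queued reports
+      // before checking the aggregated stats.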
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(capacity * nodeCount,
+          nodeManager.getStats().getCapacity());
+      assertEquals(used * nodeCount,
+          nodeManager.getStats().getScmUsed());
+      assertEquals(remaining * nodeCount,
+          nodeManager.getStats().getRemaining());
+    }
+  }
+
+  /**
+   * Test single node stat update based on node reports from different
+   * heartbeat states (healthy, stale and dead).
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmNodeReportUpdate() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int heartbeatCount = 5;
+    final int nodeCount = 1;
+    final int interval = 100;
+
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
+      final long capacity = 2000;
+      final long usedPerHeartbeat = 100;
+
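+      // Report x claims x * usedPerHeartbeat bytes used, so the reported
+      // usage grows with every heartbeat.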
+      for (int x = 0; x < heartbeatCount; x++) {
+        SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+        srb.setStorageUuid(UUID.randomUUID().toString());
+        srb.setCapacity(capacity).setScmUsed(x * usedPerHeartbeat)
+            .setRemaining(capacity - x * usedPerHeartbeat).build();
+        nrb.addStorageReport(srb);
+
+        nodeManager.sendHeartbeat(datanodeID, nrb.build());
+        Thread.sleep(100);
+      }
+
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
+
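+      // The last report sent was for x = heartbeatCount - 1, so that is
+      // the usage the node manager should have recorded.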
+      final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
+      final long expectedRemaining = capacity - expectedScmUsed;
+      assertEquals(capacity, nodeManager.getStats().getCapacity());
+      assertEquals(expectedScmUsed, nodeManager.getStats().getScmUsed());
+      assertEquals(expectedRemaining, nodeManager.getStats().getRemaining());
+
+      // Test NodeManager#getNodeStats
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+      assertEquals(expectedScmUsed,
+          nodeManager.getNodeStats().get(0).getScmUsed());
+      assertEquals(expectedRemaining,
+          nodeManager.getNodeStats().get(0).getRemaining());
+
+      // Wait up to 4 seconds for the node to become stale.
+      // Verify that the usage info is unchanged.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
+          4 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+      assertEquals(expectedScmUsed,
+          nodeManager.getNodeStats().get(0).getScmUsed());
+      assertEquals(expectedRemaining,
+          nodeManager.getNodeStats().get(0).getRemaining());
+
+      // Wait up to 3 more seconds for the node to become dead.
+      // Verify that its usage info is dropped from the stats.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(NodeManager.NODESTATE.DEAD) == 1, 100,
+          3 * 1000);
+
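+      // A dead node no longer contributes to the aggregate or to the
+      // per-node stats.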
+      assertEquals(0, nodeManager.getNodeStats().size());
+      assertEquals(0, nodeManager.getStats().getCapacity());
+      assertEquals(0, nodeManager.getStats().getScmUsed());
+      assertEquals(0, nodeManager.getStats().getRemaining());
+
+      // Send a new report to bring the dead node back to healthy.
+      SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
+      SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
+      srb.setStorageUuid(UUID.randomUUID().toString());
+      srb.setCapacity(capacity).setScmUsed(expectedScmUsed)
+          .setRemaining(expectedRemaining).build();
+      nrb.addStorageReport(srb);
+      nodeManager.sendHeartbeat(datanodeID, nrb.build());
+
+      // Wait up to 5 seconds for the dead node to become healthy again.
+      // Verify that the usage info is restored.
+      GenericTestUtils.waitFor(
+          () -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1,
+          100, 5 * 1000);
+      assertEquals(nodeCount, nodeManager.getNodeStats().size());
+      assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
+      assertEquals(expectedScmUsed,
+          nodeManager.getNodeStats().get(0).getScmUsed());
+      assertEquals(expectedRemaining,
+          nodeManager.getNodeStats().get(0).getRemaining());
+    }
   }
 }