HDFS-11451. Ozone: Add protobuf definitions for container reports. Contributed by Anu Engineer.

Anu Engineer 8 years ago
parent
commit
e37c5ed47d
14 files changed with 331 additions and 68 deletions
  1. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
  2. +38 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  3. +35 -6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
  4. +3 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
  5. +80 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java
  6. +4 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
  7. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
  8. +5 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
  9. +15 -8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
  10. +4 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
  11. +89 -13
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
  12. +11 -6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
  13. +12 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
  14. +31 -21
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java

@@ -73,7 +73,7 @@ public  class SCMConnectionManager {
    *
    * @return - Return RPC timeout.
    */
-  public long getRpcTimeout() {
+  public int getRpcTimeout() {
     return rpcTimeout;
   }
 
@@ -128,7 +128,7 @@ public  class SCMConnectionManager {
       StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
           StorageContainerDatanodeProtocolPB.class, version,
           address, UserGroupInformation.getCurrentUser(), conf,
-          NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+          NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
 
       StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
           new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);

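The long-to-int change on getRpcTimeout() lines the getter up with Hadoop's RPC API: the RPC.getProxy overload used above takes the timeout as an int. A hedged sketch of that overload's signature, inferred from the call site above:

public static <T> T getProxy(Class<T> protocol, long clientVersion,
    InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
    SocketFactory factory, int rpcTimeout) throws IOException;
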
+ 38 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -19,10 +19,10 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
-    .RunningDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -34,6 +34,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState.states
+    .noContainerReports;
+
 /**
  * Current Context of State Machine.
  */
@@ -45,6 +49,9 @@ public class StateContext {
   private final Configuration conf;
   private DatanodeStateMachine.DatanodeStates state;
   private SCMNodeReport nrState;
+  private ReportState reportState;
+  private static final ReportState DEFAULT_REPORT_STATE =
+      ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
 
   /**
    * Constructs a StateContext.
@@ -174,6 +181,7 @@ public class StateContext {
       if (isExiting(newState)) {
         task.onExit();
       }
+      this.clearReportState();
       this.setState(newState);
     }
   }
@@ -215,4 +223,32 @@ public class StateContext {
   }
 
 
+  /**
+   * Gets the ReportState.
+   * @return ReportState.
+   */
+  public synchronized ReportState getContainerReportState() {
+    if (reportState == null) {
+      return DEFAULT_REPORT_STATE;
+    }
+    return reportState;
+  }
+
+  /**
+   * Sets the ReportState.
+   * @param rState - ReportState.
+   */
+  public synchronized void setContainerReportState(ReportState rState) {
+    this.reportState = rState;
+  }
+
+  /**
+   * Clears report state after it has been communicated.
+   */
+  public synchronized void clearReportState() {
+    if (reportState != null) {
+      setContainerReportState(null);
+    }
+  }
+
 }

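The accessors above guarantee the heartbeat path a non-null report state even before the container layer has published one. A minimal usage sketch, assuming an already-constructed StateContext named context; the builders and enum values are the ones introduced in this change:

import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;

// Before anything is published, the shared default is returned.
ReportState rs = context.getContainerReportState();
assert rs.getState() == ReportState.states.noContainerReports;
assert rs.getCount() == 0;

// Publish a delta-report notice; the next heartbeat will carry it.
context.setContainerReportState(ReportState.newBuilder()
    .setState(ReportState.states.deltaContainerReport)
    .setCount(10)
    .build());

// Once the report has been communicated (or on a state transition),
// the context falls back to the default again.
context.clearReportState();
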
+ 35 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -26,6 +26,13 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto
+     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+     .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,12 +96,13 @@ public class HeartbeatEndpointTask
       DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
           .containerNodeIDProto.getDatanodeID());
 
-      rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID,
-          this.context.getNodeReport());
+      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
+          .sendHeartbeat(datanodeID, this.context.getNodeReport(),
+              this.context.getContainerReportState());
+      processResponse(response);
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
-      rpcEndpoint.logIfNeeded(ex
-      );
+      rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
     }
@@ -109,6 +117,27 @@ public class HeartbeatEndpointTask
     return new Builder();
   }
 
+  /**
+   * Adds commands from the heartbeat response to the command processing queue.
+   *
+   * @param response - SCMHeartbeat response.
+   */
+  private void processResponse(SCMHeartbeatResponseProto response) {
+    for (SCMCommandResponseProto commandResponseProto : response
+        .getCommandsList()) {
+      if (commandResponseProto.getCmdType() ==
+          StorageContainerDatanodeProtocolProtos.Type.nullCmd) {
+        //this.context.addCommand(NullCommand.newBuilder().build());
+        LOG.debug("Discarding a null command from SCM.");
+      }
+      if (commandResponseProto.getCmdType() ==
+          StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
+        this.context.addCommand(SendContainerCommand.getFromProtobuf(
+            commandResponseProto.getSendReport()));
+      }
+    }
+  }
+
   /**
    * Builder class for HeartbeatEndpointTask.
    */
@@ -138,8 +167,8 @@ public class HeartbeatEndpointTask
     /**
      * Sets the Config.
      *
-     * @param config  - config
-     * @return  Builder
+     * @param config - config
+     * @return Builder
      */
     public Builder setConfig(Configuration config) {
       this.conf = config;

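To make the dispatch in processResponse concrete, here is a hedged sketch of an SCM response that would be queued as a SendContainerCommand; all builder and field names come from the protobuf definitions added in this change:

import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;

// A response carrying a single "send your container report" command.
SCMHeartbeatResponseProto response = SCMHeartbeatResponseProto.newBuilder()
    .addCommands(SCMCommandResponseProto.newBuilder()
        .setCmdType(Type.sendContainerReport)
        .setSendReport(SendContainerReportProto.newBuilder().build())
        .build())
    .build();

// processResponse(response) would queue a SendContainerCommand on the
// StateContext; a nullCmd entry would only be logged and discarded.
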
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
@@ -43,11 +44,12 @@ public interface StorageContainerDatanodeProtocol {
    * Used by data node to send a Heartbeat.
    * @param datanodeID - Datanode ID.
    * @param nodeReport - node report state
+   * @param reportState - container report state.
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
   SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
-      SCMNodeReport nodeReport) throws IOException;
+      SCMNodeReport nodeReport, ReportState reportState) throws IOException;
 
   /**
    * Register Datanode.

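A short sketch of the widened call from the datanode side, mirroring the test code later in this change; scmProtocol, datanodeID and nodeReport are assumed to exist, scmProtocol being any StorageContainerDatanodeProtocol implementation such as the client-side translator below:

import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;

ReportState reportState = ReportState.newBuilder()
    .setState(ReportState.states.noContainerReports)
    .setCount(0)
    .build();
SCMHeartbeatResponseProto response =
    scmProtocol.sendHeartbeat(datanodeID, nodeReport, reportState);
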
+ 80 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+
+/**
+ * Allows a Datanode to send in the container report.
+ */
+public class SendContainerCommand extends SCMCommand<SendContainerReportProto> {
+  /**
+   * Returns a SendContainerCommand from a SendContainerReportProto.
+   * @param unused - unused
+   * @return SendContainerCommand
+   */
+  public static SendContainerCommand getFromProtobuf(
+      final SendContainerReportProto unused) {
+    return new SendContainerCommand();
+  }
+
+  /**
+   * Returns a new builder.
+   * @return Builder
+   */
+  public static SendContainerCommand.Builder newBuilder() {
+    return new SendContainerCommand.Builder();
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getType() {
+    return Type.sendContainerReport;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  public byte[] getProtoBufMessage() {
+    return SendContainerReportProto.newBuilder().build().toByteArray();
+  }
+
+  /**
+   * A Builder class; this is the standard pattern we are using for all commands.
+   */
+  public static class Builder {
+    /**
+     * Return a SendContainerCommand.
+     * @return - SendContainerCommand.
+     */
+    public SendContainerCommand build() {
+      return new SendContainerCommand();
+    }
+  }
+}

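A hedged round-trip sketch for the new command class; since the payload is an empty SendContainerReportProto, the command is purely a signal (parseFrom throws InvalidProtocolBufferException, handling omitted here):

import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;

SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
assert cmd.getType() == Type.sendContainerReport;

byte[] payload = cmd.getProtoBufMessage();
SendContainerCommand decoded = SendContainerCommand.getFromProtobuf(
    SendContainerReportProto.parseFrom(payload));
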
+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

@@ -23,6 +23,8 @@ import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -121,11 +123,12 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
 
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
-      SCMNodeReport nodeReport) throws IOException {
+      SCMNodeReport nodeReport, ReportState reportState) throws IOException {
     SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
         .newBuilder();
     req.setDatanodeID(datanodeID.getProtoBufMessage());
     req.setNodeReport(nodeReport);
+    req.setContainerReportState(reportState);
     final SCMHeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

@@ -78,7 +78,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       SCMHeartbeatRequestProto request) throws ServiceException {
     try {
       return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
-          .getDatanodeID()), request.getNodeReport());
+          .getDatanodeID()), request.getNodeReport(),
+          request.getContainerReportState());
     } catch (IOException e) {
       throw new ServiceException(e);
     }

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -36,6 +36,8 @@ import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -391,12 +393,14 @@ public class StorageContainerManager
    * Used by data node to send a Heartbeat.
    *
    * @param datanodeID - Datanode ID.
+   * @param nodeReport - Node Report
+   * @param reportState - Container report ready info.
    * @return - SCMHeartbeatResponseProto
    * @throws IOException
    */
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
-      SCMNodeReport nodeReport) throws IOException {
+      SCMNodeReport nodeReport, ReportState reportState) throws IOException {
     List<SCMCommand> commands =
         getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
     List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();

+ 15 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java

@@ -49,7 +49,7 @@ public class ContainerMapping implements Mapping {
   private final long cacheSize;
   private final Lock lock;
   private final Charset encoding = Charset.forName("UTF-8");
-  private final LevelDBStore store;
+  private final LevelDBStore containerStore;
   private final Random rand;
 
   /**
@@ -75,11 +75,14 @@ public class ContainerMapping implements Mapping {
       throw
           new IllegalArgumentException("SCM metadata directory is not valid.");
     }
-    File dbPath = new File(scmMetaDataDir, "SCM.db");
     Options options = new Options();
     options.cacheSize(this.cacheSize * (1024L * 1024L));
     options.createIfMissing();
-    store = new LevelDBStore(dbPath, options);
+
+    // Write the container name to pipeline mapping.
+    File containerDBPath = new File(scmMetaDataDir, "container.db");
+    containerStore = new LevelDBStore(containerDBPath, options);
+
     this.lock = new ReentrantLock();
     rand = new Random();
   }
@@ -103,6 +106,8 @@ public class ContainerMapping implements Mapping {
     return pipeline;
   }
 
+
+
   /**
    * Returns the Pipeline from the container name.
    *
@@ -114,7 +119,8 @@ public class ContainerMapping implements Mapping {
     Pipeline pipeline = null;
     lock.lock();
     try {
-      byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+      byte[] pipelineBytes =
+          containerStore.get(containerName.getBytes(encoding));
       if (pipelineBytes == null) {
         throw new IOException("Specified key does not exist. key : " +
             containerName);
@@ -145,7 +151,8 @@ public class ContainerMapping implements Mapping {
 
     lock.lock();
     try {
-      byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+      byte[] pipelineBytes =
+          containerStore.get(containerName.getBytes(encoding));
       if (pipelineBytes != null) {
         throw new IOException("Specified container already exists. key : " +
             containerName);
@@ -153,7 +160,7 @@ public class ContainerMapping implements Mapping {
       DatanodeID id = getDatanodeID();
       if (id != null) {
         pipeline = newPipelineFromNodes(id, containerName);
-        store.put(containerName.getBytes(encoding),
+        containerStore.put(containerName.getBytes(encoding),
             pipeline.getProtobufMessage().toByteArray());
       }
     } finally {
@@ -193,8 +200,8 @@ public class ContainerMapping implements Mapping {
    */
   @Override
   public void close() throws IOException {
-    if (store != null) {
-      store.close();
+    if (containerStore != null) {
+      containerStore.close();
     }
   }
 }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java

@@ -39,7 +39,8 @@ public class CommandQueue {
 
   private final Map<DatanodeID, List<SCMCommand>> commandMap;
   private final Lock lock;
-  private final List<SCMCommand> nullList;
+  // This list is used as the default return value; it contains one null command.
+  private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
 
   /**
    * Constructs a Command Queue.
@@ -47,8 +48,7 @@ public class CommandQueue {
   public CommandQueue() {
     commandMap = new HashMap<>();
     lock = new ReentrantLock();
-    nullList = new LinkedList<>();
-    nullList.add(NullCommand.newBuilder().build());
+    DEFAULT_LIST.add(NullCommand.newBuilder().build());
   }
 
   /**
@@ -75,7 +75,7 @@ public class CommandQueue {
     } finally {
       lock.unlock();
     }
-    return nullList;
+    return DEFAULT_LIST;
   }
 
   /**

+ 89 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -47,15 +47,80 @@ import "DatanodeContainerProtocol.proto";
 */
 message SCMHeartbeatRequestProto {
   required DatanodeIDProto datanodeID = 1;
-  optional SCMNodeReport nodeReport= 2;
+  optional SCMNodeReport nodeReport = 2;
+  optional ReportState containerReportState = 3;
 }
 
+enum ContainerState {
+  closed = 0;
+  open = 1;
+}
+
+/**
+ReportState is a message from the datanode to SCM saying that it has
+some information that SCM might be interested in.*/
+message ReportState {
+  enum states {
+    noContainerReports = 0;
+    completeContinerReport = 1;
+    deltaContainerReport = 2;
+  }
+  required states state = 1;
+  required int64 count = 2 [default = 0];
+}
+
+
+/**
+This message is used to persist the information about a container in the
+SCM database. This information allows SCM to start up faster and avoids
+keeping all container info in memory all the time.
+  */
+message ContainerPersistanceProto {
+  required ContainerState state = 1;
+  required hadoop.hdfs.ozone.Pipeline pipeline = 2;
+  required ContainerInfo info = 3;
+}
+
+/**
+This message is used to do a quick lookup of which containers are affected
+when a node goes down.
+*/
+message NodeContianerMapping {
+  repeated string contianerName = 1;
+}
+
+
+
+/**
+A container report contains the following information.
+*/
+message ContainerInfo {
+  required string containerName = 1;
+  repeated bytes finalhash = 2;
+  optional int64 size = 3;
+  optional int64 keycount = 4;
+}
+
+/**
+A set of container reports; the max count is generally set to
+8192 since that keeps the size of the reports under 1 MB.
+*/
+message ContainerReports {
+  enum reportType {
+    fullReport = 0;
+    deltaReport = 1;
+  }
+  repeated ContainerInfo reports = 1;
+  required reportType type = 2;
+}
+
+
 /**
 * This message is send along with the heart beat to report datanode
 * storage utilization by SCM.
 */
 message SCMNodeReport {
-  repeated SCMStorageReport storageReport= 1;
+  repeated SCMStorageReport storageReport = 1;
 }
 
 message SCMStorageReport {
@@ -123,6 +188,12 @@ message NullCmdResponseProto {
 
 }
 
+/**
+This command tells the datanode to send in the container report when possible.
+*/
+message SendContainerReportProto {
+}
+
 /**
 Type of commands supported by SCM to datanode protocol.
 */
@@ -130,6 +201,7 @@ enum Type {
   nullCmd = 1;
   versionCommand = 2;
   registeredCommand = 3;
+  sendContainerReport = 4;
 }
 
 /*
@@ -138,6 +210,10 @@ enum Type {
 message SCMCommandResponseProto {
   required Type cmdType = 2; // Type of the command
   optional NullCmdResponseProto nullCommand = 3;
+  optional SCMRegisteredCmdResponseProto registeredProto = 4;
+  optional SCMVersionResponseProto versionProto = 5;
+  optional SendContainerReportProto sendReport = 6;
+
 }
 
 
@@ -157,9 +233,9 @@ message SCMHeartbeatResponseProto {
 * Here is a simple state diagram that shows how a datanode would boot up and
 * communicate with SCM.
 *
 *          +-----------------------+
 *          |         Start         |
 *          +----------+------------+
 *                     |
 *                     |
 *                     |
 *                     |
 *                     |
 *                     |
 *                     |
 *          +----------v-------------+
 *          |   Searching for  SCM   +------------+
 *          +----------+-------------+            |
 *                     |                          |
 *                     |                          |
 *                     |               +----------v-------------+
 *                     |               | Register if needed     |
 *                     |               +-----------+------------+
 *                     |                           |
 *                     v                           |
 *           +-----------+----------------+        |
 * +---------+  Heartbeat state           <--------+
 * |         +--------^-------------------+
 * |                  |
 * |                  |
 * |                  |
 * |                  |
 * |                  |
 * |                  |
 * |                  |
 * +------------------+
 *
 *
 *

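To make the new report messages concrete, a hedged Java sketch that builds a one-entry full report, assuming the usual protobuf-java codegen conventions for the messages above (the container name and hash are hypothetical):

import com.google.protobuf.ByteString;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReports;

ContainerInfo info = ContainerInfo.newBuilder()
    .setContainerName("container-0001")            // hypothetical name
    .addFinalhash(ByteString.copyFromUtf8("hash")) // repeated bytes field
    .setSize(1024L * 1024L)                        // bytes used, optional
    .setKeycount(42L)                              // number of keys, optional
    .build();

ContainerReports reports = ContainerReports.newBuilder()
    .addReports(info)
    .setType(ContainerReports.reportType.fullReport)
    .build();
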
+ 11 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java

@@ -20,10 +20,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.NullCommand;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 
 import java.io.IOException;
@@ -37,6 +36,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   private int rpcResponseDelay;
   private AtomicInteger heartbeatCount = new AtomicInteger(0);
   private AtomicInteger rpcCount = new AtomicInteger(0);
+  private ReportState reportState;
 
   /**
    * Returns the number of heartbeats made to this class.
@@ -112,10 +112,11 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
    */
   @Override
   public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
-      sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport)
-      throws IOException {
+      sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport,
+      ReportState reportState) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
+    this.reportState = reportState;
     sleepIfNeeded();
     StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
         cmdResponse = StorageContainerDatanodeProtocolProtos
@@ -153,4 +154,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
             StorageContainerDatanodeProtocolProtos
                 .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
   }
+
+  public ReportState getReportState() {
+    return this.reportState;
+  }
 }

+ 12 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManage
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -48,6 +49,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
 
 /**
  * Tests the datanode state machine class and its states.
@@ -65,6 +68,7 @@ public class TestDatanodeStateMachine {
   @Before
   public void setUp() throws Exception {
     conf = SCMTestUtils.getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, 500);
     serverAddresses = new LinkedList<>();
     scmServers = new LinkedList<>();
     mockServers = new LinkedList<>();
@@ -194,9 +198,9 @@ public class TestDatanodeStateMachine {
 
     // This execute will invoke getVersion calls against all SCM endpoints
     // that we know of.
-    task.execute(executorService);
-    newState = task.await(2, TimeUnit.SECONDS);
 
+    task.execute(executorService);
+    newState = task.await(10, TimeUnit.SECONDS);
     // If we are in running state, we should be in running.
     Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
         newState);
@@ -246,8 +250,14 @@ public class TestDatanodeStateMachine {
     Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
         newState);
 
+
     for (ScmTestMock mock : mockServers) {
       Assert.assertEquals(1, mock.getHeartbeatCount());
+      // Assert that the heartbeat did indeed carry the report state that
+      // we set in the datanode.
+      Assert.assertEquals(mock.getReportState().getState().getNumber(),
+          StorageContainerDatanodeProtocolProtos.ReportState.states
+              .noContainerReports.getNumber());
     }
   }
 

+ 31 - 21
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -20,7 +20,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -30,16 +31,18 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
     .RegisterEndpointTask;
 import org.apache.hadoop.ozone.container.common.states.endpoint
     .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -58,6 +61,9 @@ import java.net.InetSocketAddress;
 import java.util.UUID;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState.states
+    .noContainerReports;
 
 /**
  * Tests the endpoints.
@@ -67,6 +73,27 @@ public class TestEndPoint {
   private static RPC.Server scmServer;
   private static ScmTestMock scmServerImpl;
   private static File testDir;
+  private static StorageContainerDatanodeProtocolProtos.ReportState
+      defaultReportState;
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (scmServer != null) {
+      scmServer.stop();
+    }
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    serverAddress = SCMTestUtils.getReuseableAddress();
+    scmServerImpl = new ScmTestMock();
+    scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
+        scmServerImpl, serverAddress, 10);
+    testDir = PathUtils.getTestDir(TestEndPoint.class);
+    defaultReportState = StorageContainerDatanodeProtocolProtos.ReportState.
+        newBuilder().setState(noContainerReports).build();
+  }
 
   @Test
   /**
@@ -255,7 +282,7 @@ public class TestEndPoint {
       srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build();
       nrb.addStorageReport(srb);
       SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
-          .sendHeartbeat(dataNode, nrb.build());
+          .sendHeartbeat(dataNode, nrb.build(), defaultReportState);
       Assert.assertNotNull(responseProto);
       Assert.assertEquals(1, responseProto.getCommandsCount());
       Assert.assertNotNull(responseProto.getCommandsList().get(0));
@@ -322,21 +349,4 @@ public class TestEndPoint {
     scmServerImpl.setRpcResponseDelay(0);
     Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
   }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    if (scmServer != null) {
-      scmServer.stop();
-    }
-    FileUtil.fullyDelete(testDir);
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    serverAddress = SCMTestUtils.getReuseableAddress();
-    scmServerImpl = new ScmTestMock();
-    scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
-        scmServerImpl, serverAddress, 10);
-    testDir = PathUtils.getTestDir(TestEndPoint.class);
-  }
 }