
HDFS-11081. Ozone:SCM: Add support for registerNode in datanode. Contributed by Anu Engineer.

Anu Engineer 8 years ago
parent
commit
8bd85268e6
33 changed files with 3383 additions and 182 deletions
  1. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  2. +114 -51  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
  3. +50 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  4. +213 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
  5. +265 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
  6. +174 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
  7. +191 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  8. +28 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
  9. +55 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
  10. +135 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
  11. +297 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
  12. +21 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
  13. +181 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
  14. +198 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
  15. +66 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
  16. +20 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
  17. +18 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
  18. +2 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  19. +58 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
  20. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java
  21. +16 -11  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
  22. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
  23. +154 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
  24. +32 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java
  25. +86 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
  26. +2 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java
  27. +15 -22  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
  28. +47 -23  hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
  29. +188 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
  30. +149 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
  31. +274 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
  32. +314 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
  33. +17 -68  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -913,7 +913,7 @@ public class DataNode extends ReconfigurableBase
    * @throws UnknownHostException if the dfs.datanode.dns.interface
    *    option is used and the hostname can not be determined
    */
-  private static String getHostName(Configuration config)
+  public static String getHostName(Configuration config)
       throws UnknownHostException {
     String name = config.get(DFS_DATANODE_HOST_NAME_KEY);
     if (name == null) {

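The only change here is widening getHostName from private to public so the Ozone datanode code can reuse it. A minimal sketch of the call site this enables (the commit's own RunningDatanodeState, further down, calls it the same way; the demo class name is invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;

    public class HostNameLookupDemo {
      public static void main(String[] args) throws Exception {
        // getHostName(Configuration) is now public, so code outside the
        // datanode package (e.g. Ozone's RunningDatanodeState) can call it.
        System.out.println(DataNode.getHostName(new Configuration()));
      }
    }
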
+ 114 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java

@@ -1,43 +1,61 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 
 package org.apache.hadoop.ozone;
 
 import com.google.common.base.Optional;
 
+import com.google.common.net.HostAndPort;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL_MS;
 
 /**
  * Utility methods for Ozone and Container Clients.
@@ -51,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERV
 public final class OzoneClientUtils {
   private static final Logger LOG = LoggerFactory.getLogger(
       OzoneClientUtils.class);
+  private static final int NO_PORT = -1;
 
   /**
    * The service ID of the solitary Ozone SCM service.
@@ -139,7 +158,7 @@ public final class OzoneClientUtils {
 
     return NetUtils.createSocketAddr(
         host.or(OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" +
-        port.or(OZONE_SCM_CLIENT_PORT_DEFAULT));
+            port.or(OZONE_SCM_CLIENT_PORT_DEFAULT));
   }
 
   /**
@@ -160,7 +179,7 @@ public final class OzoneClientUtils {
 
     return NetUtils.createSocketAddr(
         host.or(OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
-        port.or(OZONE_SCM_DATANODE_PORT_DEFAULT));
+            port.or(OZONE_SCM_DATANODE_PORT_DEFAULT));
   }
 
   /**
@@ -168,7 +187,7 @@ public final class OzoneClientUtils {
    * Each config value may be absent, or if present in the format
    * host:port (the :port part is optional).
    *
-   * @param conf
+   * @param conf  - Conf
    * @param keys a list of configuration key names.
    *
    * @return first hostname component found from the given keys, or absent.
@@ -176,51 +195,65 @@ public final class OzoneClientUtils {
    *             or host:port format.
    */
   static Optional<String> getHostNameFromConfigKeys(
-      Configuration conf, String ... keys) {
+      Configuration conf, String... keys) {
     for (final String key : keys) {
       final String value = conf.getTrimmed(key);
-      if (value != null && !value.isEmpty()) {
-        String[] splits = value.split(":");
-
-        if(splits.length < 1 || splits.length > 2) {
-          throw new IllegalArgumentException(
-              "Invalid value " + value + " for config key " + key +
-                  ". It should be in 'host' or 'host:port' format");
-        }
-        return Optional.of(splits[0]);
+      final Optional<String> hostName = getHostName(value);
+      if (hostName.isPresent()) {
+        return hostName;
       }
     }
     return Optional.absent();
   }
 
+  /**
+   * Gets the hostname or indicates that it is absent.
+   * @param value host or host:port
+   * @return hostname
+   */
+  public static Optional<String> getHostName(String value) {
+    if ((value == null) || value.isEmpty()) {
+      return Optional.absent();
+    }
+    return Optional.of(HostAndPort.fromString(value).getHostText());
+  }
+
+  /**
+   * Gets the port if there is one, or absent otherwise.
+   * @param value  String in host:port format.
+   * @return Port
+   */
+  public static Optional<Integer> getHostPort(String value) {
+    if((value == null) || value.isEmpty()) {
+      return Optional.absent();
+    }
+    int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
+    if (port == NO_PORT) {
+      return Optional.absent();
+    } else {
+      return Optional.of(port);
+    }
+  }
+
   /**
    * Retrieve the port number, trying the supplied config keys in order.
    * Each config value may be absent, or if present in the format
    * host:port (the :port part is optional).
    *
-   * @param conf
+   * @param conf Conf
    * @param keys a list of configuration key names.
    *
    * @return first port number component found from the given keys, or absent.
    * @throws IllegalArgumentException if any values are not in the 'host'
    *             or host:port format.
    */
-  static Optional<Integer> getPortNumberFromConfigKeys(
-      Configuration conf, String ... keys) {
+  public static Optional<Integer> getPortNumberFromConfigKeys(
+      Configuration conf, String... keys) {
     for (final String key : keys) {
       final String value = conf.getTrimmed(key);
-      if (value != null && !value.isEmpty()) {
-        String[] splits = value.split(":");
-
-        if(splits.length < 1 || splits.length > 2) {
-          throw new IllegalArgumentException(
-              "Invalid value " + value + " for config key " + key +
-                  ". It should be in 'host' or 'host:port' format");
-        }
-
-        if (splits.length == 2) {
-          return Optional.of(Integer.parseInt(splits[1]));
-        }
+      final Optional<Integer> hostPort = getHostPort(value);
+      if (hostPort.isPresent()) {
+        return hostPort;
       }
     }
     return Optional.absent();
@@ -259,7 +292,7 @@ public final class OzoneClientUtils {
    * @return long
    */
   private static long sanitizeUserArgs(long valueTocheck, long baseValue,
-                                       long minFactor, long maxFactor)
+      long minFactor, long maxFactor)
       throws IllegalArgumentException {
     if ((valueTocheck >= (baseValue * minFactor)) &&
         (valueTocheck <= (baseValue * maxFactor))) {
@@ -270,7 +303,6 @@ public final class OzoneClientUtils {
     throw new IllegalArgumentException(errMsg);
   }
 
-
   /**
    * Returns the interval in which the heartbeat processor thread runs.
    *
@@ -282,7 +314,6 @@ public final class OzoneClientUtils {
         OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT);
   }
 
-
   /**
    * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
    * SCM.
@@ -295,7 +326,6 @@ public final class OzoneClientUtils {
         OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT);
   }
 
-
   /**
    * Get the Stale Node interval, which is used by SCM to flag a datanode as
    * stale, if the heartbeat from that node has been missing for this duration.
@@ -340,7 +370,6 @@ public final class OzoneClientUtils {
     return staleNodeIntevalMs;
   }
 
-
   /**
    * Gets the interval for dead node flagging. This has to be a value that is
    * greater than stale node value,  and by transitive relation we also know
@@ -374,8 +403,42 @@ public final class OzoneClientUtils {
   * @param conf Configuration
    * @return - int -- Number of HBs to process
    */
-  public static int getMaxHBToProcessPerLoop(Configuration conf){
+  public static int getMaxHBToProcessPerLoop(Configuration conf) {
     return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
         OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
   }
+
+  /**
+   * Timeout value for the RPC from Datanode to SCM, primarily used for
+   * Heartbeats and container reports.
+   *
+   * @param conf - Ozone Config
+   * @return - Rpc timeout in Milliseconds.
+   */
+  public static long getScmRpcTimeOutInMilliseconds(Configuration conf) {
+    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
+        OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Log Warn interval.
+   *
+   * @param conf - Ozone Config
+   * @return - Log warn interval.
+   */
+  public static int getLogWarnInterval(Configuration conf) {
+    return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
+        OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
+  }
+
+  /**
+   * returns the Container port.
+   * @param conf - Conf
+   * @return port number.
+   */
+  public static int getContainerPort(Configuration conf) {
+    return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys
+        .DFS_CONTAINER_IPC_PORT_DEFAULT);
+  }
+
 }

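This refactor replaces the hand-rolled value.split(":") parsing with Guava's HostAndPort via the new getHostName/getHostPort helpers. A standalone sketch of that parsing behavior (demo class and values are invented; this mirrors, not replaces, the methods above):

    import com.google.common.base.Optional;
    import com.google.common.net.HostAndPort;

    public final class HostPortParseDemo {
      private static final int NO_PORT = -1;

      // Mirrors OzoneClientUtils.getHostName: null/empty input -> absent.
      static Optional<String> hostOf(String value) {
        if (value == null || value.isEmpty()) {
          return Optional.absent();
        }
        return Optional.of(HostAndPort.fromString(value).getHostText());
      }

      // Mirrors OzoneClientUtils.getHostPort: no ":port" suffix -> absent.
      static Optional<Integer> portOf(String value) {
        if (value == null || value.isEmpty()) {
          return Optional.absent();
        }
        int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
        return port == NO_PORT ? Optional.<Integer>absent() : Optional.of(port);
      }

      public static void main(String[] args) {
        System.out.println(hostOf("scm1.example.com:9862")); // Optional.of(scm1.example.com)
        System.out.println(portOf("scm1.example.com"));      // Optional.absent()
      }
    }
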
+ 50 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -43,8 +43,8 @@ public final class OzoneConfigKeys {
       "ozone.trace.enabled";
   public static final boolean OZONE_TRACE_ENABLED_DEFAULT = false;
 
-  public static final String OZONE_METADATA_DIRS =
-      "ozone.metadata.dirs";
+  public static final String OZONE_CONTAINER_METADATA_DIRS =
+      "ozone.container.metadata.dirs";
 
   public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
   public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
@@ -94,6 +94,54 @@ public final class OzoneConfigKeys {
   public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
       OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L;
 
+  public static final String OZONE_SCM_CONTAINER_THREADS =
+      "ozone.scm.container.threads";
+  public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT =
+      Runtime.getRuntime().availableProcessors() * 2;
+
+  public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT =
+      "ozone.scm.heartbeat.rpc-timeout";
+  public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT =
+      100;
+
+  /**
+   * Defines how frequently we log missed heartbeats to a specific SCM. By
+   * default we write one warning message for every 10 sequential heartbeats
+   * missed to a specific SCM, to avoid flooding the log with
+   * missed-heartbeat statements.
+   */
+  public static final String OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT =
+      "ozone.scm.heartbeat.log.warn.interval.count";
+  public static final int OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT =
+      10;
+
+  public static final String OZONE_CONTAINER_TASK_WAIT =
+      "ozone.container.task.wait.seconds";
+  public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5;
+
+
+  // The ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT
+  // entries, written as a comma separated string, e.g. scm1, scm2:8020,
+  // 7.7.7.7:7777.
+  //
+  // If this key is not specified, datanodes will not be able to find the
+  // SCM. The SCM membership can be dynamic, so this key should contain
+  // all possible SCM names. Once the SCM leader is discovered, datanodes will
+  // get the right list of SCMs to heartbeat to from the leader.
+  // While it is good for the datanodes to know the names of all SCM nodes,
+  // it is sufficient to know the name of one working SCM. That SCM
+  // will be able to return information about the other SCMs that are part of
+  // the SCM replicated log.
+  //
+  // In case of a membership change, any one of the SCM machines will be
+  // able to send back a new list to the datanodes.
+  public static final String OZONE_SCM_NAMES = "ozone.scm.names";
+
+  public static final int OZONE_SCM_DEFAULT_PORT = 9862;
+  // File Name and path where datanode ID is to written to.
+  // if this value is not set then container startup will fail.
+  public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id";
+
+
 
   /**
    * There is no need to instantiate this class.

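The new keys are read through Hadoop's standard Configuration machinery. A small usage sketch under assumed values (the hosts and the ID-file path below are illustrative, not defaults):

    import org.apache.hadoop.conf.Configuration;

    public class ScmNamesDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Comma-separated set of DNS | DNS:PORT | IP | IP:PORT, per the
        // ozone.scm.names comment above.
        conf.set("ozone.scm.names", "scm1, scm2:8020, 7.7.7.7:7777");
        // Datanode ID file; container startup fails if this is unset.
        conf.set("ozone.scm.datanode.id", "/var/lib/ozone/datanode.id");
        for (String scm : conf.getTrimmedStrings("ozone.scm.names")) {
          System.out.println("SCM endpoint candidate: " + scm);
        }
      }
    }
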
+ 213 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * State Machine Class.
+ */
+public class DatanodeStateMachine implements Closeable {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeStateMachine.class);
+  private final ExecutorService executorService;
+  private final Configuration conf;
+  private final SCMConnectionManager connectionManager;
+  private final long taskWaitTime;
+  private final long heartbeatFrequency;
+  private StateContext context;
+
+  /**
+   * Constructs a container state machine.
+   *
+   * @param conf - Configuration.
+   */
+  public DatanodeStateMachine(Configuration conf) {
+    this.conf = conf;
+    executorService = HadoopExecutors.newScheduledThreadPool(
+        this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
+            OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Container State Machine Thread - %d").build());
+    connectionManager = new SCMConnectionManager(conf);
+    context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
+    taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT,
+        OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT);
+    heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
+  }
+
+  /**
+   * Returns the Connection manager for this state machine.
+   *
+   * @return - SCMConnectionManager.
+   */
+  public SCMConnectionManager getConnectionManager() {
+    return connectionManager;
+  }
+
+  /**
+   * Runs the state machine at a fixed frequency.
+   */
+  public void start() throws IOException {
+    long now = 0;
+    long nextHB = 0;
+    while (context.getState() != DatanodeStates.SHUTDOWN) {
+      try {
+        nextHB = Time.monotonicNow() + heartbeatFrequency;
+        context.execute(executorService, taskWaitTime, TimeUnit.SECONDS);
+        now = Time.monotonicNow();
+        if (now < nextHB) {
+          Thread.sleep(nextHB - now);
+        }
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        LOG.error("Unable to finish the execution", e);
+      }
+    }
+  }
+
+  /**
+   * Gets the current context.
+   *
+   * @return StateContext
+   */
+  public StateContext getContext() {
+    return context;
+  }
+
+  /**
+   * Sets the current context.
+   *
+   * @param context - Context
+   */
+  public void setContext(StateContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it. If
+   * the stream is already closed then invoking this method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the close may
+   * fail require careful attention. It is strongly advised to relinquish the
+   * underlying resources and to internally <em>mark</em> the {@code Closeable}
+   * as closed, prior to throwing the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown statemachine properly.");
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Error attempting to shutdown.", e);
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+      endPoint.close();
+    }
+  }
+
+  /**
+   * States that a datanode can be in. GetNextState will move this enum from
+   * getInitState to getLastState.
+   */
+  public enum DatanodeStates {
+    INIT(1),
+    RUNNING(2),
+    SHUTDOWN(3);
+    private final int value;
+
+    /**
+     * Constructs ContainerStates.
+     *
+     * @param value
+     */
+    DatanodeStates(int value) {
+      this.value = value;
+    }
+
+    /**
+     * Returns the first State.
+     *
+     * @return First State.
+     */
+    public static DatanodeStates getInitState() {
+      return INIT;
+    }
+
+    /**
+     * The last state of endpoint states.
+     *
+     * @return last state.
+     */
+    public static DatanodeStates getLastState() {
+      return SHUTDOWN;
+    }
+
+    /**
+     * returns the numeric value associated with the endPoint.
+     *
+     * @return int.
+     */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Returns the next logical state that endPoint should move to. This
+     * function assumes the States are sequentially numbered.
+     *
+     * @return NextState.
+     */
+    public DatanodeStates getNextState() {
+      if (this.value < getLastState().getValue()) {
+        int stateValue = this.getValue() + 1;
+        for (DatanodeStates iter : values()) {
+          if (stateValue == iter.getValue()) {
+            return iter;
+          }
+        }
+      }
+      return getLastState();
+    }
+  }
+}

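start() paces the state machine to the heartbeat frequency: it computes the next heartbeat deadline, runs the current state's task, then sleeps out the remainder of the interval. A minimal standalone sketch of that pacing pattern (interval and exit condition invented for the demo):

    public class FixedFrequencyLoopDemo {
      public static void main(String[] args) throws InterruptedException {
        final long heartbeatMs = 1000; // stands in for getScmHeartbeatInterval(conf)
        int iterations = 0;
        while (iterations < 3) { // the real loop runs until the SHUTDOWN state
          long nextHB = System.currentTimeMillis() + heartbeatMs;
          // ... context.execute(...) would run the current state's task here ...
          long now = System.currentTimeMillis();
          if (now < nextHB) {
            Thread.sleep(nextHB - now); // sleep out the rest of the interval
          }
          iterations++;
        }
      }
    }
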
+ 265 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Endpoint is used as holder class that keeps state around the RPC endpoint.
+ */
+public class EndpointStateMachine implements Closeable {
+  static final Logger
+      LOG = LoggerFactory.getLogger(EndpointStateMachine.class);
+  private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint;
+  private final AtomicLong missedCount;
+  private final InetSocketAddress address;
+  private final Lock lock;
+  private final Configuration conf;
+  private EndPointStates state;
+  private VersionResponse version;
+
+  /**
+   * Constructs RPC Endpoints.
+   *
+   * @param endPoint - RPC endPoint.
+   */
+  public EndpointStateMachine(InetSocketAddress address,
+      StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint,
+      Configuration conf) {
+    this.endPoint = endPoint;
+    this.missedCount = new AtomicLong(0);
+    this.address = address;
+    state = EndPointStates.getInitState();
+    lock = new ReentrantLock();
+    this.conf = conf;
+  }
+
+  /**
+   * Takes a lock on this EndPoint so that other threads don't use this while we
+   * are trying to communicate via this endpoint.
+   */
+  public void lock() {
+    lock.lock();
+  }
+
+  /**
+   * Unlocks this endpoint.
+   */
+  public void unlock() {
+    lock.unlock();
+  }
+
+  /**
+   * Returns the version that we read from the server, if anyone asks.
+   *
+   * @return - Version Response.
+   */
+  public VersionResponse getVersion() {
+    return version;
+  }
+
+  /**
+   * Sets the Version response we received from the SCM.
+   *
+   * @param version VersionResponse
+   */
+  public void setVersion(VersionResponse version) {
+    this.version = version;
+  }
+
+  /**
+   * Returns the current State this end point is in.
+   *
+   * @return - getState.
+   */
+  public EndPointStates getState() {
+    return state;
+  }
+
+  /**
+   * Sets the endpoint state.
+   *
+   * @param state - state.
+   */
+  public EndPointStates setState(EndPointStates state) {
+    this.state = state;
+    return this.state;
+  }
+
+  /**
+   * Closes the connection.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (endPoint != null) {
+      endPoint.close();
+    }
+  }
+
+  /**
+   * We maintain a count of how many times we missed communicating with a
+   * specific SCM. This is not made atomic since the access to this is always
+   * guarded by the read or write lock. That is, it is serialized.
+   */
+  public void incMissed() {
+    this.missedCount.incrementAndGet();
+  }
+
+  /**
+   * Returns the value of the missed count.
+   *
+   * @return int
+   */
+  public long getMissedCount() {
+    return this.missedCount.get();
+  }
+
+  public void zeroMissedCount() {
+    this.missedCount.set(0);
+  }
+
+  /**
+   * Returns the InetAddress of the endPoint.
+   *
+   * @return - EndPoint.
+   */
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  /**
+   * Returns real RPC endPoint.
+   *
+   * @return rpc client.
+   */
+  public StorageContainerDatanodeProtocolClientSideTranslatorPB
+      getEndPoint() {
+    return endPoint;
+  }
+
+  /**
+   * Returns the string that represents this endpoint.
+   *
+   * @return - String
+   */
+  public String toString() {
+    return address.toString();
+  }
+
+  /**
+   * Logs exception if needed.
+   *  @param ex         - Exception
+   */
+  public void logIfNeeded(Exception ex) {
+    LOG.trace("Incrementing the Missed count. Ex : {}", ex);
+    this.incMissed();
+    if (this.getMissedCount() % OzoneClientUtils.getLogWarnInterval(conf) ==
+        0) {
+      LOG.warn("Unable to communicate to SCM server at {}. We have not been " +
+              "able to communicate to this SCM server for past {} seconds.",
+          this.getAddress().getHostString() + ":" + this.getAddress().getPort(),
+          this.getMissedCount() * OzoneClientUtils.getScmHeartbeatInterval(
+              this.conf));
+    }
+  }
+
+
+  /**
+   * States that an Endpoint can be in.
+   * <p>
+   * This is a sorted list of states that EndPoint will traverse.
+   * <p>
+   * GetNextState will move this enum from getInitState to getLastState.
+   */
+  public enum EndPointStates {
+    GETVERSION(1),
+    REGISTER(2),
+    HEARTBEAT(3),
+    SHUTDOWN(4); // if you add value after this please edit getLastState too.
+    private final int value;
+
+    /**
+     * Constructs endPointStates.
+     *
+     * @param value  state.
+     */
+    EndPointStates(int value) {
+      this.value = value;
+    }
+
+    /**
+     * Returns the first State.
+     *
+     * @return First State.
+     */
+    public static EndPointStates getInitState() {
+      return GETVERSION;
+    }
+
+    /**
+     * The last state of endpoint states.
+     *
+     * @return last state.
+     */
+    public static EndPointStates getLastState() {
+      return SHUTDOWN;
+    }
+
+    /**
+     * returns the numeric value associated with the endPoint.
+     *
+     * @return int.
+     */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Returns the next logical state that endPoint should move to.
+     * The next state is computed by adding 1 to the current state.
+     *
+     * @return NextState.
+     */
+    public EndPointStates getNextState() {
+      if (this.getValue() < getLastState().getValue()) {
+        int stateValue = this.getValue() + 1;
+        for (EndPointStates iter : values()) {
+          if (stateValue == iter.getValue()) {
+            return iter;
+          }
+        }
+      }
+      return getLastState();
+    }
+  }
+}

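logIfNeeded() throttles the "cannot reach SCM" warning to every Nth consecutive miss, where N comes from getLogWarnInterval(conf). A minimal sketch of that throttling idea with the interval hard-coded (class and method names invented):

    import java.util.concurrent.atomic.AtomicLong;

    public class MissedHeartbeatLogDemo {
      private static final int LOG_WARN_INTERVAL = 10; // default warn interval
      private final AtomicLong missedCount = new AtomicLong(0);

      void onMiss(String scmAddress) {
        long missed = missedCount.incrementAndGet();
        if (missed % LOG_WARN_INTERVAL == 0) {
          // Only every 10th consecutive miss produces a WARN line.
          System.out.printf("Unable to communicate to SCM at %s (%d misses)%n",
              scmAddress, missed);
        }
      }

      public static void main(String[] args) {
        MissedHeartbeatLogDemo demo = new MissedHeartbeatLogDemo();
        for (int i = 0; i < 25; i++) {
          demo.onMiss("scm1.example.com:9862"); // warns at miss 10 and 20
        }
      }
    }
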
+ 174 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * SCMConnectionManager - Manages the membership information of the SCMs
+ * that we are working with.
+ */
+public class SCMConnectionManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMConnectionManager.class);
+
+  private final ReadWriteLock mapLock;
+  private final Map<InetSocketAddress, EndpointStateMachine> scmMachines;
+
+  private final int rpcTimeout;
+  private final Configuration conf;
+
+
+  public SCMConnectionManager(Configuration conf) {
+    this.mapLock = new ReentrantReadWriteLock();
+    Long timeOut = OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf);
+    this.rpcTimeout = timeOut.intValue();
+    this.scmMachines = new HashMap<>();
+    this.conf = conf;
+  }
+
+  /**
+   * Returns Config.
+   *
+   * @return ozoneConfig.
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Get RpcTimeout.
+   *
+   * @return - Return RPC timeout.
+   */
+  public long getRpcTimeout() {
+    return rpcTimeout;
+  }
+
+
+  /**
+   * Takes a read lock.
+   */
+  public void readLock() {
+    this.mapLock.readLock().lock();
+  }
+
+  /**
+   * Releases the read lock.
+   */
+  public void readUnlock() {
+    this.mapLock.readLock().unlock();
+  }
+
+  /**
+   * Takes the write lock.
+   */
+  public void writeLock() {
+    this.mapLock.writeLock().lock();
+  }
+
+  /**
+   * Releases the write lock.
+   */
+  public void writeUnlock() {
+    this.mapLock.writeLock().unlock();
+  }
+
+  /**
+   * adds a new SCM machine to the target set.
+   *
+   * @param address - Address of the SCM machine to send heartbeats to.
+   * @throws IOException
+   */
+  public void addSCMServer(InetSocketAddress address) throws IOException {
+    writeLock();
+    try {
+      if (scmMachines.containsKey(address)) {
+        LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
+            "Ignoring the request.");
+        return;
+      }
+      RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+          ProtobufRpcEngine.class);
+      long version =
+          RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+
+      StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
+          StorageContainerDatanodeProtocolPB.class, version,
+          address, UserGroupInformation.getCurrentUser(), conf,
+          NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+
+      StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+          new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
+      EndpointStateMachine endPoint =
+          new EndpointStateMachine(address, rpcClient, conf);
+      scmMachines.put(address, endPoint);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Removes an SCM machine from the target set.
+   *
+   * @param address - Address of the SCM machine to send heartbeats to.
+   * @throws IOException
+   */
+  public void removeSCMServer(InetSocketAddress address) throws IOException {
+    writeLock();
+    try {
+      if (!scmMachines.containsKey(address)) {
+        LOG.warn("Trying to remove a non-existent SCM machine. " +
+            "Ignoring the request.");
+        return;
+      }
+
+      EndpointStateMachine endPoint = scmMachines.get(address);
+      endPoint.close();
+      scmMachines.remove(address);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Returns all known RPCEndpoints.
+   *
+   * @return - List of RPC Endpoints.
+   */
+  public Collection<EndpointStateMachine> getValues() {
+    return scmMachines.values();
+  }
+}

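addSCMServer and removeSCMServer guard a plain HashMap with an explicit ReentrantReadWriteLock rather than using a concurrent map. A simplified sketch of that pattern, with the RPC client stubbed out as a String (the real code builds a StorageContainerDatanodeProtocol proxy; names here are invented):

    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class GuardedEndpointMapDemo {
      private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
      private final Map<InetSocketAddress, String> endpoints = new HashMap<>();

      void add(InetSocketAddress address) {
        mapLock.writeLock().lock();
        try {
          if (endpoints.containsKey(address)) {
            return; // already tracked; the real code logs and ignores
          }
          endpoints.put(address, "rpc-client-for-" + address); // proxy stand-in
        } finally {
          mapLock.writeLock().unlock();
        }
      }

      void remove(InetSocketAddress address) {
        mapLock.writeLock().lock();
        try {
          endpoints.remove(address); // the real code closes the endpoint first
        } finally {
          mapLock.writeLock().unlock();
        }
      }
    }
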
+ 191 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+    .RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Current Context of State Machine.
+ */
+public class StateContext {
+  private final Queue<SCMCommand> commandQueue;
+  private final Lock lock;
+  private final DatanodeStateMachine parent;
+  private final AtomicLong stateExecutionCount;
+  private final Configuration conf;
+  private DatanodeStateMachine.DatanodeStates state;
+
+  /**
+   * Constructs a StateContext.
+   *
+   * @param conf   - Configuration
+   * @param state  - State
+   * @param parent Parent State Machine
+   */
+  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
+      state, DatanodeStateMachine parent) {
+    this.conf = conf;
+    this.state = state;
+    this.parent = parent;
+    commandQueue = new LinkedList<>();
+    lock = new ReentrantLock();
+    stateExecutionCount = new AtomicLong(0);
+  }
+
+  /**
+   * Returns the ContainerStateMachine class that holds this state.
+   *
+   * @return ContainerStateMachine.
+   */
+  public DatanodeStateMachine getParent() {
+    return parent;
+  }
+
+  /**
+   * Returns true if we are entering a new state.
+   *
+   * @return boolean
+   */
+  boolean isEntering() {
+    return stateExecutionCount.get() == 0;
+  }
+
+  /**
+   * Returns true if we are exiting from the current state.
+   *
+   * @param newState - newState.
+   * @return boolean
+   */
+  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
+    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
+    if(isExiting) {
+      stateExecutionCount.set(0);
+    }
+    return isExiting;
+  }
+
+  /**
+   * Returns the current state the machine is in.
+   *
+   * @return state.
+   */
+  public DatanodeStateMachine.DatanodeStates getState() {
+    return state;
+  }
+
+  /**
+   * Sets the current state of the machine.
+   *
+   * @param state state.
+   */
+  public void setState(DatanodeStateMachine.DatanodeStates state) {
+    this.state = state;
+  }
+
+  /**
+   * Returns the next task to get executed by the datanode state machine.
+   * @return A callable that will be executed by the
+   * {@link DatanodeStateMachine}
+   */
+  @SuppressWarnings("unchecked")
+  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
+    switch (this.state) {
+    case INIT:
+      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
+          this);
+    case RUNNING:
+      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
+          this);
+    case SHUTDOWN:
+      return null;
+    default:
+      throw new IllegalArgumentException("Not Implemented yet.");
+    }
+  }
+
+  /**
+   * Executes the required state function.
+   *
+   * @param service - Executor Service
+   * @param time    - seconds to wait
+   * @param unit    - Seconds.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  public void execute(ExecutorService service, long time, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    stateExecutionCount.incrementAndGet();
+    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
+    if (this.isEntering()) {
+      task.onEnter();
+    }
+    task.execute(service);
+    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
+    if (this.state != newState) {
+      if (isExiting(newState)) {
+        task.onExit();
+      }
+      this.setState(newState);
+    }
+  }
+
+  /**
+   * Returns the next command or null if it is empty.
+   *
+   * @return SCMCommand or Null.
+   */
+  public SCMCommand getNextCommand() {
+    lock.lock();
+    try {
+      return commandQueue.poll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Adds a command to the State Machine queue.
+   *
+   * @param command - SCMCommand.
+   */
+  public void addCommand(SCMCommand command) {
+    lock.lock();
+    try {
+      commandQueue.add(command);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+
+}

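execute() wraps each state task with enter/exit hooks: the intent of the isEntering/isExiting checks is that onEnter fires on the first execution inside a state and onExit fires once when await() reports a different state. A condensed sketch of that intended control flow (state type reduced to a String and hooks to a small interface, purely for illustration):

    public class LifecycleDemo {
      interface StateTask {
        void onEnter();
        String run();   // stands in for execute() followed by await()
        void onExit();
      }

      private long stateExecutionCount = 0;
      private String current = "INIT";

      void step(StateTask task) {
        if (stateExecutionCount++ == 0) {
          task.onEnter();            // entering: first execution in this state
        }
        String next = task.run();
        if (!current.equals(next)) {
          task.onExit();             // exiting: the task moved us to a new state
          stateExecutionCount = 0;
          current = next;
        }
      }
    }
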
+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+/**
+
+ The state machine class is used by the container to denote the various
+ states a container can be in, and is also used for command processing.
+
+ A container has the following states:
+
+ Start -> getVersion -> Register -> Running -> Shutdown
+
+ */

+ 55 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java

@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * State Interface that allows tasks to maintain states.
+ */
+public interface DatanodeState<T> {
+  /**
+   * Called before entering this state.
+   */
+  void onEnter();
+
+  /**
+   * Called after exiting this state.
+   */
+  void onExit();
+
+  /**
+   * Executes one or more tasks that are needed by this state.
+   *
+   * @param executor -  ExecutorService
+   */
+  void execute(ExecutorService executor);
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param time - Time
+   * @param timeUnit - Unit of time.
+   */
+  T await(long time, TimeUnit timeUnit)
+      throws InterruptedException, ExecutionException, TimeoutException;
+
+}

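DatanodeState splits the work into a non-blocking execute() and a timed await(); implementations typically submit a Callable and park the Future, as InitDatanodeState below does. A minimal standalone sketch of that submit-then-await pattern (demo class name invented):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class AwaitPatternDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // execute(): submit the work and hold on to the Future.
        Future<String> result = executor.submit(() -> "NEXT_STATE");
        // await(): block with a timeout until the task yields the next state.
        System.out.println(result.get(5, TimeUnit.SECONDS));
        executor.shutdown();
      }
    }
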
+ 135 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java

@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Init Datanode State is the task that gets run when we are in Init State.
+ */
+public class InitDatanodeState implements DatanodeState,
+    Callable<DatanodeStateMachine.DatanodeStates> {
+  static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class);
+  private final SCMConnectionManager connectionManager;
+  private final Configuration conf;
+  private final StateContext context;
+  private Future<DatanodeStateMachine.DatanodeStates> result;
+
+  /**
+   *  Create InitDatanodeState Task.
+   *
+   * @param conf - Conf
+   * @param connectionManager - Connection Manager
+   * @param context - Current Context
+   */
+  public InitDatanodeState(Configuration conf,
+                           SCMConnectionManager connectionManager,
+                           StateContext context) {
+    this.conf = conf;
+    this.connectionManager = connectionManager;
+    this.context = context;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public DatanodeStateMachine.DatanodeStates call() throws Exception {
+    String[] addresses = conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES);
+    final Optional<Integer> defaultPort =  Optional.of(OzoneConfigKeys
+        .OZONE_SCM_DEFAULT_PORT);
+
+    if (addresses == null || addresses.length <= 0) {
+      LOG.error("SCM addresses need to be a set of valid DNS names " +
+          "or IP addresses. Null or empty address list found. Aborting " +
+          "containers.");
+      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
+    }
+    for (String address : addresses) {
+      Optional<String> hostname = OzoneClientUtils.getHostName(address);
+      if (!hostname.isPresent()) {
+        LOG.error("Invalid hostname for SCM.");
+        return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
+      }
+      Optional<Integer> port = OzoneClientUtils.getHostPort(address);
+      InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
+          port.or(defaultPort.get()));
+      connectionManager.addSCMServer(addr);
+    }
+    return this.context.getState().getNextState();
+  }
+
+  /**
+   * Called before entering this state.
+   */
+  @Override
+  public void onEnter() {
+    LOG.trace("Entering init container state");
+  }
+
+  /**
+   * Called after exiting this state.
+   */
+  @Override
+  public void onExit() {
+    LOG.trace("Exiting init container state");
+  }
+
+  /**
+   * Executes one or more tasks that are needed by this state.
+   *
+   * @param executor -  ExecutorService
+   */
+  @Override
+  public void execute(ExecutorService executor) {
+    result = executor.submit(this);
+  }
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param time     - Time
+   * @param timeUnit - Unit of time.
+   */
+  @Override
+  public DatanodeStateMachine.DatanodeStates await(long time,
+      TimeUnit timeUnit) throws InterruptedException,
+      ExecutionException, TimeoutException {
+    return result.get(time, timeUnit);
+  }
+}

+ 297 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java

@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Class that implements handshake with SCM.
+ */
+public class RunningDatanodeState implements DatanodeState {
+  static final Logger
+      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
+  private final SCMConnectionManager connectionManager;
+  private final Configuration conf;
+  private final StateContext context;
+  private CompletionService<EndpointStateMachine.EndPointStates> ecs;
+
+  public RunningDatanodeState(Configuration conf,
+      SCMConnectionManager connectionManager,
+      StateContext context) {
+    this.connectionManager = connectionManager;
+    this.conf = conf;
+    this.context = context;
+  }
+
+  /**
+   * Reads a datanode ID from the persisted information.
+   *
+   * @param idPath - Path to the ID File.
+   * @return DatanodeID
+   * @throws IOException
+   */
+  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+      readPersistedDatanodeID(Path idPath) throws IOException {
+    Preconditions.checkNotNull(idPath);
+    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+        containerIDProto;
+    try (FileInputStream stream = new FileInputStream(idPath.toFile())) {
+      containerIDProto = StorageContainerDatanodeProtocolProtos
+          .ContainerNodeIDProto.parseFrom(stream);
+      return containerIDProto;
+    }
+  }
+
+  /**
+   * Create a DatanodeID from the datanode information.
+   *
+   * @return DatanodeID
+   * @throws UnknownHostException
+   */
+  private DatanodeID createDatanodeID() throws UnknownHostException {
+    DatanodeID temp = new DatanodeID(
+        //TODO : Replace this with proper network and kerberos
+        // support code.
+        InetAddress.getLocalHost().getHostAddress().toString(),
+        DataNode.getHostName(conf),
+        UUID.randomUUID().toString(),
+        0, // XferPort - SCM does not use this port
+        0, // Info port - SCM does not use this port
+        0, // Info Secure Port - SCM does not use this port
+        0); // IPC port - SCM does not use this port
+
+    // TODO: make this dynamically discoverable. SCM can hand out this
+    // port number to calling applications. This makes it easy to run multiple
+    // container endpoints on the same machine.
+    temp.setContainerPort(OzoneClientUtils.getContainerPort(conf));
+    return temp;
+  }
+
+  /**
+   * Creates and persists a new ContainerNodeIDProto. Only the DatanodeID is
+   * set at this point; the cluster ID is learned later, during registration.
+   *
+   * @param idPath Path to the id file.
+   * @return ContainerNodeIDProto
+   * @throws IOException
+   */
+  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+      createNewContainerID(Path idPath)
+      throws IOException {
+    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+        containerIDProto = StorageContainerDatanodeProtocolProtos
+        .ContainerNodeIDProto.newBuilder()
+        .setDatanodeID(createDatanodeID().getProtoBufMessage()).build();
+    try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) {
+      stream.write(containerIDProto.toByteArray());
+      return containerIDProto;
+    }
+  }
+
+  /**
+   * Returns ContainerNodeIDProto or null in case of Error.
+   *
+   * @return ContainerNodeIDProto
+   */
+  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+      getContainerNodeID() {
+    String dataNodeIDPath = conf.get(OzoneConfigKeys.OZONE_SCM_DATANODE_ID);
+    if (dataNodeIDPath == null || dataNodeIDPath.isEmpty()) {
+      LOG.error("A valid file path is needed for config setting {}",
+          OzoneConfigKeys.OZONE_SCM_DATANODE_ID);
+
+      // This is an unrecoverable error.
+      this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
+      return null;
+    }
+    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto nodeID;
+    // try to read an existing ContainerNode ID.
+    try {
+      nodeID = readPersistedDatanodeID(Paths.get(dataNodeIDPath));
+      if (nodeID != null) {
+        LOG.trace("Read Node ID: {}", nodeID.getDatanodeID().getDatanodeUuid());
+        return nodeID;
+      }
+    } catch (IOException ex) {
+      LOG.trace("Not able to find container Node ID, creating it.", ex);
+    }
+    // Not found, let us create a new datanode ID, persist it and return that
+    // info to SCM.
+    try {
+      nodeID = createNewContainerID(Paths.get(dataNodeIDPath));
+      LOG.trace("Created Node ID: {}", nodeID.getDatanodeID().getDatanodeUuid());
+      return nodeID;
+    } catch (IOException ex) {
+      LOG.error("Creating new node ID failed.", ex);
+    }
+    return null;
+  }
+
+  /**
+   * Called before entering this state.
+   */
+  @Override
+  public void onEnter() {
+    LOG.trace("Entering handshake task.");
+  }
+
+  /**
+   * Called After exiting this state.
+   */
+  @Override
+  public void onExit() {
+    LOG.trace("Exiting handshake task.");
+  }
+
+  /**
+   * Executes one or more tasks that are needed by this state.
+   *
+   * @param executor - ExecutorService
+   */
+  @Override
+  public void execute(ExecutorService executor) {
+    ecs = new ExecutorCompletionService<>(executor);
+    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
+      Callable<EndpointStateMachine.EndPointStates> endpointTask
+          = getEndPointTask(endpoint);
+      ecs.submit(endpointTask);
+    }
+  }
+
+  private Callable<EndpointStateMachine.EndPointStates>
+      getEndPointTask(EndpointStateMachine endpoint) {
+    switch (endpoint.getState()) {
+    case GETVERSION:
+      return new VersionEndpointTask(endpoint, conf);
+    case REGISTER:
+      return RegisterEndpointTask.newBuilder()
+          .setConfig(conf)
+          .setEndpointStateMachine(endpoint)
+          .setNodeID(getContainerNodeID())
+          .build();
+    case HEARTBEAT:
+      return HeartbeatEndpointTask.newBuilder()
+          .setConfig(conf)
+          .setEndpointStateMachine(endpoint)
+          .setNodeID(getContainerNodeID())
+          .build();
+    case SHUTDOWN:
+      break;
+    default:
+      throw new IllegalArgumentException("Unexpected endpoint state: "
+          + endpoint.getState());
+    }
+    return null;
+  }
+
+  /**
+   * Computes the next state the container state machine must move to by
+   * looking at the states of all endpoints.
+   * <p>
+   * If any endpoint state has moved to Shutdown, either we have an
+   * unrecoverable error or we have been told to shut down. In either case
+   * the datanode state machine should move to the Shutdown state; otherwise
+   * we remain in the Running state.
+   *
+   * @return next container state.
+   */
+  private DatanodeStateMachine.DatanodeStates
+      computeNextContainerState(
+      List<Future<EndpointStateMachine.EndPointStates>> results) {
+    for (Future<EndpointStateMachine.EndPointStates> state : results) {
+      try {
+        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
+          // if any endpoint tells us to shutdown we move to shutdown state.
+          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Error in executing end point task.", e);
+      }
+    }
+    return DatanodeStateMachine.DatanodeStates.RUNNING;
+  }
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param duration - Time
+   * @param timeUnit - Unit of duration.
+   */
+  @Override
+  public DatanodeStateMachine.DatanodeStates
+      await(long duration, TimeUnit timeUnit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    int count = connectionManager.getValues().size();
+    int returned = 0;
+    long timeLeft = timeUnit.toMillis(duration);
+    long startTime = Time.monotonicNow();
+    List<Future<EndpointStateMachine.EndPointStates>> results = new
+        LinkedList<>();
+
+    while (returned < count && timeLeft > 0) {
+      Future<EndpointStateMachine.EndPointStates> result =
+          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
+      if (result != null) {
+        results.add(result);
+        returned++;
+      }
+      // Recompute from the original budget so that elapsed time is not
+      // double-counted across loop iterations.
+      timeLeft = timeUnit.toMillis(duration)
+          - (Time.monotonicNow() - startTime);
+    }
+    return computeNextContainerState(results);
+  }
+}
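
A side note on await() above: it uses the standard ExecutorCompletionService
deadline-poll idiom. Below is a self-contained sketch of that pattern (all
names are illustrative, not part of this patch); recomputing timeLeft from the
original budget on every pass is what keeps elapsed time from being
double-counted:

    import java.util.concurrent.*;

    /** Sketch of the deadline-poll idiom used by await(). */
    public final class CompletionPollSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        int count = 3;
        for (int i = 0; i < count; i++) {
          final int id = i;
          ecs.submit(() -> id);           // stand-in for an endpoint task
        }
        long deadline = System.currentTimeMillis() + 5_000L; // 5 second budget
        int returned = 0;
        while (returned < count) {
          long timeLeft = deadline - System.currentTimeMillis();
          if (timeLeft <= 0) {
            break;                        // budget exhausted
          }
          // poll returns null if nothing completed within the remaining time.
          Future<Integer> done = ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
          if (done != null) {
            returned++;
          }
        }
        pool.shutdown();
      }
    }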

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+/**
+ This package contains files that guide the state transitions from
+ Init->Running->Shutdown for the datanode.
+ */
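
For orientation, here is a minimal sketch of the driver loop this package
describes, mirroring the DatanodeState contract used by RunningDatanodeState
above; the executor wiring and the 30-second budget are assumptions for
illustration, not code from this patch:

    // Sketch: one driver iteration over a DatanodeState.
    private DatanodeStateMachine.DatanodeStates runOnce(
        DatanodeState state, ExecutorService executor)
        throws InterruptedException, ExecutionException, TimeoutException {
      state.onEnter();
      state.execute(executor);               // fan out endpoint tasks
      DatanodeStateMachine.DatanodeStates next =
          state.await(30, TimeUnit.SECONDS); // illustrative budget
      state.onExit();
      return next;                           // RUNNING or SHUTDOWN
    }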

+ 181 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Endpoint task that sends heartbeats to the SCM.
+ */
+public class HeartbeatEndpointTask
+    implements Callable<EndpointStateMachine.EndPointStates> {
+  static final Logger LOG =
+      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
+  private final EndpointStateMachine rpcEndpoint;
+  private final Configuration conf;
+  private ContainerNodeIDProto containerNodeIDProto;
+
+  /**
+   * Constructs an SCM heartbeat task.
+   *
+   * @param rpcEndPoint - Endpoint state machine.
+   * @param conf - Config.
+   */
+  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
+      Configuration conf) {
+    this.rpcEndpoint = rpcEndpoint;
+    this.conf = conf;
+  }
+
+  /**
+   * Get the container Node ID proto.
+   *
+   * @return ContainerNodeIDProto
+   */
+  public ContainerNodeIDProto getContainerNodeIDProto() {
+    return containerNodeIDProto;
+  }
+
+  /**
+   * Set container node ID proto.
+   *
+   * @param containerNodeIDProto - the node id.
+   */
+  public void setContainerNodeIDProto(ContainerNodeIDProto
+      containerNodeIDProto) {
+    this.containerNodeIDProto = containerNodeIDProto;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+    rpcEndpoint.lock();
+    try {
+      Preconditions.checkState(this.containerNodeIDProto != null);
+      DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
+          .containerNodeIDProto.getDatanodeID());
+      // TODO : Add the command to command processor queue.
+      rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID);
+      rpcEndpoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndpoint.logIfNeeded(ex);
+    } finally {
+      rpcEndpoint.unlock();
+    }
+    return rpcEndpoint.getState();
+  }
+
+  /**
+   * Returns a builder class for HeartbeatEndpointTask task.
+   * @return   Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for HeartbeatEndpointTask.
+   */
+  public static class Builder {
+    private EndpointStateMachine endPointStateMachine;
+    private Configuration conf;
+    private ContainerNodeIDProto containerNodeIDProto;
+
+    /**
+     * Constructs the builder class.
+     */
+    public Builder() {
+    }
+
+    /**
+     * Sets the endpoint state machine.
+     *
+     * @param rpcEndPoint - Endpoint state machine.
+     * @return Builder
+     */
+    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
+      this.endPointStateMachine = rpcEndPoint;
+      return this;
+    }
+
+    /**
+     * Sets the Config.
+     *
+     * @param config  - config
+     * @return  Builder
+     */
+    public Builder setConfig(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Sets the NodeID.
+     *
+     * @param nodeID - NodeID proto
+     * @return Builder
+     */
+    public Builder setNodeID(ContainerNodeIDProto nodeID) {
+      this.containerNodeIDProto = nodeID;
+      return this;
+    }
+
+    public HeartbeatEndpointTask build() {
+      if (endPointStateMachine == null) {
+        LOG.error("No endpoint specified.");
+        throw new IllegalArgumentException("A valid endpoint state machine" +
+            " is needed to construct a HeartbeatEndpointTask");
+      }
+
+      if (conf == null) {
+        LOG.error("No config specified.");
+        throw new IllegalArgumentException("A valid configuration is needed" +
+            " to construct a HeartbeatEndpointTask");
+      }
+
+      if (containerNodeIDProto == null) {
+        LOG.error("No nodeID specified.");
+        throw new IllegalArgumentException("A valid Node ID is needed to " +
+            "construct a HeartbeatEndpointTask");
+      }
+
+      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
+          .endPointStateMachine, this.conf);
+      task.setContainerNodeIDProto(containerNodeIDProto);
+      return task;
+    }
+
+  }
+}
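
A hedged usage sketch for the builder above; the same shape applies to
RegisterEndpointTask.newBuilder() in the next file. The endpoint, conf and
containerNodeIDProto values are assumed to come from the surrounding state
machine, and exception handling is omitted:

    HeartbeatEndpointTask task = HeartbeatEndpointTask.newBuilder()
        .setEndpointStateMachine(endpoint)  // from SCMConnectionManager
        .setConfig(conf)
        .setNodeID(containerNodeIDProto)    // persisted datanode/cluster ID
        .build();
    // call() sends one heartbeat and returns the endpoint's current state.
    EndpointStateMachine.EndPointStates state = task.call();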

+ 198 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * Endpoint task that registers this datanode with the SCM.
+ */
+public final class RegisterEndpointTask implements
+    Callable<EndpointStateMachine.EndPointStates> {
+  static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class);
+
+  private final EndpointStateMachine rpcEndPoint;
+  private final Configuration conf;
+  private Future<EndpointStateMachine.EndPointStates> result;
+  private ContainerNodeIDProto containerNodeIDProto;
+
+  /**
+   * Creates a register endpoint task.
+   *
+   * @param rpcEndPoint - endpoint
+   * @param conf - conf
+   */
+  @VisibleForTesting
+  public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
+      Configuration conf) {
+    this.rpcEndPoint = rpcEndPoint;
+    this.conf = conf;
+  }
+
+  /**
+   * Get the ContainerNodeID Proto.
+   *
+   * @return ContainerNodeIDProto
+   */
+  public ContainerNodeIDProto getContainerNodeIDProto() {
+    return containerNodeIDProto;
+  }
+
+  /**
+   * Set the containerNodeID Proto.
+   *
+   * @param containerNodeIDProto - Container Node ID.
+   */
+  public void setContainerNodeIDProto(ContainerNodeIDProto
+      containerNodeIDProto) {
+    this.containerNodeIDProto = containerNodeIDProto;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+
+    if (getContainerNodeIDProto() == null) {
+      LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " +
+          "shutting down the endpoint.");
+      return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
+    }
+
+    rpcEndPoint.lock();
+    try {
+      DatanodeID dnNodeID = DatanodeID.getFromProtoBuf(
+          getContainerNodeIDProto().getDatanodeID());
+
+      // TODO : Add responses to the command Queue.
+      rpcEndPoint.getEndPoint().register(dnNodeID,
+          conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES));
+      EndpointStateMachine.EndPointStates nextState =
+          rpcEndPoint.getState().getNextState();
+      rpcEndPoint.setState(nextState);
+      rpcEndPoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndPoint.logIfNeeded(ex);
+    } finally {
+      rpcEndPoint.unlock();
+    }
+
+    return rpcEndPoint.getState();
+  }
+
+  /**
+   * Returns a builder class for RegisterEndPoint task.
+   *
+   * @return Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for RegisterEndPoint task.
+   */
+  public static class Builder {
+    private EndpointStateMachine endPointStateMachine;
+    private Configuration conf;
+    private ContainerNodeIDProto containerNodeIDProto;
+
+    /**
+     * Constructs the builder class.
+     */
+    public Builder() {
+    }
+
+    /**
+     * Sets the endpoint state machine.
+     *
+     * @param rpcEndPoint - Endpoint state machine.
+     * @return Builder
+     */
+    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
+      this.endPointStateMachine = rpcEndPoint;
+      return this;
+    }
+
+    /**
+     * Sets the Config.
+     *
+     * @param config - config
+     * @return Builder.
+     */
+    public Builder setConfig(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Sets the NodeID.
+     *
+     * @param nodeID - NodeID proto
+     * @return Builder
+     */
+    public Builder setNodeID(ContainerNodeIDProto nodeID) {
+      this.containerNodeIDProto = nodeID;
+      return this;
+    }
+
+    public RegisterEndpointTask build() {
+      if (endPointStateMachine == null) {
+        LOG.error("No endpoint specified.");
+        throw new IllegalArgumentException("A valid endpoint state machine" +
+            " is needed to construct a RegisterEndpointTask");
+      }
+
+      if (conf == null) {
+        LOG.error("No config specified.");
+        throw new IllegalArgumentException("A valid configuration is needed" +
+            " to construct a RegisterEndpointTask");
+      }
+
+      if (containerNodeIDProto == null) {
+        LOG.error("No nodeID specified.");
+        throw new IllegalArgumentException("A valid Node ID is needed to " +
+            "construct a RegisterEndpointTask");
+      }
+
+      RegisterEndpointTask task = new RegisterEndpointTask(this
+          .endPointStateMachine, this.conf);
+      task.setContainerNodeIDProto(containerNodeIDProto);
+      return task;
+    }
+  }
+}

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Endpoint task that fetches the version from the SCM.
+ */
+public class VersionEndpointTask implements
+    Callable<EndpointStateMachine.EndPointStates> {
+  private final EndpointStateMachine rpcEndPoint;
+  private final Configuration configuration;
+
+  public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
+      Configuration conf) {
+    this.rpcEndPoint = rpcEndPoint;
+    this.configuration = conf;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+    rpcEndPoint.lock();
+    try {
+      SCMVersionResponseProto versionResponse =
+          rpcEndPoint.getEndPoint().getVersion();
+      rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));
+
+      EndpointStateMachine.EndPointStates nextState =
+          rpcEndPoint.getState().getNextState();
+      rpcEndPoint.setState(nextState);
+      rpcEndPoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndPoint.logIfNeeded(ex);
+    } finally {
+      rpcEndPoint.unlock();
+    }
+    return rpcEndPoint.getState();
+  }
+}
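
Each successful endpoint task advances the endpoint one state via
getState().getNextState(), giving the progression GETVERSION -> REGISTER ->
HEARTBEAT. A test-style sketch, leaning on the SCMTestUtils helper added later
in this patch (scmAddress and the 1000 ms timeout are assumptions):

    EndpointStateMachine endpoint =
        SCMTestUtils.createEndpoint(conf, scmAddress, 1000);
    endpoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
    VersionEndpointTask task = new VersionEndpointTask(endpoint, conf);
    EndpointStateMachine.EndPointStates next = task.call();
    // next == REGISTER on success; the state is unchanged on IOException.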

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java

@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+/**
+ This package contains code for RPC endpoint state transitions.
+ */

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states;

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -64,8 +64,8 @@ public class OzoneContainer {
       Configuration ozoneConfig,
       FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception {
     List<StorageLocation> locations = new LinkedList<>();
-    String[] paths = ozoneConfig.getStrings(OzoneConfigKeys
-        .OZONE_METADATA_DIRS);
+    String[] paths = ozoneConfig.getStrings(
+        OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS);
     if (paths != null && paths.length > 0) {
       for (String p : paths) {
         locations.add(StorageLocation.parse(p));

+ 58 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+
+import java.io.IOException;
+
+/**
+ * The protocol spoken between datanodes and SCM. For specifics, please see
+ * the .proto file that defines this protocol.
+ */
+@InterfaceAudience.Private
+public interface StorageContainerDatanodeProtocol {
+  /**
+   * Returns SCM version.
+   * @return Version info.
+   */
+  SCMVersionResponseProto getVersion() throws IOException;
+
+  /**
+   * Used by the datanode to send a heartbeat.
+   * @param datanodeID - Datanode ID.
+   * @return - SCMHeartbeatResponseProto
+   * @throws IOException
+   */
+  SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
+      throws IOException;
+
+  /**
+   * Register Datanode.
+   * @param datanodeID - DatanodeID.
+   * @param scmAddresses - List of SCMs this datanode is configured to
+   *                     communicate with.
+   * @return SCM Command.
+   */
+  SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+      String[] scmAddresses) throws IOException;
+
+}
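
To make the call surface concrete, a minimal client-side sketch against this
interface; 'client' stands for the
StorageContainerDatanodeProtocolClientSideTranslatorPB defined later in this
patch, and datanodeID and conf are assumed to be in scope:

    SCMVersionResponseProto version = client.getVersion();
    SCMRegisteredCmdResponseProto registered = client.register(datanodeID,
        conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES)); // configured SCMs
    SCMHeartbeatResponseProto hb = client.sendHeartbeat(datanodeID);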

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.ozone.protocol.commands;
 
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
 
 

+ 16 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java

@@ -18,15 +18,19 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+    .ErrorCode;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
 
 /**
  * Response to Datanode Register call.
  */
-public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
-
+public class RegisteredCommand extends
+    SCMCommand<SCMRegisteredCmdResponseProto> {
   private String datanodeUUID;
   private String clusterID;
   private ErrorCode error;
@@ -38,8 +42,6 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
     this.error = error;
   }
 
-
-
   /**
    * Returns a new builder.
    *
@@ -56,11 +58,12 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
    */
   @Override
   Type getType() {
-    return Type.registeredCmd;
+    return Type.registeredCommand;
   }
 
   /**
    * Returns datanode UUID.
+   *
    * @return - Datanode ID.
    */
   public String getDatanodeUUID() {
@@ -69,6 +72,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
 
   /**
    * Returns cluster ID.
+   *
    * @return -- ClusterID
    */
   public String getClusterID() {
@@ -77,6 +81,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
 
   /**
    * Returns ErrorCode.
+   *
    * @return - ErrorCode
    */
   public ErrorCode getError() {
@@ -89,8 +94,8 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
    * @return A protobuf message.
    */
   @Override
-  RegisteredCmdResponseProto getProtoBufMessage() {
-    return RegisteredCmdResponseProto.newBuilder()
+  SCMRegisteredCmdResponseProto getProtoBufMessage() {
+    return SCMRegisteredCmdResponseProto.newBuilder()
         .setClusterID(this.clusterID)
         .setDatanodeUUID(this.datanodeUUID)
         .setErrorCode(this.error)
@@ -122,7 +127,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
     * @param response - SCMRegisteredCmdResponseProto
      * @return RegisteredCommand
      */
-    public  RegisteredCommand getFromProtobuf(RegisteredCmdResponseProto
+    public  RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto
                                                         response) {
       Preconditions.checkNotNull(response);
       return new RegisteredCommand(response.getErrorCode(),

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java

@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+    .StorageContainerDatanodeProtocolProtos.Type;
 import com.google.protobuf.GeneratedMessage;
 
 /**

+ 154 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This class is the client-side translator to translate the requests made on
+ * the {@link StorageContainerDatanodeProtocol} interface to the RPC server
+ * implementing {@link StorageContainerDatanodeProtocolPB}.
+ */
+public class StorageContainerDatanodeProtocolClientSideTranslatorPB
+    implements StorageContainerDatanodeProtocol, ProtocolTranslator, Closeable {
+
+  /**
+   * RpcController is not used and hence is set to null.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+  private final StorageContainerDatanodeProtocolPB rpcProxy;
+
+  /**
+   * Constructs a Client side interface that calls into SCM datanode protocol.
+   *
+   * @param rpcProxy - Proxy for RPC.
+   */
+  public StorageContainerDatanodeProtocolClientSideTranslatorPB(
+      StorageContainerDatanodeProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it. If
+   * the stream is already closed then invoking this method has no effect.
+   * <p>
+   * As noted in {@link AutoCloseable#close()}, cases where the close may
+   * fail require careful attention. It is strongly advised to relinquish the
+   * underlying resources and to internally <em>mark</em> the {@code Closeable}
+   * as closed, prior to throwing the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  /**
+   * Return the proxy object underlying this protocol translator.
+   *
+   * @return the proxy object underlying this protocol translator.
+   */
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+
+  /**
+   * Returns SCM version.
+   *
+   * @return Version info.
+   */
+  @Override
+  public SCMVersionResponseProto getVersion() throws IOException {
+
+    SCMVersionRequestProto request =
+        SCMVersionRequestProto.newBuilder().build();
+    final SCMVersionResponseProto response;
+    try {
+      response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+    return response;
+  }
+
+  /**
+   * Sends a heartbeat from the datanode to the SCM.
+   *
+   * @param datanodeID - DatanodeID
+   * @throws IOException
+   */
+  @Override
+  public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
+      throws IOException {
+    SCMHeartbeatRequestProto.Builder req =
+        SCMHeartbeatRequestProto.newBuilder();
+    req.setDatanodeID(datanodeID.getProtoBufMessage());
+    final SCMHeartbeatResponseProto resp;
+    try {
+      resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return resp;
+  }
+
+  /**
+   * Register Datanode.
+   *
+   * @param datanodeID - DatanodeID.
+   * @return SCM Command.
+   */
+  @Override
+  public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+      String[] scmAddresses) throws IOException {
+    SCMRegisterRequestProto.Builder req =
+        SCMRegisterRequestProto.newBuilder();
+    req.setDatanodeID(datanodeID.getProtoBufMessage());
+    final SCMRegisteredCmdResponseProto response;
+    try {
+      response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return response;
+  }
+
+}

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService;
+
+/**
+ * Protocol used from a datanode to StorageContainerManager.  This extends
+ * the Protocol Buffers service interface to add Hadoop-specific annotations.
+ */
+
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol",
+    protocolVersion = 1)
+public interface StorageContainerDatanodeProtocolPB extends
+    StorageContainerDatanodeProtocolService.BlockingInterface {
+}

+ 86 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+
+import java.io.IOException;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link StorageContainerDatanodeProtocolPB} to the {@link
+ * StorageContainerDatanodeProtocol} server implementation.
+ */
+public class StorageContainerDatanodeProtocolServerSideTranslatorPB
+    implements StorageContainerDatanodeProtocolPB {
+
+  private final StorageContainerDatanodeProtocol impl;
+
+  public StorageContainerDatanodeProtocolServerSideTranslatorPB(
+      StorageContainerDatanodeProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
+      getVersion(RpcController controller,
+      StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getVersion();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+      register(RpcController controller, StorageContainerDatanodeProtocolProtos
+      .SCMRegisterRequestProto request) throws ServiceException {
+    String[] addressArray = null;
+
+    if (request.hasAddressList()) {
+      addressArray = request.getAddressList().getAddressListList()
+          .toArray(new String[0]);
+    }
+
+    try {
+      return impl.register(DatanodeID.getFromProtoBuf(request
+          .getDatanodeID()), addressArray);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public SCMHeartbeatResponseProto
+      sendHeartbeat(RpcController controller,
+      SCMHeartbeatRequestProto request) throws ServiceException {
+    try {
+      return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
+          .getDatanodeID()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java

@@ -26,6 +26,8 @@ public final class VersionInfo {
   private final static VersionInfo[] VERSION_INFOS =
       {new VersionInfo("First version of SCM", 1)};
 
+
+  public static final String DESCRIPTION_KEY = "Description";
   private final String description;
   private final int version;
 

+ 15 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -29,8 +29,11 @@ import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+    .ErrorCode;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 
 import org.apache.hadoop.ozone.scm.VersionInfo;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -43,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -580,21 +582,20 @@ public class SCMNodeManager
   @Override
   public SCMCommand register(DatanodeID datanodeID) {
 
-    SCMCommand errorCode = verifyDatanodeUUID(datanodeID);
-    if (errorCode != null) {
-      return errorCode;
+    SCMCommand responseCommand = verifyDatanodeUUID(datanodeID);
+    if (responseCommand != null) {
+      return responseCommand;
     }
-    DatanodeID newDatanodeID = new DatanodeID(UUID.randomUUID().toString(),
-        datanodeID);
-    nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID);
+
+    nodes.put(datanodeID.getDatanodeUuid(), datanodeID);
     totalNodes.incrementAndGet();
-    healthyNodes.put(newDatanodeID.getDatanodeUuid(), monotonicNow());
+    healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
     healthyNodeCount.incrementAndGet();
     LOG.info("Data node with ID: {} Registered.",
-        newDatanodeID.getDatanodeUuid());
+        datanodeID.getDatanodeUuid());
     return RegisteredCommand.newBuilder()
         .setErrorCode(ErrorCode.success)
-        .setDatanodeUUID(newDatanodeID.getDatanodeUuid())
+        .setDatanodeUUID(datanodeID.getDatanodeUuid())
         .setClusterID(this.clusterID)
         .build();
   }
@@ -607,20 +608,12 @@ public class SCMNodeManager
    */
   private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
 
-    // Make sure that we return the right error code, so that
-    // data node can log the correct error. if it is already registered then
-    // datanode should move to heartbeat state. It implies that somehow we
-    // have an error where the data node is trying to re-register.
-    //
-    // We are going to let the datanode know that there is an error but allow it
-    // to recover by sending it the right info that is needed for recovery.
-
     if (datanodeID.getDatanodeUuid() != null &&
         nodes.containsKey(datanodeID.getDatanodeUuid())) {
-      LOG.error("Datanode is already registered. Datanode: {}",
+      LOG.trace("Datanode is already registered. Datanode: {}",
           datanodeID.toString());
       return RegisteredCommand.newBuilder()
-          .setErrorCode(ErrorCode.errorNodeAlreadyRegistered)
+          .setErrorCode(ErrorCode.success)
           .setClusterID(this.clusterID)
           .setDatanodeUUID(datanodeID.getDatanodeUuid())
           .build();

+ 47 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -31,9 +31,13 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
 package hadoop.hdfs;
+
 import "hdfs.proto";
+
 import "HdfsServer.proto";
+
 import "DatanodeProtocol.proto";
+
 import "DatanodeContainerProtocol.proto";
 
 
@@ -45,6 +49,10 @@ message SCMHeartbeatRequestProto {
   required DatanodeIDProto datanodeID = 1;
 }
 
+message SCMRegisterRequestProto {
+  required DatanodeIDProto datanodeID = 1;
+  optional SCMNodeAddressList addressList = 2;
+}
 
 /**
  * Request for version info of the software stack on the server.
@@ -59,24 +67,38 @@ message SCMVersionRequestProto {
 */
 message SCMVersionResponseProto {
   required uint32 softwareVersion = 1;
-  repeated  hadoop.hdfs.ozone.KeyValue keys = 2;
+  repeated hadoop.hdfs.ozone.KeyValue keys = 2;
+}
+
+message SCMNodeAddressList {
+  repeated string addressList = 1;
 }
 
 /**
  * Datanode ID returned by the SCM. This is similar to name node
 * registration of a datanode.
  */
-message RegisteredCmdResponseProto {
+message SCMRegisteredCmdResponseProto {
   enum ErrorCode {
     success = 1;
-    errorNodeAlreadyRegistered = 2;
-    errorNodeNotPermitted = 3;
+    errorNodeNotPermitted = 2;
   }
-  required ErrorCode errorCode = 1;
-  optional string datanodeUUID = 2;
-  optional string clusterID = 3;
+  required ErrorCode errorCode = 2;
+  optional string datanodeUUID = 3;
+  optional string clusterID = 4;
+  optional SCMNodeAddressList addressList = 5;
+}
+
+/**
+ * ContainerNodeIDProto maintains the datanode's identity along with the
+ * cluster ID after registration is done.
+ */
+message ContainerNodeIDProto {
+  required DatanodeIDProto datanodeID = 1;
+  optional string clusterID = 2;
 }
 
+
 /**
  * Empty Command Response
  */
@@ -84,18 +106,21 @@ message NullCmdResponseProto {
 
 }
 
+/**
+Type of commands supported by the SCM-to-datanode protocol.
+*/
+enum Type {
+  nullCmd = 1;
+  versionCommand = 2;
+  registeredCommand = 3;
+}
+
 /*
 * These are commands returned by SCM to the datanode to execute.
  */
-message SCMCommandResponseProto  {
-  enum Type {
-    nullCmd = 1;
-    registeredCmd = 2; // Returns the datanode ID after registeration.
-  }
-
-  required Type cmdType = 1; // Type of the command
-  optional NullCmdResponseProto nullCommand = 2;
-  optional  RegisteredCmdResponseProto  registerNode = 3;
+message SCMCommandResponseProto {
+  required Type cmdType = 2; // Type of the command
+  optional NullCmdResponseProto nullCommand = 3;
 }
 
 
@@ -160,12 +185,11 @@ message SCMHeartbeatResponseProto {
  * registered with some SCM. If this file is not found, datanode assumes that
  * it needs to do a registration.
  *
- * If registration is need datanode moves into REGISTERING_NODE state. It will
- * send a register call with datanodeID data structure, but with datanode UUID
- * will be set to an empty string.
+ * If registration is needed, the datanode moves into the REGISTER state. It
+ * will send a register call with the datanodeID data structure and persist
+ * that info.
  *
- * The response to the command contains the datanode UUID and clusterID. This
- * information is persisted by the datanode and moves into heartbeat state.
+ * The response to the command contains clusterID. This information is
+ * also persisted by the datanode and moves into heartbeat state.
  *
  * Once in the heartbeat state, datanode sends heartbeats and container reports
  * to SCM and process commands issued by SCM until it is shutdown.
@@ -176,12 +200,12 @@ service StorageContainerDatanodeProtocolService {
   /**
   * Gets the version information from the SCM.
   */
-  rpc getVersion(SCMVersionRequestProto) returns (SCMVersionResponseProto);
+  rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto);
 
   /**
   * Registers a data node with SCM.
   */
-  rpc register(SCMHeartbeatRequestProto) returns (SCMCommandResponseProto);
+  rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto);
 
   /**
    * Send heartbeat from datanode to SCM. HB's under SCM looks more

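A hedged Java sketch of assembling the new register request from the messages
above; the SCM host names and the datanodeProto variable (obtained via
DatanodeID.getProtoBufMessage()) are illustrative:

    SCMNodeAddressList addresses = SCMNodeAddressList.newBuilder()
        .addAddressList("scm-1.example.com")  // hypothetical SCM names
        .addAddressList("scm-2.example.com")
        .build();
    SCMRegisterRequestProto request = SCMRegisterRequestProto.newBuilder()
        .setDatanodeID(datanodeProto)         // required DatanodeIDProto
        .setAddressList(addresses)            // optional address list
        .build();
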
+ 188 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java

@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Test utilities for SCM endpoint tests.
+ */
+public final class SCMTestUtils {
+  /**
+   * Never constructed.
+   */
+  private SCMTestUtils() {
+  }
+
+  /**
+   * Starts an RPC server.
+   *
+   * @param conf configuration
+   * @param addr configured address of RPC server
+   * @param protocol RPC protocol provided by RPC server
+   * @param instance RPC protocol implementation instance
+   * @param handlerCount RPC server handler count
+   * @return RPC server
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  private static RPC.Server startRpcServer(Configuration conf,
+      InetSocketAddress addr, Class<?> protocol, BlockingService instance,
+      int handlerCount)
+      throws IOException {
+    RPC.Server rpcServer = new RPC.Builder(conf)
+        .setProtocol(protocol)
+        .setInstance(instance)
+        .setBindAddress(addr.getHostString())
+        .setPort(addr.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .setSecretManager(null)
+        .build();
+
+    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+    return rpcServer;
+  }
+
+  /**
+   * Creates an Endpoint class for testing purpose.
+   *
+   * @param conf - Conf
+   * @param address - InetSocketAddress of the endpoint
+   * @param rpcTimeout - RPC timeout in milliseconds
+   * @return EndpointStateMachine
+   * @throws Exception
+   */
+  public static EndpointStateMachine createEndpoint(Configuration conf,
+      InetSocketAddress address, int rpcTimeout) throws Exception {
+    RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long version =
+        RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+
+    StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+        StorageContainerDatanodeProtocolPB.class, version,
+        address, UserGroupInformation.getCurrentUser(), conf,
+        NetUtils.getDefaultSocketFactory(conf), rpcTimeout,
+        RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy();
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+        new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
+    return new EndpointStateMachine(address, rpcClient, conf);
+  }
+
+  /**
+   * Start Datanode RPC server.
+   */
+  public static RPC.Server startScmRpcServer(Configuration configuration,
+      StorageContainerDatanodeProtocol server,
+      InetSocketAddress rpcServerAddress, int handlerCount) throws
+      IOException {
+    RPC.setProtocolEngine(configuration,
+        StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    BlockingService scmDatanodeService =
+        StorageContainerDatanodeProtocolService.
+            newReflectiveBlockingService(
+                new StorageContainerDatanodeProtocolServerSideTranslatorPB(
+                    server));
+
+    RPC.Server scmServer = startRpcServer(configuration, rpcServerAddress,
+        StorageContainerDatanodeProtocolPB.class, scmDatanodeService,
+        handlerCount);
+
+    scmServer.start();
+    return scmServer;
+  }
+
+  public static InetSocketAddress getReuseableAddress() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      socket.setReuseAddress(true);
+      int port = socket.getLocalPort();
+      String addr = InetAddress.getLoopbackAddress().getHostAddress()
+          .toString();
+      return new InetSocketAddress(addr, port);
+    }
+  }
+
+  public static Configuration getConf() {
+    return new Configuration();
+  }
+
+  public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
+    return getDatanodeID(nodeManager, UUID.randomUUID().toString());
+  }
+
+  /**
+   * Creates a new DatanodeID with the node ID set to the given string and
+   * registers it with the given node manager.
+   *
+   * @param nodeManager - node manager to register the new datanode with.
+   * @param uuid - node ID, generally a UUID.
+   * @return DatanodeID.
+   */
+  public static DatanodeID getDatanodeID(SCMNodeManager nodeManager, String
+      uuid) {
+    DatanodeID tempDataNode = getDatanodeID(uuid);
+    RegisteredCommand command =
+        (RegisteredCommand) nodeManager.register(tempDataNode);
+    return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
+  }
+
+  /**
+   * Get a datanode ID.
+   *
+   * @return DatanodeID
+   */
+  public static DatanodeID getDatanodeID() {
+    return getDatanodeID(UUID.randomUUID().toString());
+  }
+
+  private static DatanodeID getDatanodeID(String uuid) {
+    Random random = new Random();
+    String ipAddress = random.nextInt(256) + "."
+        + random.nextInt(256) + "."
+        + random.nextInt(256) + "."
+        + random.nextInt(256);
+
+    String hostName = uuid;
+    return new DatanodeID(ipAddress,
+        hostName, uuid, 0, 0, 0, 0);
+  }
+}
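
Taken together, these helpers are enough to stand up a mock SCM and talk to
it over RPC. A minimal usage sketch (exception handling and imports elided;
ScmTestMock is the mock class added in the next file):

    Configuration conf = SCMTestUtils.getConf();
    InetSocketAddress addr = SCMTestUtils.getReuseableAddress();
    ScmTestMock mock = new ScmTestMock();
    // Serve StorageContainerDatanodeProtocol from the mock on a free port.
    RPC.Server server = SCMTestUtils.startScmRpcServer(conf, mock, addr, 10);
    try (EndpointStateMachine endpoint =
             SCMTestUtils.createEndpoint(conf, addr, 1000)) {
      // getVersion is the first call an endpoint makes against an SCM.
      SCMVersionResponseProto version = endpoint.getEndPoint().getVersion();
    } finally {
      server.stop();
    }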

+ 149 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.NullCommand;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.scm.VersionInfo;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SCM RPC mock class.
+ */
+public class ScmTestMock implements StorageContainerDatanodeProtocol {
+  private int rpcResponseDelay;
+  private AtomicInteger heartbeatCount = new AtomicInteger(0);
+  private AtomicInteger rpcCount = new AtomicInteger(0);
+
+  /**
+   * Returns the number of heartbeats made to this class.
+   *
+   * @return int
+   */
+  public int getHeartbeatCount() {
+    return heartbeatCount.get();
+  }
+
+  /**
+   * Returns the number of RPC calls made to this mock class instance.
+   *
+   * @return - Number of RPC calls serviced by this class.
+   */
+  public int getRpcCount() {
+    return rpcCount.get();
+  }
+
+  /**
+   * Gets the RPC response delay.
+   *
+   * @return delay in milliseconds.
+   */
+  public int getRpcResponseDelay() {
+    return rpcResponseDelay;
+  }
+
+  /**
+   * Sets the RPC response delay.
+   *
+   * @param rpcResponseDelay - delay in milliseconds.
+   */
+  public void setRpcResponseDelay(int rpcResponseDelay) {
+    this.rpcResponseDelay = rpcResponseDelay;
+  }
+
+  /**
+   * Returns SCM version.
+   *
+   * @return Version info.
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
+      getVersion() throws IOException {
+    rpcCount.incrementAndGet();
+    sleepIfNeeded();
+    VersionInfo versionInfo = VersionInfo.getLatestVersion();
+    return VersionResponse.newBuilder()
+        .setVersion(versionInfo.getVersion())
+        .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
+        .build().getProtobufMessage();
+  }
+
+  private void sleepIfNeeded() {
+    if (getRpcResponseDelay() > 0) {
+      try {
+        Thread.sleep(getRpcResponseDelay());
+      } catch (InterruptedException ex) {
+        // Just ignore this exception.
+      }
+    }
+  }
+
+  /**
+   * Used by datanodes to send heartbeats.
+   *
+   * @param datanodeID - Datanode ID.
+   * @return SCMHeartbeatResponseProto
+   * @throws IOException
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+      sendHeartbeat(DatanodeID datanodeID)
+      throws IOException {
+    rpcCount.incrementAndGet();
+    heartbeatCount.incrementAndGet();
+    sleepIfNeeded();
+    StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
+        cmdResponse = StorageContainerDatanodeProtocolProtos
+        .SCMCommandResponseProto
+        .newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos
+            .Type.nullCmd)
+        .setNullCommand(
+            NullCommand.newBuilder().build().getProtoBufMessage()).build();
+    return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+        .newBuilder()
+        .addCommands(cmdResponse).build();
+  }
+
+  /**
+   * Registers a datanode.
+   *
+   * @param datanodeID - Datanode ID.
+   * @param scmAddresses - List of SCMs this datanode is configured to
+   * communicate with.
+   * @return SCM Command.
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos
+      .SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+      String[] scmAddresses) throws IOException {
+    rpcCount.incrementAndGet();
+    sleepIfNeeded();
+    return StorageContainerDatanodeProtocolProtos
+        .SCMRegisteredCmdResponseProto
+        .newBuilder().setClusterID(UUID.randomUUID().toString())
+        .setDatanodeUUID(datanodeID.getDatanodeUuid()).setErrorCode(
+            StorageContainerDatanodeProtocolProtos
+                .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
+  }
+}
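
The response-delay knob is what the RPC-timeout tests below rely on, and the
counters let a test assert exactly how many calls reached the mock. A short
sketch, using only the methods defined above:

    ScmTestMock mock = new ScmTestMock();
    mock.setRpcResponseDelay(1500);        // every call now takes ~1.5 seconds
    mock.sendHeartbeat(SCMTestUtils.getDatanodeID());
    mock.setRpcResponseDelay(0);           // restore for subsequent calls
    assert mock.getHeartbeatCount() == 1;  // heartbeats are counted...
    assert mock.getRpcCount() == 1;        // ...as is every RPC overall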

+ 274 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java

@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+    .InitDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+    .RunningDatanodeState;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests the datanode state machine class and its states.
+ */
+public class TestDatanodeStateMachine {
+  private final int scmServerCount = 3;
+  private List<String> serverAddresses;
+  private List<RPC.Server> scmServers;
+  private List<ScmTestMock> mockServers;
+  private ExecutorService executorService;
+  private Configuration conf;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDatanodeStateMachine.class);
+
+  @Before
+  public void setUp() throws Exception {
+    conf = SCMTestUtils.getConf();
+    serverAddresses = new LinkedList<>();
+    scmServers = new LinkedList<>();
+    mockServers = new LinkedList<>();
+    for (int x = 0; x < scmServerCount; x++) {
+      int port = SCMTestUtils.getReuseableAddress().getPort();
+      String address = "127.0.0.1";
+      serverAddresses.add(address + ":" + port);
+      ScmTestMock mock = new ScmTestMock();
+
+      scmServers.add(SCMTestUtils.startScmRpcServer(conf, mock,
+          new InetSocketAddress(address, port), 10));
+      mockServers.add(mock);
+    }
+
+    conf.setStrings(OzoneConfigKeys.OZONE_SCM_NAMES,
+        serverAddresses.toArray(new String[0]));
+
+    URL p = this.getClass().getResource("");
+    String path = p.getPath().concat(
+        TestDatanodeStateMachine.class.getSimpleName());
+    File f = new File(path);
+    if (!f.mkdirs()) {
+      LOG.info("Required directories already exist.");
+    }
+
+    path = Paths.get(path,
+        TestDatanodeStateMachine.class.getSimpleName() + ".id").toString();
+    conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path);
+
+    executorService = HadoopExecutors.newScheduledThreadPool(
+        conf.getInt(
+            OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
+            OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Test Data Node State Machine Thread - %d").build());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      executorService.shutdownNow();
+      for (RPC.Server s : scmServers) {
+        s.stop();
+      }
+    } catch (Exception e) {
+      // Ignore all exceptions from the shutdown path.
+    }
+  }
+
+  /**
+   * Asserts that starting the state machine executes the INIT state task.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testDatanodeStateMachineStartThread() throws IOException,
+      InterruptedException, TimeoutException {
+    final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+    Runnable startStateMachineTask = () -> {
+      try {
+        stateMachine.start();
+      } catch (IOException ex) {
+        // Ignored; this test only verifies the resulting state transitions.
+      }
+    };
+    Thread thread1 = new Thread(startStateMachineTask);
+    thread1.setDaemon(true);
+    thread1.start();
+
+    SCMConnectionManager connectionManager =
+        stateMachine.getConnectionManager();
+
+    GenericTestUtils.waitFor(
+        () -> connectionManager.getValues().size() == scmServerCount,
+        100, 1000);
+
+    stateMachine.close();
+  }
+
+  /**
+   * This test explores the state machine by invoking each call in sequence,
+   * just as the state machine itself would. Because this is a test, we are
+   * able to verify each of the assumptions along the way.
+   * <p>
+   * Here is what happens at a high level.
+   * <p>
+   * 1. We start the datanodeStateMachine in the INIT state.
+   * <p>
+   * 2. We invoke the INIT state task.
+   * <p>
+   * 3. That creates a set of RPC endpoints that are ready to connect to SCMs.
+   * <p>
+   * 4. We assert that the DatanodeStateMachine has moved to the RUNNING
+   * state.
+   * <p>
+   * 5. We get the task for the running state. Executing that task makes the
+   * first network call of the state machine: each endpoint is in the
+   * GETVERSION state and we invoke its task.
+   * <p>
+   * 6. We assert that this call was a success by checking that each endpoint
+   * now holds the version response it got from the SCM server it was talking
+   * to, and that each mock server serviced one RPC call.
+   * <p>
+   * 7. Once the register step is done, subsequent calls to get a task return
+   * the HeartbeatTask, which sends heartbeats to SCM. We assert that we get
+   * the right task from the subsystem below.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDatanodeStateContext() throws IOException,
+      InterruptedException, ExecutionException, TimeoutException {
+    final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+    DatanodeStateMachine.DatanodeStates currentState =
+        stateMachine.getContext().getState();
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
+        currentState);
+
+    DatanodeState<DatanodeStateMachine.DatanodeStates> task =
+        stateMachine.getContext().getTask();
+    Assert.assertEquals(InitDatanodeState.class, task.getClass());
+
+    task.execute(executorService);
+    DatanodeStateMachine.DatanodeStates newState =
+        task.await(2, TimeUnit.SECONDS);
+
+    for (EndpointStateMachine endpoint :
+        stateMachine.getConnectionManager().getValues()) {
+      // We assert that each of the endpoints is in the GETVERSION state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          endpoint.getState());
+    }
+
+    // The Datanode has moved into Running State, since endpoints are created.
+    // We move to running state when we are ready to issue RPC calls to SCMs.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    // If we had called context.execute instead of calling into each state
+    // this would have happened automatically.
+    stateMachine.getContext().setState(newState);
+    task = stateMachine.getContext().getTask();
+    Assert.assertEquals(RunningDatanodeState.class, task.getClass());
+
+    // This execute will invoke getVersion calls against all SCM endpoints
+    // that we know of.
+    task.execute(executorService);
+    newState = task.await(2, TimeUnit.SECONDS);
+
+    // The datanode should still be in the RUNNING state.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    for (EndpointStateMachine endpoint :
+        stateMachine.getConnectionManager().getValues()) {
+
+      // Since the earlier task.execute called into getVersion, the
+      // endpoint state machine should have moved to the REGISTER state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          endpoint.getState());
+
+      // We assert that each of the endpoints has received a version from
+      // the SCM server.
+      Assert.assertNotNull(endpoint.getVersion());
+    }
+
+    // We can also assert that each mock server has serviced exactly one RPC
+    // call at this point.
+    for (ScmTestMock mock : mockServers) {
+      Assert.assertEquals(1, mock.getRpcCount());
+    }
+
+    // This task is the running task, but the running task executes sub-tasks
+    // based on the state of its endpoints; hence this next call will be a
+    // register at the endpoint RPC level.
+    task = stateMachine.getContext().getTask();
+    task.execute(executorService);
+    newState = task.await(2, TimeUnit.SECONDS);
+
+    // The datanode should still be in the RUNNING state.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    for (ScmTestMock mock : mockServers) {
+      Assert.assertEquals(2, mock.getRpcCount());
+    }
+
+    // This task is the running task, but the running task executes sub-tasks
+    // based on the state of its endpoints; hence this next call will be a
+    // HeartbeatTask at the endpoint RPC level.
+    task = stateMachine.getContext().getTask();
+    task.execute(executorService);
+    newState = task.await(2, TimeUnit.SECONDS);
+
+    // The datanode should still be in the RUNNING state.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    for (ScmTestMock mock : mockServers) {
+      Assert.assertEquals(1, mock.getHeartbeatCount());
+    }
+  }
+}
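
The long walkthrough in testDatanodeStateContext drives one transition per
call; condensed, the same loop looks like the sketch below (reusing the
test's conf and executorService). Four rounds take the endpoints through
GETVERSION, REGISTER and HEARTBEAT while the datanode stays RUNNING:

    DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
    for (int round = 0; round < 4; round++) {
      // Fetch the task for the current state, run it, and feed the
      // resulting state back in: INIT -> RUNNING, then RUNNING thereafter.
      DatanodeState<DatanodeStateMachine.DatanodeStates> task =
          stateMachine.getContext().getTask();
      task.execute(executorService);
      stateMachine.getContext().setState(task.await(2, TimeUnit.SECONDS));
    }
    stateMachine.close();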

+ 314 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.scm.VersionInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.matchers.LessOrEqual;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+/**
+ * Tests the endpoints.
+ */
+public class TestEndPoint {
+  private static InetSocketAddress serverAddress;
+  private static RPC.Server scmServer;
+  private static ScmTestMock scmServerImpl;
+
+  /**
+   * This test asserts that we are able to make a version call to the SCM
+   * server and get back the expected values.
+   */
+  @Test
+  public void testGetVersion() throws Exception {
+    try (EndpointStateMachine rpcEndPoint =
+             SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .getVersion();
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(VersionInfo.DESCRIPTION_KEY,
+          responseProto.getKeys(0).getKey());
+      Assert.assertEquals(VersionInfo.getLatestVersion().getDescription(),
+          responseProto.getKeys(0).getValue());
+    }
+  }
+
+  /**
+   * We make a getVersion RPC call, but via the VersionEndpointTask, which is
+   * how the state machine would make the call.
+   */
+  @Test
+  public void testGetVersionTask() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+        serverAddress, 1000)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // if version call worked the endpoint should automatically move to the
+      // next state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          newState);
+
+      // Now rpcEndpoint should remember the version it got from SCM
+      Assert.assertNotNull(rpcEndPoint.getVersion());
+    }
+  }
+
+  /**
+   * This test makes a call to an endpoint where there is no SCM server. We
+   * expect the versionTask to handle that failure gracefully.
+   */
+  @Test
+  public void testGetVersionToInvalidEndpoint() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    InetSocketAddress nonExistentServerAddress = SCMTestUtils
+        .getReuseableAddress();
+    try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+        nonExistentServerAddress, 1000)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // This version call did NOT work, so the endpoint should remain in
+      // the same state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          newState);
+    }
+  }
+
+  /**
+   * This test makes a getVersion RPC call, but the mock server is going to
+   * respond a little slowly. We assert that we are still in the GETVERSION
+   * state after the timeout.
+   */
+  @Test
+  public void testGetVersionAssertRpcTimeOut() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 100;
+    Configuration conf = SCMTestUtils.getConf();
+
+    try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+        serverAddress, (int) rpcTimeout)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+
+      scmServerImpl.setRpcResponseDelay(1500);
+      long start = Time.monotonicNow();
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+      long end = Time.monotonicNow();
+      scmServerImpl.setRpcResponseDelay(0);
+      Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          newState);
+    }
+  }
+
+  @Test
+  public void testRegister() throws Exception {
+    String[] scmAddressArray = new String[1];
+    scmAddressArray[0] = serverAddress.toString();
+    DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID();
+    try (EndpointStateMachine rpcEndPoint =
+             SCMTestUtils.createEndpoint(
+                 SCMTestUtils.getConf(), serverAddress, 1000)) {
+      SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .register(nodeToRegister, scmAddressArray);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(nodeToRegister.getDatanodeUuid(),
+          responseProto.getDatanodeUUID());
+      Assert.assertNotNull(responseProto.getClusterID());
+    }
+  }
+
+  private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
+      int rpcTimeout, boolean clearContainerID) throws Exception {
+    EndpointStateMachine rpcEndPoint =
+        SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+            scmAddress, rpcTimeout);
+    rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
+    RegisterEndpointTask endpointTask =
+        new RegisterEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+    if (!clearContainerID) {
+      ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
+          .setClusterID(UUID.randomUUID().toString())
+          .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+          .build();
+      endpointTask.setContainerNodeIDProto(containerNodeID);
+    }
+    endpointTask.call();
+    return rpcEndPoint;
+  }
+
+  @Test
+  public void testRegisterTask() throws Exception {
+    try (EndpointStateMachine rpcEndpoint =
+             registerTaskHelper(serverAddress, 1000, false)) {
+      // Successful register should move us to Heartbeat state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterToInvalidEndpoint() throws Exception {
+    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+    try (EndpointStateMachine rpcEndpoint =
+             registerTaskHelper(address, 1000, false)) {
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterNoContainerID() throws Exception {
+    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+    try (EndpointStateMachine rpcEndpoint =
+             registerTaskHelper(address, 1000, true)) {
+      // No container ID, therefore we tell the datanode that we would like
+      // it to shut down.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterRpcTimeout() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 200;
+    scmServerImpl.setRpcResponseDelay(1500);
+    long start = Time.monotonicNow();
+    registerTaskHelper(serverAddress, (int) rpcTimeout, false).close();
+    long end = Time.monotonicNow();
+    scmServerImpl.setRpcResponseDelay(0);
+    Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
+  }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    DatanodeID dataNode = SCMTestUtils.getDatanodeID();
+    try (EndpointStateMachine rpcEndPoint =
+             SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .sendHeartbeat(dataNode);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(1, responseProto.getCommandsCount());
+      Assert.assertNotNull(responseProto.getCommandsList().get(0));
+      Assert.assertEquals(Type.nullCmd,
+          responseProto.getCommandsList().get(0).getCmdType());
+    }
+  }
+
+  private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
+      int rpcTimeout) throws Exception {
+    EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
+        SCMTestUtils.getConf(),
+        scmAddress, rpcTimeout);
+    ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
+        .setClusterID(UUID.randomUUID().toString())
+        .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+        .build();
+    rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
+    HeartbeatEndpointTask endpointTask =
+        new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+    endpointTask.setContainerNodeIDProto(containerNodeID);
+    endpointTask.call();
+    Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
+    return rpcEndPoint;
+  }
+
+  private void heartbeatTaskHelper(InetSocketAddress address)
+      throws Exception {
+    try (EndpointStateMachine rpcEndpoint =
+             heartbeatTaskHelper(address, 1000)) {
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testHeartbeatTask() throws Exception {
+    heartbeatTaskHelper(serverAddress);
+  }
+
+  @Test
+  public void testHeartbeatTaskToInvalidNode() throws Exception {
+    InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
+    heartbeatTaskHelper(invalidAddress);
+  }
+
+  @Test
+  public void testHeartbeatTaskRpcTimeOut() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 200;
+    scmServerImpl.setRpcResponseDelay(1500);
+    long start = Time.monotonicNow();
+    // Talk to the real (delayed) server so that the RPC timeout, rather
+    // than a fast connection failure, is what bounds this call.
+    heartbeatTaskHelper(serverAddress);
+    long end = Time.monotonicNow();
+    scmServerImpl.setRpcResponseDelay(0);
+    Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (scmServer != null) {
+      scmServer.stop();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    serverAddress = SCMTestUtils.getReuseableAddress();
+    scmServerImpl = new ScmTestMock();
+    scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
+        scmServerImpl, serverAddress, 10);
+  }
+}
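
Every endpoint task test in this file follows one shape: set the endpoint's
state, build the matching task, call() it, then read the next state off the
endpoint. A sketch of the register step, mirroring registerTaskHelper above
(conf and scmAddress stand in for the test fixture's values):

    EndpointStateMachine endpoint =
        SCMTestUtils.createEndpoint(conf, scmAddress, 1000);
    endpoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
    RegisterEndpointTask task = new RegisterEndpointTask(endpoint, conf);
    task.setContainerNodeIDProto(ContainerNodeIDProto.newBuilder()
        .setClusterID(UUID.randomUUID().toString())
        .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
        .build());
    task.call();
    // Success advances the endpoint to HEARTBEAT; failure leaves it in
    // REGISTER so the state machine can retry on the next pass.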

+ 17 - 68
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -33,7 +33,6 @@ import org.junit.rules.ExpectedException;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
@@ -72,37 +71,6 @@ public class TestNodeManager {
     return new OzoneConfiguration();
   }
 
-  /**
-   * Create a new datanode ID.
-   *
-   * @return DatanodeID
-   */
-  DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
-
-    return getDatanodeID(nodeManager, UUID.randomUUID().toString());
-  }
-
-  /**
-   * Create a new DatanodeID with NodeID set to the string.
-   *
-   * @param uuid - node ID, it is generally UUID.
-   * @return DatanodeID.
-   */
-  DatanodeID getDatanodeID(SCMNodeManager nodeManager, String uuid) {
-    Random random = new Random();
-    String ipAddress = random.nextInt(256) + "."
-        + random.nextInt(256) + "."
-        + random.nextInt(256) + "."
-        + random.nextInt(256);
-
-    String hostName = uuid;
-    DatanodeID tempDataNode = new DatanodeID(ipAddress,
-        hostName, uuid, 0, 0, 0, 0);
-    RegisteredCommand command =
-        (RegisteredCommand) nodeManager.register(tempDataNode);
-    return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
-  }
-
   /**
    * Creates a NodeManager.
    *
@@ -134,7 +102,7 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       // Send some heartbeats from different nodes.
       for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
-        DatanodeID datanodeID = getDatanodeID(nodeManager);
+        DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
         nodeManager.sendHeartbeat(datanodeID);
       }
 
@@ -181,7 +149,7 @@ public class TestNodeManager {
 
       // Need 100 nodes to come out of chill mode, only one node is sending HB.
       nodeManager.setMinimumChillModeNodes(100);
-      nodeManager.sendHeartbeat(getDatanodeID(nodeManager));
+      nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager));
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
           4 * 1000);
       assertFalse("Not enough heartbeat, Node manager should have been in " +
@@ -203,7 +171,7 @@ public class TestNodeManager {
 
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       nodeManager.setMinimumChillModeNodes(3);
-      DatanodeID datanodeID = getDatanodeID(nodeManager);
+      DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
 
       // Send 10 heartbeat from same node, and assert we never leave chill mode.
       for (int x = 0; x < 10; x++) {
@@ -232,7 +200,7 @@ public class TestNodeManager {
     Configuration conf = getConf();
     conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
     SCMNodeManager nodeManager = createNodeManager(conf);
-    DatanodeID datanodeID = getDatanodeID(nodeManager);
+    DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
     nodeManager.close();
 
     // These should never be processed.
@@ -262,7 +230,7 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
 
       for (int x = 0; x < count; x++) {
-        DatanodeID datanodeID = getDatanodeID(nodeManager);
+        DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
         nodeManager.sendHeartbeat(datanodeID);
       }
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@@ -346,7 +314,7 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
           "staleNode");
-      DatanodeID staleNode = getDatanodeID(nodeManager);
+      DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager);
 
       // Heartbeat once
       nodeManager.sendHeartbeat(staleNode);
@@ -396,7 +364,7 @@ public class TestNodeManager {
       List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
           "Node");
 
-      DatanodeID deadNode = getDatanodeID(nodeManager);
+      DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager);
 
       // Heartbeat once
       nodeManager.sendHeartbeat(deadNode);
@@ -427,28 +395,6 @@ public class TestNodeManager {
     }
   }
 
-  /**
-   * Asserts that if we get duplicate registration calls for a datanode, we will
-   * ignore it and LOG the error.
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws TimeoutException
-   */
-  @Test
-  public void testScmDuplicateRegistrationLogsError() throws IOException,
-      InterruptedException, TimeoutException {
-    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
-      GenericTestUtils.LogCapturer logCapturer =
-          GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
-      DatanodeID duplicateNodeID = getDatanodeID(nodeManager);
-      nodeManager.register(duplicateNodeID);
-      logCapturer.stopCapturing();
-      assertThat(logCapturer.getOutput(), containsString("Datanode is already" +
-          " registered."));
-    }
-  }
-
   /**
    * Asserts that we log an error for null in datanode ID.
    *
@@ -532,9 +478,12 @@ public class TestNodeManager {
      * Cluster state: Healthy: All nodes are heartbeat-ing like normal.
      */
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
-      DatanodeID healthyNode = getDatanodeID(nodeManager, "HealthyNode");
-      DatanodeID staleNode = getDatanodeID(nodeManager, "StaleNode");
-      DatanodeID deadNode = getDatanodeID(nodeManager, "DeadNode");
+      DatanodeID healthyNode =
+          SCMTestUtils.getDatanodeID(nodeManager, "HealthyNode");
+      DatanodeID staleNode =
+          SCMTestUtils.getDatanodeID(nodeManager, "StaleNode");
+      DatanodeID deadNode =
+          SCMTestUtils.getDatanodeID(nodeManager, "DeadNode");
       nodeManager.sendHeartbeat(healthyNode);
       nodeManager.sendHeartbeat(staleNode);
       nodeManager.sendHeartbeat(deadNode);
@@ -659,7 +608,7 @@ public class TestNodeManager {
       prefix) {
     List<DatanodeID> list = new LinkedList<>();
     for (int x = 0; x < count; x++) {
-      list.add(getDatanodeID(nodeManager, prefix + x));
+      list.add(SCMTestUtils.getDatanodeID(nodeManager, prefix + x));
     }
     return list;
   }
@@ -878,7 +827,7 @@ public class TestNodeManager {
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       nodeManager.setMinimumChillModeNodes(10);
-      DatanodeID datanodeID = getDatanodeID(nodeManager);
+      DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
       nodeManager.sendHeartbeat(datanodeID);
       String status = nodeManager.getChillModeStatus();
       Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
@@ -908,7 +857,7 @@ public class TestNodeManager {
 
       // Assert that node manager force enter cannot be overridden by nodes HBs.
       for(int x= 0; x < 20; x++) {
-        DatanodeID datanode = getDatanodeID(nodeManager);
+        DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager);
         nodeManager.sendHeartbeat(datanode);
       }