HDFS-11001. Ozone:SCM: Add support for registerNode in SCM. Contributed by Anu Engineer.

Anu Engineer 8 years ago
parent
commit
59dfa74829

+ 61 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+
+import java.util.List;
+
+/**
+ * The protocol spoken between datanodes and SCM.
+ *
+ * Please note that the full protocol spoken between a datanode and SCM is
+ * separated into two interfaces. One interface deals with node state and
+ * the other deals with containers.
+ *
+ * This interface has the functions that deal with the state of a datanode.
+ */
+@InterfaceAudience.Private
+public interface StorageContainerNodeProtocol {
+  /**
+   * Gets the version info from SCM.
+   * @param versionRequest - version Request.
+   * @return - SCM version info and other required information needed by
+   * the datanode.
+   */
+  VersionResponse getVersion(SCMVersionRequestProto versionRequest);
+
+  /**
+   * Register the node if the node finds that it is not registered with any SCM.
+   * @param datanodeID - Send datanodeID with Node info, but datanode UUID is
+   *                   empty. Server returns a datanodeID for the given node.
+   * @return SCMCommand
+   */
+  SCMCommand register(DatanodeID datanodeID);
+
+  /**
+   * Send heartbeat to indicate the datanode is alive and doing well.
+   * @param datanodeID - Datanode ID.
+   * @return List of SCM commands to execute.
+   */
+  List<SCMCommand> sendHeartbeat(DatanodeID datanodeID);
+}
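
A rough illustration (not part of this change) of how a datanode-side caller
might drive these three calls in order. Here scm stands for any
StorageContainerNodeProtocol implementation, such as the SCMNodeManager below;
building an empty SCMVersionRequestProto is an assumption, since that request
message is not shown in this diff.

    // Hedged sketch: version handshake, one-time registration, then heartbeats.
    SCMVersionRequestProto versionRequest =
        SCMVersionRequestProto.newBuilder().build(); // assumed to need no fields
    VersionResponse version = scm.getVersion(versionRequest);

    // Register once; on success SCM returns the UUID the datanode should persist.
    SCMCommand reply = scm.register(datanodeID);

    // Afterwards, heartbeat periodically and execute whatever comes back.
    List<SCMCommand> commands = scm.sendHeartbeat(datanodeID);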

+ 149 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Version response class.
+ */
+public class VersionResponse {
+  private final int version;
+  private final Map<String, String> values;
+
+  /**
+   * Creates a version response class.
+   * @param version - version number.
+   * @param values - key-value pairs returned with the version.
+   */
+  public VersionResponse(int version, Map<String, String> values) {
+    this.version = version;
+    this.values = values;
+  }
+
+  /**
+   * Creates a version Response class.
+   * @param version - version number.
+   */
+  public VersionResponse(int version) {
+    this.version = version;
+    this.values = new HashMap<>();
+  }
+
+  /**
+   * Returns a new Builder.
+   * @return - Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Returns this class from protobuf message.
+   * @param response - SCMVersionResponseProto
+   * @return VersionResponse
+   */
+  public static VersionResponse getFromProtobuf(SCMVersionResponseProto
+                                                    response) {
+    return new VersionResponse(response.getSoftwareVersion(),
+        response.getKeysList().stream()
+            .collect(Collectors.toMap(KeyValue::getKey,
+                KeyValue::getValue)));
+  }
+
+  /**
+   * Adds a value to version Response.
+   * @param key - String
+   * @param value - String
+   */
+  public void put(String key, String value) {
+    if (this.values.containsKey(key)) {
+      throw new IllegalArgumentException("Duplicate key in version response");
+    }
+    values.put(key, value);
+  }
+
+  /**
+   * Return a protobuf message.
+   * @return SCMVersionResponseProto.
+   */
+  public SCMVersionResponseProto getProtobufMessage() {
+
+    List<KeyValue> list = new LinkedList<>();
+    for (Map.Entry<String, String> entry : values.entrySet()) {
+      list.add(KeyValue.newBuilder().setKey(entry.getKey()).
+          setValue(entry.getValue()).build());
+    }
+    return
+        SCMVersionResponseProto.newBuilder()
+            .setSoftwareVersion(this.version)
+            .addAllKeys(list).build();
+  }
+
+  /**
+   * Builder class.
+   */
+  public static class Builder {
+    private int version;
+    private Map<String, String> values;
+
+    Builder() {
+      values = new HashMap<>();
+    }
+
+    /**
+     * Sets the version.
+     * @param ver - version
+     * @return Builder
+     */
+    public Builder setVersion(int ver) {
+      this.version = ver;
+      return this;
+    }
+
+    /**
+     * Adds a value to version Response.
+     * @param key - String
+     * @param value - String
+     * @return Builder
+     */
+    public Builder addValue(String key, String value) {
+      if (this.values.containsKey(key)) {
+        throw new IllegalArgumentException("Duplicate key in version response");
+      }
+      values.put(key, value);
+      return this;
+    }
+
+    /**
+     * Builds the version response.
+     * @return VersionResponse.
+     */
+    public VersionResponse build() {
+      return new VersionResponse(this.version, this.values);
+    }
+  }
+}
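
A minimal usage sketch of the builder and the protobuf round trip; the
key/value pair shown is hypothetical.

    VersionResponse response = VersionResponse.newBuilder()
        .setVersion(VersionInfo.getLatestVersion().getVersion())
        .addValue("scm.container.port", "9861") // hypothetical key/value
        .build();
    SCMVersionResponseProto proto = response.getProtobufMessage();
    VersionResponse roundTrip = VersionResponse.getFromProtobuf(proto);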

+ 81 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
+
+
+/**
+ * For each command that SCM can return we have a class in this commands
+ * directory. This is the null command, which tells the datanode that no
+ * action is needed from it.
+ */
+public class NullCommand extends SCMCommand<NullCmdResponseProto> {
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getType() {
+    return Type.nullCmd;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  public NullCmdResponseProto getProtoBufMessage() {
+    return NullCmdResponseProto.newBuilder().build();
+  }
+
+  /**
+   * Returns a NullCommand instance from a NullCmdResponseProto.
+   * @param unused  - unused
+   * @return  NullCommand
+   */
+  public static NullCommand getFromProtobuf(final NullCmdResponseProto
+                                                unused) {
+    return new NullCommand();
+  }
+
+  /**
+   * Returns a new builder.
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * A Builder class; this is the standard pattern we use for all commands.
+   */
+  public static class Builder {
+    /**
+     * Return a null command.
+     * @return - NullCommand.
+     */
+    public NullCommand build() {
+      return new NullCommand();
+    }
+  }
+}
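
Since every command class maps to a protobuf Type, a receiver can dispatch on
getType(). A short sketch under two assumptions: it lives in the same package
(getType() is package-private on SCMCommand in this change), and
persistDatanodeUUID is a hypothetical helper.

    switch (command.getType()) {
    case nullCmd:
      // Nothing to do.
      break;
    case registeredCmd:
      RegisteredCommand registered = (RegisteredCommand) command;
      persistDatanodeUUID(registered.getDatanodeUUID()); // hypothetical helper
      break;
    default:
      throw new IllegalArgumentException("Unsupported command type.");
    }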

+ 172 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java

@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+
+/**
+ * Response to Datanode Register call.
+ */
+public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
+
+  private final String datanodeUUID;
+  private final String clusterID;
+  private final ErrorCode error;
+
+  public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
+                           final String clusterID) {
+    this.datanodeUUID = datanodeUUID;
+    this.clusterID = clusterID;
+    this.error = error;
+  }
+
+
+  /**
+   * Returns a new builder.
+   *
+   * @return - Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  Type getType() {
+    return Type.registeredCmd;
+  }
+
+  /**
+   * Returns datanode UUID.
+   * @return - Datanode ID.
+   */
+  public String getDatanodeUUID() {
+    return datanodeUUID;
+  }
+
+  /**
+   * Returns cluster ID.
+   * @return - ClusterID
+   */
+  public String getClusterID() {
+    return clusterID;
+  }
+
+  /**
+   * Returns ErrorCode.
+   * @return - ErrorCode
+   */
+  public ErrorCode getError() {
+    return error;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  RegisteredCmdResponseProto getProtoBufMessage() {
+    return RegisteredCmdResponseProto.newBuilder()
+        .setClusterID(this.clusterID)
+        .setDatanodeUUID(this.datanodeUUID)
+        .setErrorCode(this.error)
+        .build();
+  }
+
+  /**
+   * A builder class to verify all values are sane.
+   */
+  public static class Builder {
+    private String datanodeUUID;
+    private String clusterID;
+    private ErrorCode error;
+
+    /**
+     * Sets the datanode UUID.
+     *
+     * @param dnUUID - datanode UUID
+     * @return Builder
+     */
+    public Builder setDatanodeUUID(String dnUUID) {
+      this.datanodeUUID = dnUUID;
+      return this;
+    }
+
+    /**
+     * Create this object from a Protobuf message.
+     *
+     * @param response - RegisteredCmdResponseProto
+     * @return RegisteredCommand
+     */
+    public RegisteredCommand getFromProtobuf(RegisteredCmdResponseProto
+                                                 response) {
+      Preconditions.checkNotNull(response);
+      return new RegisteredCommand(response.getErrorCode(),
+          response.hasDatanodeUUID() ? response.getDatanodeUUID() : "",
+          response.hasClusterID() ? response.getClusterID() : "");
+    }
+
+    /**
+     * Sets cluster ID.
+     *
+     * @param cluster - clusterID
+     * @return Builder
+     */
+    public Builder setClusterID(String cluster) {
+      this.clusterID = cluster;
+      return this;
+    }
+
+    /**
+     * Sets Error code.
+     *
+     * @param errorCode - error code
+     * @return Builder
+     */
+    public Builder setErrorCode(ErrorCode errorCode) {
+      this.error = errorCode;
+      return this;
+    }
+
+    /**
+     * Build the command object.
+     *
+     * @return RegisteredCommand
+     */
+    public RegisteredCommand build() {
+      if (this.error == ErrorCode.success &&
+          ((this.datanodeUUID == null || this.datanodeUUID.isEmpty()) ||
+           (this.clusterID == null || this.clusterID.isEmpty()))) {
+        throw new IllegalArgumentException("On success, RegisteredCommand " +
+            "needs datanodeUUID and ClusterID.");
+      }
+
+      return new
+          RegisteredCommand(this.error, this.datanodeUUID, this.clusterID);
+    }
+  }
+}
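
A small sketch of the builder's validation, using a hypothetical cluster ID:
with the corrected check in build(), a success reply must carry both IDs,
while an error reply may echo only what is known.

    RegisteredCommand ok = RegisteredCommand.newBuilder()
        .setErrorCode(ErrorCode.success)
        .setDatanodeUUID(UUID.randomUUID().toString())
        .setClusterID("cluster-1") // hypothetical cluster ID
        .build();

    // Omitting the datanode UUID on success throws IllegalArgumentException.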

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * A class that acts as the base class to convert between Java and SCM
+ * commands in protobuf format.
+ * @param <T> - the protobuf message type this command maps to.
+ */
+public abstract class SCMCommand<T extends GeneratedMessage> {
+  /**
+   * Returns the type of this command.
+   * @return Type
+   */
+  abstract Type getType();
+
+  /**
+   * Gets the protobuf message of this object.
+   * @return A protobuf message.
+   */
+  abstract T getProtoBufMessage();
+}

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+/**
+ Set of classes that help in protobuf conversions.
+ **/

+ 79 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+/**
+ * This is a class that tracks versions of SCM.
+ */
+public final class VersionInfo {
+
+  // We will just be normal and use positive counting numbers for versions.
+  private static final VersionInfo[] VERSION_INFOS =
+      {new VersionInfo("First version of SCM", 1)};
+
+  private final String description;
+  private final int version;
+
+  /**
+   * Never created outside this class.
+   *
+   * @param description -- description
+   * @param version     -- version number
+   */
+  private VersionInfo(String description, int version) {
+    this.description = description;
+    this.version = version;
+  }
+
+  /**
+   * Returns all versions.
+   *
+   * @return Version info array.
+   */
+  public static VersionInfo[] getAllVersions() {
+    return VERSION_INFOS.clone();
+  }
+
+  /**
+   * Returns the latest version.
+   *
+   * @return versionInfo
+   */
+  public static VersionInfo getLatestVersion() {
+    return VERSION_INFOS[VERSION_INFOS.length - 1];
+  }
+
+  /**
+   * Return description.
+   *
+   * @return String
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * Return the version.
+   *
+   * @return int.
+   */
+  public int getVersion() {
+    return version;
+  }
+
+}

+ 102 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.NullCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Command Queue is a queue of commands for the datanode.
+ * <p>
+ * Node manager, container manager and key space managers can queue commands
+ * for datanodes into this queue. These commands will be sent in the order in
+ * which they were queued.
+ */
+public class CommandQueue {
+
+  private final Map<DatanodeID, List<SCMCommand>> commandMap;
+  private final Lock lock;
+  private final List<SCMCommand> nullList;
+
+  /**
+   * Constructs a Command Queue.
+   */
+  public CommandQueue() {
+    commandMap = new HashMap<>();
+    lock = new ReentrantLock();
+    nullList = new LinkedList<>();
+    nullList.add(NullCommand.newBuilder().build());
+  }
+
+  /**
+   * Returns a list of commands for the datanode to execute. If there are no
+   * queued commands, a list containing a single NullCommand is returned;
+   * otherwise the currently queued commands are returned and the entry in the
+   * command map is reset to an empty list.
+   *
+   * @param datanodeID DatanodeID
+   * @return List of SCM Commands.
+   */
+  List<SCMCommand> getCommand(final DatanodeID datanodeID) {
+    lock.lock();
+
+    try {
+      if (commandMap.containsKey(datanodeID)) {
+        List<SCMCommand> temp = commandMap.get(datanodeID);
+        if (temp.size() > 0) {
+          LinkedList<SCMCommand> emptyList = new LinkedList<>();
+          commandMap.put(datanodeID, emptyList);
+          return temp;
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    return nullList;
+  }
+
+  /**
+   * Adds a Command to the SCM Queue to send the command to the datanode.
+   *
+   * @param datanodeID DatanodeID
+   * @param command    - Command
+   */
+  void addCommand(final DatanodeID datanodeID, final SCMCommand command) {
+    lock.lock();
+    try {
+      if (commandMap.containsKey(datanodeID)) {
+        commandMap.get(datanodeID).add(command);
+      } else {
+        LinkedList<SCMCommand> newList = new LinkedList<>();
+        newList.add(command);
+        commandMap.put(datanodeID, newList);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+}
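
A short sketch of the queue's contract. Both methods are package-private, so
this would compile only inside org.apache.hadoop.ozone.scm.node; datanodeID is
any registered DatanodeID.

    CommandQueue queue = new CommandQueue();

    // Nothing queued yet: the poll returns the shared one-NullCommand list.
    List<SCMCommand> idle = queue.getCommand(datanodeID);

    // Queue a command; the next poll drains it and resets the entry.
    queue.addCommand(datanodeID, NullCommand.newBuilder().build());
    List<SCMCommand> pending = queue.getCommand(datanodeID);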

+ 16 - 31
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java

@@ -1,24 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.ozone.scm.node;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.blockmanagement.UnresolvedTopologyException;
 
 import java.io.Closeable;
 import java.util.List;
@@ -47,21 +47,6 @@ import java.util.List;
  */
 public interface NodeManager extends Closeable, Runnable {
 
-  /**
-   * Update the heartbeat timestamp.
-   *
-   * @param datanodeID - Name of the datanode that send us heatbeat.
-   */
-  void updateHeartbeat(DatanodeID datanodeID);
-
-  /**
-   * Add a New Datanode to the NodeManager.
-   *
-   * @param nodeReg - Datanode ID.
-   * @throws UnresolvedTopologyException
-   */
-  void registerNode(DatanodeID nodeReg)
-      throws UnresolvedTopologyException;
 
   /**
    * Removes a data node from the management of this Node Manager.
@@ -73,7 +58,7 @@ public interface NodeManager extends Closeable, Runnable {
 
   /**
    * Gets all Live Datanodes that is currently communicating with SCM.
-   *
+   * @param nodestate - State of the node
    * @return List of Datanodes that are Heartbeating SCM.
    */
 
@@ -81,7 +66,7 @@ public interface NodeManager extends Closeable, Runnable {
 
   /**
    * Returns the Number of Datanodes that are communicating with SCM.
-   *
+   * @param nodestate - State of the node
    * @return int -- count
    */
   int getNodeCount(NODESTATE nodestate);

+ 156 - 92
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -1,18 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.ozone.scm.node;
 
@@ -24,6 +25,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+
+import org.apache.hadoop.ozone.scm.VersionInfo;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -45,38 +55,30 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 /**
  * Maintains information about the Datanodes on SCM side.
- * <p/>
+ * <p>
  * Heartbeats under SCM is very simple compared to HDFS heartbeatManager.
- * <p/>
+ * <p>
  * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to
- * staleNodesMap to deadNodesMap. This moving of a node from one map to
- * another is controlled by 4 configuration variables. These variables define
- * how many heartbeats must go missing for the node to move from one map to
- * another.
- * <p/>
+ * staleNodesMap to deadNodesMap. This moving of a node from one map to another
+ * is controlled by 4 configuration variables. These variables define how many
+ * heartbeats must go missing for the node to move from one map to another.
+ * <p>
  * Each heartbeat that SCMNodeManager receives is  put into heartbeatQueue. The
  * worker thread wakes up and grabs that heartbeat from the queue. The worker
  * thread will lookup the healthynodes map and update the timestamp if the entry
  * is there. if not it will look up stale and deadnodes map.
- * <p/>
- *
- * TODO: Replace with Node Registration code.
- * if the node is not found in any of these tables it is treated as new node for
- * time being and added to the healthy nodes list.
- *
- * <p/>
- *
- * The getNode(byState) functions make copy of node maps and then creates a
- * list based on that. It should be assumed that these get functions always
- * report *stale* information. For example, getting the deadNodeCount
- * followed by
+ * <p>
+ * The getNode(byState) functions make a copy of the node maps and then create
+ * a list based on that. It should be assumed that these get functions always
+ * report *stale* information. For example, getting the deadNodeCount followed by
  * getNodes(DEAD) could very well produce totally different count. Also
  * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not
  * guaranteed to add up to the total nodes that we know off. Please treat all
- * get functions in this file as a snap-shot of information that is
- * inconsistent as soon as you read it.
+ * get functions in this file as a snapshot of information that is inconsistent
+ * as soon as you read it.
  */
-public class SCMNodeManager implements NodeManager {
+public class SCMNodeManager
+    implements NodeManager, StorageContainerNodeProtocol {
 
   @VisibleForTesting
   static final Logger LOG =
@@ -103,12 +105,15 @@ public class SCMNodeManager implements NodeManager {
   private long lastHBProcessedCount;
   private int chillModeNodeCount;
   private final int maxHBToProcessPerLoop;
+  private final String clusterID;
+  private final VersionInfo version;
   private Optional<Boolean> inManualChillMode;
+  private final CommandQueue commandQueue;
 
   /**
    * Constructs SCM machine Manager.
    */
-  public SCMNodeManager(Configuration conf) {
+  public SCMNodeManager(Configuration conf, String clusterID) {
     heartbeatQueue = new ConcurrentLinkedQueue<>();
     healthyNodes = new ConcurrentHashMap<>();
     deadNodes = new ConcurrentHashMap<>();
@@ -119,6 +124,9 @@ public class SCMNodeManager implements NodeManager {
     staleNodeCount = new AtomicInteger(0);
     deadNodeCount = new AtomicInteger(0);
     totalNodes = new AtomicInteger(0);
+    this.clusterID = clusterID;
+    this.version = VersionInfo.getLatestVersion();
+    commandQueue = new CommandQueue();
 
     // TODO: Support this value as a Percentage of known machines.
     chillModeNodeCount = 1;
@@ -133,54 +141,13 @@ public class SCMNodeManager implements NodeManager {
     executorService = HadoopExecutors.newScheduledThreadPool(1,
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
-    this.inManualChillMode =  Optional.absent();
+    this.inManualChillMode = Optional.absent();
 
     Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
     executorService.schedule(this, heartbeatCheckerIntervalMs,
         TimeUnit.MILLISECONDS);
   }
 
-  /**
-   * Add a New Datanode to the NodeManager. This function is invoked with
-   * synchronised(this) being held.
-   *
-   * @param nodeReg - node to register
-   */
-  @Override
-  public void registerNode(DatanodeID nodeReg) {
-    if (nodes.containsKey(nodeReg.getDatanodeUuid())) {
-      LOG.error("Datanode is already registered. Datanode: {}",
-          nodeReg.toString());
-      return;
-    }
-    nodes.put(nodeReg.getDatanodeUuid(), nodeReg);
-    totalNodes.incrementAndGet();
-    healthyNodes.put(nodeReg.getDatanodeUuid(), monotonicNow());
-    healthyNodeCount.incrementAndGet();
-    LOG.info("Data node with ID: {} Registered.", nodeReg.getDatanodeUuid());
-  }
-
-  /**
-   * Register the heartbeat with Machine Manager.
-   *
-   * This requires no synchronization since the heartbeat queue is
-   * ConcurrentLinkedQueue. Hence we don't protect it specifically via a lock.
-   *
-   * @param datanodeID - Name of the datanode that send us heartbeat.
-   */
-  @Override
-  public void updateHeartbeat(DatanodeID datanodeID) {
-    // Checking for NULL to make sure that we don't get
-    // an exception from ConcurrentList.
-    // This could be a problem in tests, if this function is invoked via
-    // protobuf, transport layer will guarantee that this is not null.
-    if (datanodeID != null) {
-      heartbeatQueue.add(datanodeID);
-      return;
-    }
-    LOG.error("Datanode ID in heartbeat is null");
-  }
-
   /**
    * Removes a data node from the management of this Node Manager.
    *
@@ -203,7 +170,7 @@ public class SCMNodeManager implements NodeManager {
    */
   @Override
   public List<DatanodeID> getNodes(NODESTATE nodestate)
-      throws IllegalArgumentException{
+      throws IllegalArgumentException {
     Map<String, Long> set;
     switch (nodestate) {
     case HEALTHY:
@@ -226,7 +193,7 @@ public class SCMNodeManager implements NodeManager {
     }
 
     return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
-            .collect(Collectors.toList());
+        .collect(Collectors.toList());
   }
 
   /**
@@ -241,7 +208,7 @@ public class SCMNodeManager implements NodeManager {
       set = Collections.unmodifiableMap(new HashMap<>(nodes));
     }
     return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
-            .collect(Collectors.toList());
+        .collect(Collectors.toList());
   }
 
   /**
@@ -257,7 +224,7 @@ public class SCMNodeManager implements NodeManager {
   /**
    * Sets the Minimum chill mode nodes count, used only in testing.
    *
-   * @param count  - Number of nodes.
+   * @param count - Number of nodes.
    */
   @VisibleForTesting
   public void setMinimumChillModeNodes(int count) {
@@ -329,7 +296,7 @@ public class SCMNodeManager implements NodeManager {
    */
   @Override
   public boolean isInManualChillMode() {
-    if(this.inManualChillMode.isPresent()) {
+    if (this.inManualChillMode.isPresent()) {
       return this.inManualChillMode.get();
     }
     return false;
@@ -373,6 +340,7 @@ public class SCMNodeManager implements NodeManager {
 
   /**
    * Used for testing.
+   *
    * @return true if the HB check is done.
    */
   @VisibleForTesting
@@ -383,14 +351,14 @@ public class SCMNodeManager implements NodeManager {
   /**
    * This is the real worker thread that processes the HB queue. We do the
    * following things in this thread.
-   *
-   * Process the Heartbeats that are in the HB Queue.
-   * Move Stale or Dead node to healthy if we got a heartbeat from them.
-   * Move Stales Node to dead node table if it is needed.
-   * Move healthy nodes to stale nodes if it is needed.
-   *
+   * <p>
+   * Process the heartbeats that are in the HB queue. Move stale or dead nodes
+   * to healthy if we got a heartbeat from them. Move stale nodes to the dead
+   * node table if needed. Move healthy nodes to stale if needed.
+   * <p>
    * if it is a new node, we call register node and add it to the list of nodes.
    * This will be replaced when we support registration of a node in SCM.
+   *
    * @see Thread#run()
    */
   @Override
@@ -553,10 +521,7 @@ public class SCMNodeManager implements NodeManager {
       healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
       deadNodeCount.decrementAndGet();
       healthyNodeCount.incrementAndGet();
-      return;
     }
-
-    registerNode(datanodeID);
   }
 
   /**
@@ -587,4 +552,103 @@ public class SCMNodeManager implements NodeManager {
     return lastHBProcessedCount;
   }
 
+  /**
+   * Gets the version info from SCM.
+   *
+   * @param versionRequest - version Request.
+   * @return - SCM version info and other required information needed by the
+   * datanode.
+   */
+  @Override
+  public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
+    return VersionResponse.newBuilder()
+        .setVersion(this.version.getVersion())
+        .build();
+  }
+
+  /**
+   * Register the node if the node finds that it is not registered with any
+   * SCM.
+   *
+   * @param datanodeID - Send datanodeID with Node info. This function
+   *                   generates and assigns new datanode ID for the datanode.
+   *                   This allows SCM to be run independent of Namenode if
+   *                   required.
+   *
+   * @return SCMCommand
+   */
+  @Override
+  public SCMCommand register(DatanodeID datanodeID) {
+
+    SCMCommand errorCode = verifyDatanodeUUID(datanodeID);
+    if (errorCode != null) {
+      return errorCode;
+    }
+    DatanodeID newDatanodeID = new DatanodeID(UUID.randomUUID().toString(),
+        datanodeID);
+    nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID);
+    totalNodes.incrementAndGet();
+    healthyNodes.put(newDatanodeID.getDatanodeUuid(), monotonicNow());
+    healthyNodeCount.incrementAndGet();
+    LOG.info("Data node with ID: {} Registered.",
+        newDatanodeID.getDatanodeUuid());
+    return RegisteredCommand.newBuilder()
+        .setErrorCode(ErrorCode.success)
+        .setDatanodeUUID(newDatanodeID.getDatanodeUuid())
+        .setClusterID(this.clusterID)
+        .build();
+  }
+
+  /**
+   * Verifies the datanode does not have a valid UUID already.
+   *
+   * @param datanodeID - Datanode UUID.
+   * @return SCMCommand
+   */
+  private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
+
+    // Make sure that we return the right error code, so that
+    // data node can log the correct error. if it is already registered then
+    // datanode should move to heartbeat state. It implies that somehow we
+    // have an error where the data node is trying to re-register.
+    //
+    // We are going to let the datanode know that there is an error but allow it
+    // to recover by sending it the right info that is needed for recovery.
+
+    if (datanodeID.getDatanodeUuid() != null &&
+        nodes.containsKey(datanodeID.getDatanodeUuid())) {
+      LOG.error("Datanode is already registered. Datanode: {}",
+          datanodeID.toString());
+      return RegisteredCommand.newBuilder()
+          .setErrorCode(ErrorCode.errorNodeAlreadyRegistered)
+          .setClusterID(this.clusterID)
+          .setDatanodeUUID(datanodeID.getDatanodeUuid())
+          .build();
+    }
+    return null;
+  }
+
+  /**
+   * Send heartbeat to indicate the datanode is alive and doing well.
+   *
+   * @param datanodeID - Datanode ID.
+   * @return List of SCM commands to execute.
+   */
+  @Override
+  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
+
+    // Checking for NULL to make sure that we don't get
+    // an exception from ConcurrentList.
+    // This could be a problem in tests, if this function is invoked via
+    // protobuf, transport layer will guarantee that this is not null.
+    if (datanodeID != null) {
+      heartbeatQueue.add(datanodeID);
+    } else {
+      LOG.error("Datanode ID in heartbeat is null");
+    }
+
+    return commandQueue.getCommand(datanodeID);
+  }
 }
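
Pieced together from the test changes below, a hedged sketch of the full
registration and heartbeat round trip; the IP, hostname and conf values are
placeholders.

    SCMNodeManager nodeManager =
        new SCMNodeManager(conf, UUID.randomUUID().toString());
    DatanodeID candidate = new DatanodeID("10.0.0.1", "dn-1",
        UUID.randomUUID().toString(), 0, 0, 0, 0);

    // SCM assigns a fresh UUID and returns it with ErrorCode.success.
    RegisteredCommand reply =
        (RegisteredCommand) nodeManager.register(candidate);
    DatanodeID registered =
        new DatanodeID(reply.getDatanodeUUID(), candidate);

    // Later heartbeats return any queued commands (or the null command).
    List<SCMCommand> commands = nodeManager.sendHeartbeat(registered);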

+ 27 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -67,26 +67,45 @@ message SCMVersionResponseProto {
  * registeration of a datanode.
  */
 message RegisteredCmdResponseProto {
-  required string datanodeUUID = 1;
-  required string clusterID = 2;
+  enum ErrorCode {
+    success = 1;
+    errorNodeAlreadyRegistered = 2;
+    errorNodeNotPermitted = 3;
+  }
+  required ErrorCode errorCode = 1;
+  optional string datanodeUUID = 2;
+  optional string clusterID = 3;
+}
+
+/**
+ * Empty Command Response
+ */
+message NullCmdResponseProto {
+
 }
 
 /*
 * These are commands returned by SCM for the datanode to execute.
  */
-
-message SCMHeartbeatResponseProto {
+message SCMCommandResponseProto {
   enum Type {
     nullCmd = 1;
     registeredCmd = 2; // Returns the datanode ID after registeration.
   }
 
   required Type cmdType = 1; // Type of the command
-  optional  RegisteredCmdResponseProto  registerNode = 2;
-
+  optional NullCmdResponseProto nullCommand = 2;
+  optional RegisteredCmdResponseProto registerNode = 3;
 }
 
 
+/*
+ * A group of commands for the datanode to execute
+ */
+message SCMHeartbeatResponseProto {
+  repeated SCMCommandResponseProto commands = 1;
+}
+
 
 /**
  * Protocol used from a datanode to StorageContainerManager.
@@ -151,8 +170,6 @@ message SCMHeartbeatResponseProto {
  * Once in the heartbeat state, datanode sends heartbeats and container reports
  * to SCM and process commands issued by SCM until it is shutdown.
  *
- * For time being we are going to use SCMHeartbeatResponseProto as the return
- * type for register and sendheartbeat messages.
  */
 service StorageContainerDatanodeProtocolService {
 
@@ -164,13 +181,12 @@ service StorageContainerDatanodeProtocolService {
   /**
   * Registers a data node with SCM.
   */
-  rpc register(SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
+  rpc register(SCMHeartbeatRequestProto) returns (SCMCommandResponseProto);
 
   /**
    * Send heartbeat from datanode to SCM. HB's under SCM looks more
    * like life line protocol than HB's under HDFS. In other words, it is
    * extremely light weight and contains no data payload.
    */
-  rpc sendHeartbeat(SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
-
+  rpc sendHeartbeat(SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
 }
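
To show how the reshaped messages compose, a hedged Java sketch of a server
building the new heartbeat reply around one registered-command response;
method names follow the usual protobuf-java conventions for the fields
declared above, and datanodeUUID/clusterID are assumed to be in scope.

    SCMCommandResponseProto registered = SCMCommandResponseProto.newBuilder()
        .setCmdType(SCMCommandResponseProto.Type.registeredCmd)
        .setRegisterNode(RegisteredCmdResponseProto.newBuilder()
            .setErrorCode(RegisteredCmdResponseProto.ErrorCode.success)
            .setDatanodeUUID(datanodeUUID)
            .setClusterID(clusterID)
            .build())
        .build();

    SCMHeartbeatResponseProto reply = SCMHeartbeatResponseProto.newBuilder()
        .addCommands(registered) // repeated field from the proto above
        .build();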

+ 99 - 90
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

@@ -1,26 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.ozone.scm.node;
 
-import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -76,8 +77,9 @@ public class TestNodeManager {
    *
    * @return DatanodeID
    */
-  DatanodeID getDatanodeID() {
-    return getDatanodeID(UUID.randomUUID().toString());
+  DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
+    return getDatanodeID(nodeManager, UUID.randomUUID().toString());
   }
 
   /**
@@ -86,16 +88,19 @@ public class TestNodeManager {
    * @param uuid - node ID, it is generally UUID.
    * @return DatanodeID.
    */
-  DatanodeID getDatanodeID(String uuid) {
+  DatanodeID getDatanodeID(SCMNodeManager nodeManager, String uuid) {
     Random random = new Random();
     String ipAddress = random.nextInt(256) + "."
         + random.nextInt(256) + "."
         + random.nextInt(256) + "."
         + random.nextInt(256);
 
-    String hostName = RandomStringUtils.randomAscii(8);
-    return new DatanodeID(ipAddress, hostName, uuid,
-        0, 0, 0, 0);
+    String hostName = uuid;
+    DatanodeID tempDataNode = new DatanodeID(ipAddress,
+        hostName, uuid, 0, 0, 0, 0);
+    RegisteredCommand command =
+        (RegisteredCommand) nodeManager.register(tempDataNode);
+    return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
   }
 
   /**
@@ -107,7 +112,8 @@ public class TestNodeManager {
    */
 
   SCMNodeManager createNodeManager(Configuration config) throws IOException {
-    SCMNodeManager nodeManager = new SCMNodeManager(config);
+    SCMNodeManager nodeManager = new SCMNodeManager(config,
+        UUID.randomUUID().toString());
     assertFalse("Node manager should be in chill mode",
         nodeManager.isOutOfNodeChillMode());
     return nodeManager;
@@ -126,10 +132,10 @@ public class TestNodeManager {
       InterruptedException, TimeoutException {
 
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
-
       // Send some heartbeats from different nodes.
       for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
-        nodeManager.updateHeartbeat(getDatanodeID());
+        DatanodeID datanodeID = getDatanodeID(nodeManager);
+        nodeManager.sendHeartbeat(datanodeID);
       }
 
       // Wait for 4 seconds max.
@@ -175,7 +181,7 @@ public class TestNodeManager {
 
       // Need 100 nodes to come out of chill mode, only one node is sending HB.
       nodeManager.setMinimumChillModeNodes(100);
-      nodeManager.updateHeartbeat(getDatanodeID());
+      nodeManager.sendHeartbeat(getDatanodeID(nodeManager));
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
           4 * 1000);
       assertFalse("Not enough heartbeat, Node manager should have been in " +
@@ -197,11 +203,11 @@ public class TestNodeManager {
 
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       nodeManager.setMinimumChillModeNodes(3);
-      DatanodeID datanodeID = getDatanodeID();
+      DatanodeID datanodeID = getDatanodeID(nodeManager);
 
       // Send 10 heartbeat from same node, and assert we never leave chill mode.
       for (int x = 0; x < 10; x++) {
-        nodeManager.updateHeartbeat(datanodeID);
+        nodeManager.sendHeartbeat(datanodeID);
       }
 
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@@ -226,17 +232,15 @@ public class TestNodeManager {
     Configuration conf = getConf();
     conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
     SCMNodeManager nodeManager = createNodeManager(conf);
+    DatanodeID datanodeID = getDatanodeID(nodeManager);
     nodeManager.close();
 
     // These should never be processed.
-    nodeManager.updateHeartbeat(getDatanodeID());
+    nodeManager.sendHeartbeat(datanodeID);
 
     // Let us just wait for 2 seconds to prove that HBs are not processed.
     Thread.sleep(2 * 1000);
 
-    assertFalse("Node manager executor service is shutdown, should never exit" +
-        " chill mode", nodeManager.isOutOfNodeChillMode());
-
     assertEquals("Assert new HBs were never processed", 0,
         nodeManager.getLastHBProcessedCount());
   }
@@ -256,8 +260,10 @@ public class TestNodeManager {
     final int count = 10;
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+
       for (int x = 0; x < count; x++) {
-        nodeManager.updateHeartbeat(getDatanodeID());
+        DatanodeID datanodeID = getDatanodeID(nodeManager);
+        nodeManager.sendHeartbeat(datanodeID);
       }
       GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
           4 * 1000);
@@ -338,20 +344,19 @@ public class TestNodeManager {
     conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
-      List<DatanodeID> nodeList = new LinkedList<>();
-      DatanodeID staleNode = getDatanodeID();
-      for (int x = 0; x < nodeCount; x++) {
-        nodeList.add(getDatanodeID());
-      }
+      List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
+          "staleNode");
+      DatanodeID staleNode = getDatanodeID(nodeManager);
+
       // Heartbeat once
-      nodeManager.updateHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(staleNode);
 
       // Heartbeat all other nodes.
-      nodeList.forEach(nodeManager::updateHeartbeat);
+      nodeList.forEach(nodeManager::sendHeartbeat);
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
-      nodeList.forEach(nodeManager::updateHeartbeat);
+      nodeList.forEach(nodeManager::sendHeartbeat);
 
       // Wait for 2 more seconds, 3 seconds is the stale window for this test
       Thread.sleep(2 * 1000);
@@ -388,36 +393,34 @@ public class TestNodeManager {
 
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
-      List<DatanodeID> nodeList = new LinkedList<>();
+      List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
+          "Node");
+
+      DatanodeID deadNode = getDatanodeID(nodeManager);
 
-      DatanodeID deadNode = getDatanodeID();
-      for (int x = 0; x < nodeCount; x++) {
-        nodeList.add(getDatanodeID());
-      }
       // Heartbeat once
-      nodeManager.updateHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(deadNode);
 
       // Heartbeat all other nodes.
-      nodeList.forEach(nodeManager::updateHeartbeat);
+      nodeList.forEach(nodeManager::sendHeartbeat);
 
       // Wait for 2 seconds .. and heartbeat good nodes again.
       Thread.sleep(2 * 1000);
 
-      nodeList.forEach(nodeManager::updateHeartbeat);
+      nodeList.forEach(nodeManager::sendHeartbeat);
       Thread.sleep(3 * 1000);
 
       // heartbeat good nodes again.
-      nodeList.forEach(nodeManager::updateHeartbeat);
+      nodeList.forEach(nodeManager::sendHeartbeat);
 
       //  6 seconds is the dead window for this test , so we wait a total of
       // 7 seconds to make sure that the node moves into dead state.
       Thread.sleep(2 * 1000);
 
       // Check for the dead node now.
-      List<DatanodeID> deadNodeList = nodeManager
-          .getNodes(DEAD);
-      assertEquals("Expected to find 1 dead node", 1, nodeManager
-          .getNodeCount(DEAD));
+      List<DatanodeID> deadNodeList = nodeManager.getNodes(DEAD);
+      assertEquals("Expected to find 1 dead node", 1,
+          nodeManager.getNodeCount(DEAD));
       assertEquals("Expected to find 1 dead node", 1, deadNodeList.size());
       assertEquals("Dead node is not the expected ID", deadNode
           .getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid());
@@ -438,9 +441,8 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       GenericTestUtils.LogCapturer logCapturer =
           GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
-      DatanodeID duplicateNodeID = getDatanodeID();
-      nodeManager.registerNode(duplicateNodeID);
-      nodeManager.registerNode(duplicateNodeID);
+      DatanodeID duplicateNodeID = getDatanodeID(nodeManager);
+      nodeManager.register(duplicateNodeID);
       logCapturer.stopCapturing();
       assertThat(logCapturer.getOutput(), containsString("Datanode is already" +
           " registered."));
@@ -460,7 +462,7 @@ public class TestNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
       GenericTestUtils.LogCapturer logCapturer =
           GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
-      nodeManager.updateHeartbeat(null);
+      nodeManager.sendHeartbeat(null);
       logCapturer.stopCapturing();
       assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
           "heartbeat is null"));
@@ -486,11 +488,6 @@ public class TestNodeManager {
   @Test
   public void testScmClusterIsInExpectedState1() throws IOException,
       InterruptedException, TimeoutException {
-
-    DatanodeID healthyNode = getDatanodeID("HealthyNode");
-    DatanodeID staleNode = getDatanodeID("StaleNode");
-    DatanodeID deadNode = getDatanodeID("DeadNode");
-
     /**
      * These values are very important. Here is what it means so you don't
      * have to look it up while reading this code.
@@ -535,9 +532,12 @@ public class TestNodeManager {
      * Cluster state: Healthy: All nodes are heartbeat-ing like normal.
      */
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
-      nodeManager.updateHeartbeat(healthyNode);
-      nodeManager.updateHeartbeat(staleNode);
-      nodeManager.updateHeartbeat(deadNode);
+      DatanodeID healthyNode = getDatanodeID(nodeManager, "HealthyNode");
+      DatanodeID staleNode = getDatanodeID(nodeManager, "StaleNode");
+      DatanodeID deadNode = getDatanodeID(nodeManager, "DeadNode");
+      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(deadNode);
 
       // Sleep so that heartbeat processing thread gets to run.
       Thread.sleep(500);
@@ -563,12 +563,12 @@ public class TestNodeManager {
        * the 3 second windows.
        */
 
-      nodeManager.updateHeartbeat(healthyNode);
-      nodeManager.updateHeartbeat(staleNode);
-      nodeManager.updateHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(deadNode);
 
       Thread.sleep(1500);
-      nodeManager.updateHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(healthyNode);
       Thread.sleep(2 * 1000);
       assertEquals(1, nodeManager.getNodeCount(HEALTHY));
 
@@ -588,10 +588,10 @@ public class TestNodeManager {
        * staleNode to move to stale state and deadNode to move to dead state.
        */
 
-      nodeManager.updateHeartbeat(healthyNode);
-      nodeManager.updateHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(staleNode);
       Thread.sleep(1500);
-      nodeManager.updateHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(healthyNode);
       Thread.sleep(2 * 1000);
 
       // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -617,14 +617,13 @@ public class TestNodeManager {
       assertEquals("Expected one dead node", 1, deadList.size());
       assertEquals("Dead node is not the expected ID", deadNode
           .getDatanodeUuid(), deadList.get(0).getDatanodeUuid());
-
       /**
        * Cluster State : let us heartbeat all the nodes and verify that we get
        * back all the nodes in healthy state.
        */
-      nodeManager.updateHeartbeat(healthyNode);
-      nodeManager.updateHeartbeat(staleNode);
-      nodeManager.updateHeartbeat(deadNode);
+      nodeManager.sendHeartbeat(healthyNode);
+      nodeManager.sendHeartbeat(staleNode);
+      nodeManager.sendHeartbeat(deadNode);
       Thread.sleep(500);
       //Assert all nodes are healthy.
       assertEquals(3, nodeManager.getAllNodes().size());
@@ -640,10 +639,10 @@ public class TestNodeManager {
    * @param sleepDuration - Duration to sleep between heartbeats.
    * @throws InterruptedException
    */
-  private void heartbeatNodeSet(NodeManager manager, List<DatanodeID> list,
+  private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
                                 int sleepDuration) throws InterruptedException {
     while (!Thread.currentThread().isInterrupted()) {
-      list.forEach(manager::updateHeartbeat);
+      list.forEach(manager::sendHeartbeat);
       Thread.sleep(sleepDuration);
     }
   }
@@ -655,10 +654,12 @@ public class TestNodeManager {
    * @param prefix - A prefix string that can be used in verification.
    * @return List of Nodes.
    */
-  private List<DatanodeID> createNodeSet(int count, String prefix) {
+  private List<DatanodeID> createNodeSet(SCMNodeManager nodeManager,
+      int count, String prefix) {
     List<DatanodeID> list = new LinkedList<>();
     for (int x = 0; x < count; x++) {
-      list.add(getDatanodeID(prefix + x));
+      list.add(getDatanodeID(nodeManager, prefix + x));
     }
     return list;
   }
@@ -696,12 +697,16 @@ public class TestNodeManager {
     conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
     conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000);
 
-    List<DatanodeID> healthyNodeList = createNodeSet(healthyCount, "Healthy");
-    List<DatanodeID> staleNodeList = createNodeSet(staleCount, "Stale");
-    List<DatanodeID> deadNodeList = createNodeSet(deadCount, "Dead");
 
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeID> healthyNodeList = createNodeSet(nodeManager,
+          healthyCount, "Healthy");
+      List<DatanodeID> staleNodeList = createNodeSet(nodeManager, staleCount,
+          "Stale");
+      List<DatanodeID> deadNodeList = createNodeSet(nodeManager, deadCount,
+          "Dead");
+
       Runnable healthyNodeTask = () -> {
         try {
           // 2 second heartbeat makes these nodes stay healthy.
@@ -722,7 +727,7 @@ public class TestNodeManager {
 
       // No Thread just one time HBs the node manager, so that these will be
       // marked as dead nodes eventually.
-      deadNodeList.forEach(nodeManager::updateHeartbeat);
+      deadNodeList.forEach(nodeManager::sendHeartbeat);
 
       Thread thread1 = new Thread(healthyNodeTask);
       thread1.setDaemon(true);
@@ -745,7 +750,7 @@ public class TestNodeManager {
       List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
 
       for (DatanodeID node : deadList) {
-        assertThat(node.getDatanodeUuid(), CoreMatchers.startsWith("Dead"));
+        assertThat(node.getHostName(), CoreMatchers.startsWith("Dead"));
       }
 
       // Checking stale nodes is tricky since they have to move between
@@ -772,8 +777,6 @@ public class TestNodeManager {
       InterruptedException, TimeoutException {
     final int healthyCount = 3000;
     final int staleCount = 3000;
-    List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
-    List<DatanodeID> staleList = createNodeSet(staleCount, "s");
     Configuration conf = getConf();
     conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
     conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
@@ -781,6 +784,10 @@ public class TestNodeManager {
     conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeID> healthyList = createNodeSet(nodeManager,
+          healthyCount, "h");
+      List<DatanodeID> staleList = createNodeSet(nodeManager, staleCount, "s");
+
       Runnable healthyNodeTask = () -> {
         try {
           heartbeatNodeSet(nodeManager, healthyList, 2 * 1000);
@@ -800,7 +807,6 @@ public class TestNodeManager {
       thread1.setDaemon(true);
       thread1.start();
 
-
       Thread thread2 = new Thread(staleNodeTask);
       thread2.setDaemon(true);
       thread2.start();
@@ -829,7 +835,6 @@ public class TestNodeManager {
   public void testScmLogsHeartbeatFlooding() throws IOException,
       InterruptedException {
     final int healthyCount = 3000;
-    List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
 
     // Make the HB process thread run slower.
     Configuration conf = getConf();
@@ -838,6 +843,8 @@ public class TestNodeManager {
     conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500);
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeID> healthyList = createNodeSet(nodeManager, healthyCount,
+          "h");
       GenericTestUtils.LogCapturer logCapturer =
           GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
       Runnable healthyNodeTask = () -> {
@@ -871,7 +878,8 @@ public class TestNodeManager {
 
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       nodeManager.setMinimumChillModeNodes(10);
-      nodeManager.updateHeartbeat(getDatanodeID());
+      DatanodeID datanodeID = getDatanodeID(nodeManager);
+      nodeManager.sendHeartbeat(datanodeID);
       String status = nodeManager.getChillModeStatus();
       Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
           "mode. Waiting on nodes to report in."));
@@ -900,7 +908,8 @@ public class TestNodeManager {
 
       // Assert that node manager force enter cannot be overridden by nodes HBs.
       for(int x= 0; x < 20; x++) {
-        nodeManager.updateHeartbeat(getDatanodeID());
+        DatanodeID datanode = getDatanodeID(nodeManager);
+        nodeManager.sendHeartbeat(datanode);
       }
 
       Thread.sleep(500);