
HDDS-1713. ReplicationManager fail to find proper node topology based… (#1112)


(cherry picked from commit 69a46a95bb6ab953d4bccdb133565667daca5c96)
Sammi Chen 6 years ago
parent
commit
b685501369
18 changed files with 219 additions and 182 deletions
  1. +10 -5  hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
  2. +4 -0  hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
  3. +35 -86  hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
  4. +2 -0  hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
  5. +12 -4  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
  6. +49 -32  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
  7. +1 -1  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
  8. +4 -4  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
  9. +1 -9  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
  10. +4 -2  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
  11. +13 -2  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
  12. +1 -9  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
  13. +2 -2  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
  14. +7 -20  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
  15. +6 -1  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
  16. +55 -0  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
  17. +7 -4  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
  18. +6 -1  hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

+ 10 - 5
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.protocol;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -193,12 +194,12 @@ public class DatanodeDetails extends NodeImpl implements
       builder.addPort(newPort(
           Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
     }
-    if (datanodeDetailsProto.hasNetworkLocation()) {
-      builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
-    }
     if (datanodeDetailsProto.hasNetworkName()) {
       builder.setNetworkName(datanodeDetailsProto.getNetworkName());
     }
+    if (datanodeDetailsProto.hasNetworkLocation()) {
+      builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
+    }
     return builder.build();
   }
 
@@ -219,8 +220,12 @@ public class DatanodeDetails extends NodeImpl implements
     if (certSerialId != null) {
       builder.setCertSerialId(certSerialId);
     }
-    builder.setNetworkLocation(getNetworkLocation());
-    builder.setNetworkName(getNetworkName());
+    if (!Strings.isNullOrEmpty(getNetworkName())) {
+      builder.setNetworkName(getNetworkName());
+    }
+    if (!Strings.isNullOrEmpty(getNetworkLocation())) {
+      builder.setNetworkLocation(getNetworkLocation());
+    }
 
     for (Port port : ports) {
       builder.addPorts(HddsProtos.Port.newBuilder()

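Note on this hunk: getProtoBufMessage() now sets the optional network fields only when they are non-empty, and getFromProtoBuf() restores networkName before networkLocation. A minimal round-trip sketch, assuming the builder methods visible in this file (illustrative, not part of the patch):

    // A DatanodeDetails built without topology info should survive proto
    // conversion without writing empty strings into the optional fields.
    DatanodeDetails dn = DatanodeDetails.newBuilder()
        .setUuid(UUID.randomUUID().toString())
        .setIpAddress("10.0.0.1")
        .setHostName("dn1.example.com")
        .build();                          // networkName/networkLocation unset
    HddsProtos.DatanodeDetailsProto proto = dn.getProtoBufMessage();
    assert !proto.hasNetworkName() && !proto.hasNetworkLocation();
    DatanodeDetails restored = DatanodeDetails.getFromProtoBuf(proto);
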
+ 4 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java

@@ -128,6 +128,10 @@ public final class RegisterEndpointTask implements
         datanodeDetails.setHostName(response.getHostname());
         datanodeDetails.setIpAddress(response.getIpAddress());
       }
+      if (response.hasNetworkName() && response.hasNetworkLocation()) {
+        datanodeDetails.setNetworkName(response.getNetworkName());
+        datanodeDetails.setNetworkLocation(response.getNetworkLocation());
+      }
       EndpointStateMachine.EndPointStates nextState =
           rpcEndPoint.getState().getNextState();
       rpcEndPoint.setState(nextState);

+ 35 - 86
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java

@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.ozone.protocol.commands;
 
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -28,23 +29,15 @@ import org.apache.hadoop.hdds.protocol.proto
  * Response to Datanode Register call.
  */
 public class RegisteredCommand {
-  private String datanodeUUID;
   private String clusterID;
   private ErrorCode error;
-  private String hostname;
-  private String ipAddress;
+  private DatanodeDetails datanode;
 
-  public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
+  public RegisteredCommand(final ErrorCode error, final DatanodeDetails node,
       final String clusterID) {
-    this(error, datanodeUUID, clusterID, null, null);
-  }
-  public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
-      final String clusterID, final String hostname, final String ipAddress) {
-    this.datanodeUUID = datanodeUUID;
+    this.datanode = node;
     this.clusterID = clusterID;
     this.error = error;
-    this.hostname = hostname;
-    this.ipAddress = ipAddress;
   }
 
   /**
@@ -57,12 +50,12 @@ public class RegisteredCommand {
   }
 
   /**
-   * Returns datanode UUID.
+   * Returns datanode.
    *
-   * @return - Datanode ID.
+   * @return - Datanode.
    */
-  public String getDatanodeUUID() {
-    return datanodeUUID;
+  public DatanodeDetails getDatanode() {
+    return datanode;
   }
 
   /**
@@ -83,79 +76,54 @@ public class RegisteredCommand {
     return error;
   }
 
-  /**
-   * Returns the hostname.
-   *
-   * @return - hostname
-   */
-  public String getHostName() {
-    return hostname;
-  }
-
-  /**
-   * Returns the ipAddress of the dataNode.
-   */
-  public String getIpAddress() {
-    return ipAddress;
-  }
-
   /**
    * Gets the protobuf message of this object.
    *
    * @return A protobuf message.
    */
-  public byte[] getProtoBufMessage() {
+  public SCMRegisteredResponseProto getProtoBufMessage() {
     SCMRegisteredResponseProto.Builder builder =
         SCMRegisteredResponseProto.newBuilder()
+            // TODO : Fix this later when we have multiple SCM support.
+            // .setAddressList(addressList)
             .setClusterID(this.clusterID)
-            .setDatanodeUUID(this.datanodeUUID)
+            .setDatanodeUUID(this.datanode.getUuidString())
             .setErrorCode(this.error);
-    if (hostname != null && ipAddress != null) {
-      builder.setHostname(hostname).setIpAddress(ipAddress);
+    if (!Strings.isNullOrEmpty(datanode.getHostName())) {
+      builder.setHostname(datanode.getHostName());
+    }
+    if (!Strings.isNullOrEmpty(datanode.getIpAddress())) {
+      builder.setIpAddress(datanode.getIpAddress());
+    }
+    if (!Strings.isNullOrEmpty(datanode.getNetworkName())) {
+      builder.setNetworkName(datanode.getNetworkName());
     }
-    return builder.build().toByteArray();
+    if (!Strings.isNullOrEmpty(datanode.getNetworkLocation())) {
+      builder.setNetworkLocation(datanode.getNetworkLocation());
+    }
+
+    return builder.build();
   }
 
   /**
    * A builder class to verify all values are sane.
    */
   public static class Builder {
-    private String datanodeUUID;
+    private DatanodeDetails datanode;
     private String clusterID;
     private ErrorCode error;
-    private String ipAddress;
-    private String hostname;
 
     /**
-     * sets UUID.
+     * sets datanode details.
      *
-     * @param dnUUID - datanode UUID
+     * @param node - datanode details
      * @return Builder
      */
-    public Builder setDatanodeUUID(String dnUUID) {
-      this.datanodeUUID = dnUUID;
+    public Builder setDatanode(DatanodeDetails node) {
+      this.datanode = node;
       return this;
     }
 
-    /**
-     * Create this object from a Protobuf message.
-     *
-     * @param response - RegisteredCmdResponseProto
-     * @return RegisteredCommand
-     */
-    public  RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto
-                                                        response) {
-      Preconditions.checkNotNull(response);
-      if (response.hasHostname() && response.hasIpAddress()) {
-        return new RegisteredCommand(response.getErrorCode(),
-            response.getDatanodeUUID(), response.getClusterID(),
-            response.getHostname(), response.getIpAddress());
-      } else {
-        return new RegisteredCommand(response.getErrorCode(),
-            response.getDatanodeUUID(), response.getClusterID());
-      }
-    }
-
     /**
      * Sets cluster ID.
      *
@@ -178,38 +146,19 @@ public class RegisteredCommand {
       return this;
     }
 
-    /**
-     * sets the hostname.
-     */
-    public Builder setHostname(String host) {
-      this.hostname = host;
-      return this;
-    }
-
-    public Builder setIpAddress(String ipAddr) {
-      this.ipAddress = ipAddr;
-      return this;
-    }
-
     /**
      * Build the command object.
      *
      * @return RegisteredCommand
      */
     public RegisteredCommand build() {
-      if ((this.error == ErrorCode.success) && (this.datanodeUUID == null
-          || this.datanodeUUID.isEmpty()) || (this.clusterID == null
-          || this.clusterID.isEmpty())) {
+      if ((this.error == ErrorCode.success) && (this.datanode == null
+          || Strings.isNullOrEmpty(this.datanode.getUuidString())
+          || Strings.isNullOrEmpty(this.clusterID))) {
         throw new IllegalArgumentException("On success, RegisteredCommand "
             + "needs datanodeUUID and ClusterID.");
       }
-      if (hostname != null && ipAddress != null) {
-        return new RegisteredCommand(this.error, this.datanodeUUID,
-            this.clusterID, this.hostname, this.ipAddress);
-      } else {
-        return new RegisteredCommand(this.error, this.datanodeUUID,
-            this.clusterID);
-      }
+      return new RegisteredCommand(this.error, this.datanode, this.clusterID);
     }
   }
 }

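With hostname, IP, and topology folded into a single DatanodeDetails, a caller builds the command from the registered node and serializes it once. A usage sketch consistent with the SCMNodeManager and SCMDatanodeProtocolServer hunks below (variable names are illustrative):

    RegisteredCommand cmd = RegisteredCommand.newBuilder()
        .setErrorCode(ErrorCode.success)
        .setDatanode(datanodeDetails)   // UUID, host, IP, topology in one place
        .setClusterID(clusterId)
        .build();
    // One conversion now serves the wire response as well:
    SCMRegisteredResponseProto response = cmd.getProtoBufMessage();
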
+ 2 - 0
hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -70,6 +70,8 @@ message SCMRegisteredResponseProto {
   optional SCMNodeAddressList addressList = 4;
   optional string hostname = 5;
   optional string ipAddress = 6;
+  optional string networkName = 7;
+  optional string networkLocation = 8;
 }
 
 /**

+ 12 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java

@@ -173,11 +173,19 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   List<SCMCommand> getCommandQueue(UUID dnID);
 
   /**
-   * Given datanode host address, returns the DatanodeDetails for the
-   * node.
+   * Given datanode uuid, returns the DatanodeDetails for the node.
    *
-   * @param address node host address
+   * @param uuid datanode uuid
    * @return the given datanode, or null if not found
    */
-  DatanodeDetails getNode(String address);
+  DatanodeDetails getNodeByUuid(String uuid);
+
+  /**
+   * Given a datanode address (IP address or hostname), returns the
+   * DatanodeDetails for the node.
+   *
+   * @param address datanode address
+   * @return the given datanode, or null if not found
+   */
+  DatanodeDetails getNodeByAddress(String address);
 }

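Splitting the old getNode(String) into two methods lets callers state which key they hold. An illustrative call site, assuming a registered node (not from the patch):

    // UUIDs come from pipeline/location lists; addresses from client machines
    // (see SCMBlockProtocolServer below).
    DatanodeDetails byUuid = nodeManager.getNodeByUuid(dn.getUuidString());
    DatanodeDetails byAddress = nodeManager.getNodeByAddress(dn.getIpAddress());
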
+ 49 - 32
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -19,15 +19,13 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.net.InnerNode;
-import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
-import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
@@ -74,6 +72,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.stream.Collectors;
 
@@ -106,6 +105,8 @@ public class SCMNodeManager implements NodeManager {
   private final NetworkTopology clusterMap;
   private final DNSToSwitchMapping dnsToSwitchMapping;
   private final boolean useHostname;
+  private final ConcurrentHashMap<String, String> dnsToUuidMap =
+      new ConcurrentHashMap<>();
 
   /**
    * Constructs SCM machine Manager.
@@ -252,19 +253,21 @@ public class SCMNodeManager implements NodeManager {
       datanodeDetails.setIpAddress(dnAddress.getHostAddress());
     }
     try {
-      String location;
+      String dnsName;
+      String networkLocation;
+      datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
       if (useHostname) {
-        datanodeDetails.setNetworkName(datanodeDetails.getHostName());
-        location = nodeResolve(datanodeDetails.getHostName());
+        dnsName = datanodeDetails.getHostName();
       } else {
-        datanodeDetails.setNetworkName(datanodeDetails.getIpAddress());
-        location = nodeResolve(datanodeDetails.getIpAddress());
+        dnsName = datanodeDetails.getIpAddress();
       }
-      if (location != null) {
-        datanodeDetails.setNetworkLocation(location);
+      networkLocation = nodeResolve(dnsName);
+      if (networkLocation != null) {
+        datanodeDetails.setNetworkLocation(networkLocation);
       }
       nodeStateManager.addNode(datanodeDetails);
       clusterMap.add(datanodeDetails);
+      dnsToUuidMap.put(dnsName, datanodeDetails.getUuidString());
       // Updating Node Report, as registration is successful
       processNodeReport(datanodeDetails, nodeReport);
       LOG.info("Registered Data node : {}", datanodeDetails);
@@ -274,10 +277,8 @@ public class SCMNodeManager implements NodeManager {
     }
 
     return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
-        .setDatanodeUUID(datanodeDetails.getUuidString())
+        .setDatanode(datanodeDetails)
         .setClusterID(this.scmStorageConfig.getClusterID())
-        .setHostname(datanodeDetails.getHostName())
-        .setIpAddress(datanodeDetails.getIpAddress())
         .build();
   }
 
@@ -553,33 +554,49 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
-   * Given datanode address or host name, returns the DatanodeDetails for the
-   * node.
+   * Given datanode uuid, returns the DatanodeDetails for the node.
    *
-   * @param address node host address
+   * @param uuid datanode uuid
    * @return the given datanode, or null if not found
    */
   @Override
-  public DatanodeDetails getNode(String address) {
-    Node node = null;
-    String location = nodeResolve(address);
-    if (location != null) {
-      node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR +
-          address);
+  public DatanodeDetails getNodeByUuid(String uuid) {
+    if (Strings.isNullOrEmpty(uuid)) {
+      LOG.warn("uuid is null");
+      return null;
+    }
+    DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build();
+    try {
+      return nodeStateManager.getNode(temp);
+    } catch (NodeNotFoundException e) {
+      LOG.warn("Cannot find node for uuid {}", uuid);
+      return null;
     }
+  }
 
-    if (node != null) {
-      if (node instanceof InnerNode) {
-        LOG.warn("Get node for {} return {}, it's an inner node, " +
-            "not a datanode", address, node.getNetworkFullPath());
-      } else {
-        LOG.debug("Get node for {} return {}", address,
-            node.getNetworkFullPath());
-        return (DatanodeDetails)node;
+  /**
+   * Given a datanode address (IP address or hostname), returns the
+   * DatanodeDetails for the node.
+   *
+   * @param address datanode address
+   * @return the given datanode, or null if not found
+   */
+  @Override
+  public DatanodeDetails getNodeByAddress(String address) {
+    if (Strings.isNullOrEmpty(address)) {
+      LOG.warn("address is null");
+      return null;
+    }
+    String uuid = dnsToUuidMap.get(address);
+    if (uuid != null) {
+      DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build();
+      try {
+        return nodeStateManager.getNode(temp);
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Cannot find node for uuid {}", uuid);
       }
-    } else {
-      LOG.warn("Cannot find node for {}", address);
     }
+    LOG.warn("Cannot find node for address {}", address);
     return null;
   }
 

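Net effect of the register() changes: the UUID becomes the canonical network name, the resolved rack is attached before the node joins the topology, and the dnsName-to-UUID mapping backs getNodeByAddress(). A sketch of the flow, assuming the StorageContainerNodeProtocol register signature (surrounding wiring omitted):

    RegisteredCommand cmd =
        nodeManager.register(datanodeDetails, nodeReport, pipelineReports);
    // SCM assigned the topology; the datanode adopts it on its next
    // RegisterEndpointTask pass (see the hunk earlier in this diff).
    assert cmd.getDatanode().getNetworkName()
        .equals(datanodeDetails.getUuidString());
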
+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java

@@ -57,7 +57,7 @@ public class PipelineActionHandler
           pipelineID = PipelineID.
               getFromProtobuf(action.getClosePipeline().getPipelineID());
           Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
-          LOG.info("Received pipeline action {} for {} from datanode [}",
+          LOG.info("Received pipeline action {} for {} from datanode {}",
               action.getAction(), pipeline, report.getDatanodeDetails());
           pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
         } catch (IOException ioe) {

+ 4 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java

@@ -288,12 +288,12 @@ public class SCMBlockProtocolServer implements
     boolean auditSuccess = true;
     try{
       NodeManager nodeManager = scm.getScmNodeManager();
-      Node client = nodeManager.getNode(clientMachine);
+      Node client = nodeManager.getNodeByAddress(clientMachine);
       List<Node> nodeList = new ArrayList();
-      nodes.stream().forEach(path -> {
-        DatanodeDetails node = nodeManager.getNode(path);
+      nodes.stream().forEach(uuid -> {
+        DatanodeDetails node = nodeManager.getNodeByUuid(uuid);
         if (node != null) {
-          nodeList.add(nodeManager.getNode(path));
+          nodeList.add(node);
         }
       });
       List<? extends Node> sortedNodeList = scm.getClusterMap()

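sortDatanodes is now keyed by datanode UUIDs instead of network paths, matching the KeyManagerImpl change at the end of this diff. An illustrative client-side call (names assumed):

    List<String> nodeUuids = pipeline.getNodes().stream()
        .map(DatanodeDetails::getUuidString)
        .collect(Collectors.toList());
    List<DatanodeDetails> sorted = scmClient.getBlockClient()
        .sortDatanodes(nodeUuids, clientMachine);
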
+ 1 - 9
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java

@@ -265,15 +265,7 @@ public class SCMDatanodeProtocolServer implements
   @VisibleForTesting
   public static SCMRegisteredResponseProto getRegisteredResponse(
       RegisteredCommand cmd) {
-    return SCMRegisteredResponseProto.newBuilder()
-        // TODO : Fix this later when we have multiple SCM support.
-        // .setAddressList(addressList)
-        .setErrorCode(cmd.getError())
-        .setClusterID(cmd.getClusterID())
-        .setDatanodeUUID(cmd.getDatanodeUUID())
-        .setIpAddress(cmd.getIpAddress())
-        .setHostname(cmd.getHostName())
-        .build();
+    return cmd.getProtoBufMessage();
   }
 
   @Override

+ 4 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java

@@ -137,8 +137,10 @@ public final class TestUtils {
    */
   public static DatanodeDetails getDatanodeDetails(
       RegisteredCommand registeredCommand) {
-    return createDatanodeDetails(registeredCommand.getDatanodeUUID(),
-        registeredCommand.getHostName(), registeredCommand.getIpAddress(),
+    return createDatanodeDetails(
+        registeredCommand.getDatanode().getUuidString(),
+        registeredCommand.getDatanode().getHostName(),
+        registeredCommand.getDatanode().getIpAddress(),
         null);
   }
 

+ 13 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@@ -87,6 +88,7 @@ public class MockNodeManager implements NodeManager {
   private final Node2PipelineMap node2PipelineMap;
   private final Node2ContainerMap node2ContainerMap;
   private NetworkTopology clusterMap;
+  private ConcurrentHashMap<String, String> dnsToUuidMap;
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
@@ -95,6 +97,7 @@ public class MockNodeManager implements NodeManager {
     this.nodeMetricMap = new HashMap<>();
     this.node2PipelineMap = new Node2PipelineMap();
     this.node2ContainerMap = new Node2ContainerMap();
+    this.dnsToUuidMap = new ConcurrentHashMap();
     aggregateStat = new SCMNodeStat();
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
@@ -370,7 +373,10 @@ public class MockNodeManager implements NodeManager {
     try {
       node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
           Collections.emptySet());
+      dnsToUuidMap.put(datanodeDetails.getIpAddress(),
+          datanodeDetails.getUuidString());
       if (clusterMap != null) {
+        datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
         clusterMap.add(datanodeDetails);
       }
     } catch (SCMException e) {
@@ -459,11 +465,16 @@ public class MockNodeManager implements NodeManager {
   }
 
   @Override
-  public DatanodeDetails getNode(String address) {
-    Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + address);
+  public DatanodeDetails getNodeByUuid(String uuid) {
+    Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + uuid);
     return node == null ? null : (DatanodeDetails)node;
   }
 
+  @Override
+  public DatanodeDetails getNodeByAddress(String address) {
+    return getNodeByUuid(dnsToUuidMap.get(address));
+  }
+
   public void setNetworkTopology(NetworkTopology topology) {
     this.clusterMap = topology;
   }

+ 1 - 9
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java

@@ -137,10 +137,6 @@ public class TestSCMContainerPlacementRackAware {
         datanodeDetails.get(2)));
     Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
         datanodeDetails.get(2)));
-    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
-        datanodeDetails.get(3)));
-    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
-        datanodeDetails.get(3)));
   }
 
   @Test
@@ -188,7 +184,7 @@ public class TestSCMContainerPlacementRackAware {
 
     // 5 replicas. there are only 3 racks. policy with fallback should
     // allocate the 5th datanode though it will break the rack rule(first
-    // 2 replicas on same rack, others are different racks).
+    // 2 replicas on same rack, others on different racks).
     int nodeNum = 5;
     List<DatanodeDetails>  datanodeDetails =
         policy.chooseDatanodes(null, null, nodeNum, 15);
@@ -199,10 +195,6 @@ public class TestSCMContainerPlacementRackAware {
         datanodeDetails.get(2)));
     Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
         datanodeDetails.get(2)));
-    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
-        datanodeDetails.get(3)));
-    Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
-        datanodeDetails.get(3)));
   }
 
 

+ 2 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java

@@ -1152,10 +1152,10 @@ public class TestSCMNodeManager {
       // test get node
       if (useHostname) {
         Arrays.stream(hostNames).forEach(hostname ->
-            Assert.assertNotNull(nodeManager.getNode(hostname)));
+            Assert.assertNotNull(nodeManager.getNodeByAddress(hostname)));
       } else {
         Arrays.stream(ipAddress).forEach(ip ->
-            Assert.assertNotNull(nodeManager.getNode(ip)));
+            Assert.assertNotNull(nodeManager.getNodeByAddress(ip)));
       }
     }
   }

+ 7 - 20
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java

@@ -35,6 +35,7 @@ import org.junit.Test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * Test class for @{@link SCMBlockProtocolServer}.
@@ -105,8 +106,8 @@ public class TestSCMBlockProtocolServer {
         node -> System.out.println(node.toString()));
     Assert.assertTrue(datanodeDetails.size() == nodeCount);
 
-    // illegal nodes to sort 1
-    nodes.add("/default-rack");
+    // unknown node to sort
+    nodes.add(UUID.randomUUID().toString());
     ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request =
         ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
             .newBuilder()
@@ -120,25 +121,11 @@ public class TestSCMBlockProtocolServer {
     resp.getNodeList().stream().forEach(
         node -> System.out.println(node.getNetworkName()));
 
-    // illegal nodes to sort 2
-    nodes.remove("/default-rack");
-    nodes.add(nodes.get(0) + "X");
-    request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
-            .newBuilder()
-            .addAllNodeNetworkName(nodes)
-            .setClient(client)
-            .build();
-    resp = service.sortDatanodes(request);
-    Assert.assertTrue(resp.getNodeList().size() == nodeCount);
-    System.out.println("client = " + client);
-    resp.getNodeList().stream().forEach(
-        node -> System.out.println(node.getNetworkName()));
-
-    // all illegal nodes
+    // all unknown nodes
     nodes.clear();
-    nodes.add("/default-rack");
-    nodes.add("/default-rack-1");
-    nodes.add("/default-rack-2");
+    nodes.add(UUID.randomUUID().toString());
+    nodes.add(UUID.randomUUID().toString());
+    nodes.add(UUID.randomUUID().toString());
     request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
         .newBuilder()
         .addAllNodeNetworkName(nodes)

+ 6 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java

@@ -311,7 +311,12 @@ public class ReplicationNodeManagerMock implements NodeManager {
   }
 
   @Override
-  public DatanodeDetails getNode(String address) {
+  public DatanodeDetails getNodeByUuid(String address) {
+    return null;
+  }
+
+  @Override
+  public DatanodeDetails getNodeByAddress(String address) {
     return null;
   }
 }

+ 55 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+    .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.junit.Assert.fail;
@@ -41,7 +43,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
@@ -58,11 +62,14 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -72,6 +79,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.HddsVersionInfo;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -483,6 +491,53 @@ public class TestStorageContainerManager {
     Assert.assertEquals(expectedVersion, actualVersion);
   }
 
+  /**
+   * Test datanode heartbeat well processed with a 4-layer network topology.
+   */
+  @Test(timeout = 60000)
+  public void testScmProcessDatanodeHeartbeat() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String scmId = UUID.randomUUID().toString();
+    conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        StaticMapping.class, DNSToSwitchMapping.class);
+    StaticMapping.addNodeToRack(HddsUtils.getHostName(conf), "/rack1");
+
+    final int datanodeNum = 3;
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(datanodeNum)
+        .setScmId(scmId)
+        .build();
+    cluster.waitForClusterToBeReady();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+
+    try {
+      // first sleep 10s
+      Thread.sleep(10000);
+      // verify datanode heartbeats are well processed
+      long heartbeatCheckerIntervalMs =
+          MiniOzoneCluster.Builder.DEFAULT_HB_INTERVAL_MS;
+      long start = Time.monotonicNow();
+      Thread.sleep(heartbeatCheckerIntervalMs * 2);
+
+      List<DatanodeDetails> allNodes = scm.getScmNodeManager().getAllNodes();
+      Assert.assertTrue(allNodes.size() == datanodeNum);
+      for (int i = 0; i < allNodes.size(); i++) {
+        DatanodeInfo datanodeInfo = (DatanodeInfo) scm.getScmNodeManager()
+            .getNodeByUuid(allNodes.get(i).getUuidString());
+        Assert.assertTrue((datanodeInfo.getLastHeartbeatTime() - start)
+            >= heartbeatCheckerIntervalMs);
+        Assert.assertTrue(datanodeInfo.getUuidString()
+            .equals(datanodeInfo.getNetworkName()));
+        Assert.assertTrue(datanodeInfo.getNetworkLocation()
+            .equals("/rack1"));
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testCloseContainerCommandOnRestart() throws Exception {

+ 7 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

@@ -138,7 +138,10 @@ public class TestKeyManagerImpl {
     NodeSchemaManager schemaManager = NodeSchemaManager.getInstance();
     schemaManager.init(schemas, false);
     NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager);
-    nodeManager.getAllNodes().stream().forEach(node -> clusterMap.add(node));
+    nodeManager.getAllNodes().stream().forEach(node -> {
+      node.setNetworkName(node.getUuidString());
+      clusterMap.add(node);
+    });
     ((MockNodeManager)nodeManager).setNetworkTopology(clusterMap);
     SCMConfigurator configurator = new SCMConfigurator();
     configurator.setScmNodeManager(nodeManager);
@@ -696,17 +699,17 @@ public class TestKeyManagerImpl {
     Assert.assertNotEquals(follower1, follower2);
 
     // lookup key, leader as client
-    OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getNetworkName());
+    OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getIpAddress());
     Assert.assertEquals(leader, key1.getLatestVersionLocations()
         .getLocationList().get(0).getPipeline().getClosestNode());
 
     // lookup key, follower1 as client
-    OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getNetworkName());
+    OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getIpAddress());
     Assert.assertEquals(follower1, key2.getLatestVersionLocations()
         .getLocationList().get(0).getPipeline().getClosestNode());
 
     // lookup key, follower2 as client
-    OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getNetworkName());
+    OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getIpAddress());
     Assert.assertEquals(follower2, key3.getLatestVersionLocations()
         .getLocationList().get(0).getPipeline().getClosestNode());
 

+ 6 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -2123,9 +2123,14 @@ public class KeyManagerImpl implements KeyManager {
       for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
         key.getLocationList().forEach(k -> {
           List<DatanodeDetails> nodes = k.getPipeline().getNodes();
+          if (nodes == null || nodes.size() == 0) {
+            LOG.warn("Datanodes for pipeline {} is empty",
+                k.getPipeline().getId().toString());
+            return;
+          }
           List<String> nodeList = new ArrayList<>();
           nodes.stream().forEach(node ->
-              nodeList.add(node.getNetworkName()));
+              nodeList.add(node.getUuidString()));
           try {
             List<DatanodeDetails> sortedNodes = scmClient.getBlockClient()
                 .sortDatanodes(nodeList, clientMachine);