
HDFS-11450. HDFS specific network topology classes with storage type info included. Contributed by Chen Liang.

Arpit Agarwal 7 years ago
parent
commit
d8d2a221f8

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNode.java

@@ -61,7 +61,7 @@ public interface InnerNode extends Node {
    *
    * @param leafIndex an indexed leaf of the node
    * @param excludedNode an excluded node (can be null)
-   * @return
+   * @return the leaf node corresponding to the given index.
    */
   Node getLeaf(int leafIndex, Node excludedNode);
 }

+ 11 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java

@@ -25,9 +25,9 @@ import java.util.Map;
 /** InnerNode represents a switch/router of a data center or rack.
  * Different from a leaf node, it has non-null children.
  */
-class InnerNodeImpl extends NodeBase implements InnerNode {
-  static class Factory implements InnerNode.Factory<InnerNodeImpl> {
-    private Factory() {}
+public class InnerNodeImpl extends NodeBase implements InnerNode {
+  protected static class Factory implements InnerNode.Factory<InnerNodeImpl> {
+    protected Factory() {}
 
     @Override
     public InnerNodeImpl newInnerNode(String path) {
@@ -37,18 +37,18 @@ class InnerNodeImpl extends NodeBase implements InnerNode {
 
   static final Factory FACTORY = new Factory();
 
-  private final List<Node> children = new ArrayList<>();
-  private final Map<String, Node> childrenMap = new HashMap<>();
-  private int numOfLeaves;
+  protected final List<Node> children = new ArrayList<>();
+  protected final Map<String, Node> childrenMap = new HashMap<>();
+  protected int numOfLeaves;
 
   /** Construct an InnerNode from a path-like string */
-  InnerNodeImpl(String path) {
+  protected InnerNodeImpl(String path) {
     super(path);
   }
 
   /** Construct an InnerNode
    * from its name, its network location, its parent, and its level */
-  InnerNodeImpl(String name, String location, InnerNode parent, int level) {
+  protected InnerNodeImpl(String name, String location, InnerNode parent, int level) {
     super(name, location, parent, level);
   }
 
@@ -81,7 +81,7 @@ class InnerNodeImpl extends NodeBase implements InnerNode {
    * @param n a node
    * @return true if this node is an ancestor of <i>n</i>
    */
-  boolean isAncestor(Node n) {
+  protected boolean isAncestor(Node n) {
     return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) ||
       (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
       startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR);
@@ -92,12 +92,12 @@ class InnerNodeImpl extends NodeBase implements InnerNode {
    * @param n a node
    * @return true if this node is the parent of <i>n</i>
    */
-  boolean isParent(Node n) {
+  protected boolean isParent(Node n) {
     return n.getNetworkLocation().equals(getPath(this));
   }
 
   /* Return a child name of this node who is an ancestor of node <i>n</i> */
-  private String getNextAncestorName(Node n) {
+  protected String getNextAncestorName(Node n) {
     if (!isAncestor(n)) {
       throw new IllegalArgumentException(
                                          this + "is not an ancestor of " + n);

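The visibility changes above (the class made public, the constructors, fields and Factory made protected) exist so that InnerNodeImpl can be extended from outside org.apache.hadoop.net. Below is a minimal sketch of such a subclass, assuming only what this hunk exposes; ExampleTopologyNode and its package are hypothetical, and DFSTopologyNodeImpl later in this patch follows the same pattern.

package org.apache.hadoop.hdfs.example;  // hypothetical package

import org.apache.hadoop.net.InnerNode;
import org.apache.hadoop.net.InnerNodeImpl;

// Hypothetical subclass, not part of the patch: shows what the relaxed
// visibility makes possible from another package.
public class ExampleTopologyNode extends InnerNodeImpl {

  // Extending the now-protected Factory lets a subclass hand its own node type
  // to NetworkTopology.getInstance(conf, factory).
  static final class Factory extends InnerNodeImpl.Factory {
    @Override
    public InnerNodeImpl newInnerNode(String path) {
      return new ExampleTopologyNode(path);
    }
  }

  // Both super constructors are reachable only because they became protected.
  ExampleTopologyNode(String path) {
    super(path);
  }

  ExampleTopologyNode(String name, String location, InnerNode parent, int level) {
    super(name, location, parent, level);
  }
}
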
+ 23 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -67,16 +67,32 @@ public class NetworkTopology {
    * @return an instance of NetworkTopology
    */
   public static NetworkTopology getInstance(Configuration conf){
-    return ReflectionUtils.newInstance(
+    return getInstance(conf, InnerNodeImpl.FACTORY);
+  }
+
+  public static NetworkTopology getInstance(Configuration conf,
+      InnerNode.Factory factory) {
+    NetworkTopology nt = ReflectionUtils.newInstance(
         conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
-        NetworkTopology.class, NetworkTopology.class), conf);
+            NetworkTopology.class, NetworkTopology.class), conf);
+    return nt.init(factory);
+  }
+
+  protected NetworkTopology init(InnerNode.Factory factory) {
+    if (!factory.equals(this.factory)) {
+      // the constructor has initialized the factory to default. So only init
+      // again if another factory is specified.
+      this.factory = factory;
+      this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
+    }
+    return this;
   }
 
-  InnerNode.Factory factory = InnerNodeImpl.FACTORY;
+  InnerNode.Factory factory;
   /**
    * the root cluster map
    */
-  InnerNode clusterMap = factory.newInnerNode(NodeBase.ROOT);
+  InnerNode clusterMap;
   /** Depth of all leaf nodes */
   private int depthOfAllLeaves = -1;
   /** rack counter */
@@ -91,7 +107,10 @@ public class NetworkTopology {
   /** the lock used to manage access */
   protected ReadWriteLock netlock = new ReentrantReadWriteLock();
 
+  // keeping the constructor because other components like MR still use it.
   public NetworkTopology() {
+    this.factory = InnerNodeImpl.FACTORY;
+    this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
   }
 
   /** Add a leaf node

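A minimal sketch of the two getInstance entry points, assuming only what the hunk above adds. It lives in org.apache.hadoop.net so the package-private InnerNodeImpl.FACTORY is visible; passing that default factory explicitly is redundant, but it shows the call shape a subclass-specific factory (such as DFSTopologyNodeImpl.FACTORY later in this patch) would use.

package org.apache.hadoop.net;

import org.apache.hadoop.conf.Configuration;

// Sketch only, not part of the patch: demonstrates the two getInstance overloads.
public class TopologyFactoryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Old entry point: the no-arg constructor wires up InnerNodeImpl.FACTORY.
    NetworkTopology defaultTopology = NetworkTopology.getInstance(conf);
    // New entry point: init() swaps in the supplied factory (and rebuilds the
    // root inner node) only when it differs from the default.
    NetworkTopology customTopology =
        NetworkTopology.getInstance(conf, InnerNodeImpl.FACTORY);
    System.out.println(defaultTopology.getClass() + " / " + customTopology.getClass());
  }
}

Keeping the no-arg constructor, and defaulting it to InnerNodeImpl.FACTORY, preserves compatibility for callers such as MR that still construct NetworkTopology directly.
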
+ 3 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java

@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.math3.stat.inference.ChiSquareTest;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -80,7 +81,7 @@ public class TestClusterTopology extends Assert {
   @Test
   public void testCountNumNodes() throws Exception {
     // create the topology
-    NetworkTopology cluster = new NetworkTopology();
+    NetworkTopology cluster = NetworkTopology.getInstance(new Configuration());
     NodeElement node1 = getNewNode("node1", "/d1/r1");
     cluster.add(node1);
     NodeElement node2 = getNewNode("node2", "/d1/r2");
@@ -128,7 +129,7 @@ public class TestClusterTopology extends Assert {
   @Test
   public void testChooseRandom() {
     // create the topology
-    NetworkTopology cluster = new NetworkTopology();
+    NetworkTopology cluster = NetworkTopology.getInstance(new Configuration());
     NodeElement node1 = getNewNode("node1", "/d1/r1");
     cluster.add(node1);
     NodeElement node2 = getNewNode("node2", "/d1/r2");

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -294,6 +295,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  public EnumSet<StorageType> getStorageTypes() {
+    EnumSet<StorageType> storageTypes = EnumSet.noneOf(StorageType.class);
+    for (DatanodeStorageInfo dsi : getStorageInfos()) {
+      storageTypes.add(dsi.getStorageType());
+    }
+    return storageTypes;
+  }
+
   public StorageReport[] getStorageReports() {
     final DatanodeStorageInfo[] infos = getStorageInfos();
     final StorageReport[] reports = new StorageReport[infos.length];

+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+
+/**
+ * The HDFS-specific network topology class. The main purpose of this
+ * subclassing is to add a storage-type-aware chooseRandom method. All the
+ * remaining parts should be the same.
+ *
+ * Currently a placeholder to test storage type info.
+ * TODO : add "chooseRandom with storageType info" function.
+ */
+public class DFSNetworkTopology extends NetworkTopology {
+  public static DFSNetworkTopology getInstance(Configuration conf) {
+    DFSNetworkTopology nt = new DFSNetworkTopology();
+    return (DFSNetworkTopology)nt.init(DFSTopologyNodeImpl.FACTORY);
+  }
+}

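A usage sketch, grounded in what TestDFSNetworkTopology below does: the DFSTestUtil helper and the package-private getChildrenStorageInfo() accessor come from this patch, and the example class has to sit in org.apache.hadoop.hdfs (in the test tree) for them to be visible.

package org.apache.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

// Sketch only: add one datanode and inspect the per-child storage counts.
public class DFSNetworkTopologyExample {
  public static void main(String[] args) {
    DFSNetworkTopology cluster =
        DFSNetworkTopology.getInstance(new Configuration());
    // Leaves must be DatanodeDescriptor instances; this helper is the same one
    // the new test uses.
    DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
        "s-host1", "10.0.0.1", "/l1/d1/r1", "host1", StorageType.DISK, null);
    cluster.add(storage.getDatanodeDescriptor());
    // The inner node for /l1/d1 now records one DISK entry under child "r1".
    DFSTopologyNodeImpl d1 = (DFSTopologyNodeImpl) cluster.getNode("/l1/d1");
    System.out.println(d1.getChildrenStorageInfo());  // e.g. {r1={DISK=1}}
  }
}
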
+ 255 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java

@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.net.InnerNode;
+import org.apache.hadoop.net.InnerNodeImpl;
+import org.apache.hadoop.net.Node;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+
+/**
+ * The HDFS-specific representation of a network topology inner node. The
+ * difference is this class includes the information about the storage type
+ * info of this subtree. This info will be used when selecting subtrees
+ * in block placement.
+ */
+public class DFSTopologyNodeImpl extends InnerNodeImpl {
+
+  static final InnerNodeImpl.Factory FACTORY
+      = new DFSTopologyNodeImpl.Factory();
+
+  static final class Factory extends InnerNodeImpl.Factory {
+    private Factory() {}
+
+    @Override
+    public InnerNodeImpl newInnerNode(String path) {
+      return new DFSTopologyNodeImpl(path);
+    }
+  }
+
+  /**
+   * The core data structure of this class. The information about what storage
+   * types this subtree has. Basically, a map whose key is a child
+   * id and whose value is an enum map with the counts of each storage type.
+   * e.g. a DISK count of 5 means there are 5 leaf datanodes with DISK storage
+   * available. This value is set/updated upon datanode joining and leaving.
+   *
+   * NOTE : It might be sufficient to keep only a map from storage type
+   * to count, omitting the child node id. But this might make it hard to keep
+   * consistency when there are updates from children.
+   *
+   * For example, if currently R has two children A and B with storage X, Y, and
+   * A : X=1 Y=1
+   * B : X=2 Y=2
+   * so we store X=3 Y=3 as total on R.
+   *
+   * Now say A has a new X plugged in and becomes X=2 Y=1.
+   *
+   * If we know that "A adds one X", it is easy to update R by +1 on X. However,
+   * if we don't know "A adds one X", but instead got "A now has X=2 Y=1",
+   * (which seems to be the case in current heartbeat) we will not know how to
+   * update R. While if we store on R "A has X=1 and Y=1" then we can simply
+   * update R by completely replacing the A entry and all will be good.
+   */
+  private final HashMap
+      <String, EnumMap<StorageType, Integer>> childrenStorageInfo;
+
+  DFSTopologyNodeImpl(String path) {
+    super(path);
+    childrenStorageInfo = new HashMap<>();
+  }
+
+  DFSTopologyNodeImpl(
+      String name, String location, InnerNode parent, int level) {
+    super(name, location, parent, level);
+    childrenStorageInfo = new HashMap<>();
+  }
+
+  int getNumOfChildren() {
+    return children.size();
+  }
+
+  @Override
+  public boolean add(Node n) {
+    if (!isAncestor(n)) {
+      throw new IllegalArgumentException(n.getName()
+          + ", which is located at " + n.getNetworkLocation()
+          + ", is not a descendant of " + getPath(this));
+    }
+    // In HDFS topology, the leaf node should always be DatanodeDescriptor
+    if (!(n instanceof DatanodeDescriptor)) {
+      throw new IllegalArgumentException("Unexpected node type "
+          + n.getClass().getName());
+    }
+    DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
+    if (isParent(n)) {
+      // this node is the parent of n; add n directly
+      n.setParent(this);
+      n.setLevel(this.level + 1);
+      Node prev = childrenMap.put(n.getName(), n);
+      if (prev != null) {
+        for(int i=0; i<children.size(); i++) {
+          if (children.get(i).getName().equals(n.getName())) {
+            children.set(i, n);
+            return false;
+          }
+        }
+      }
+      children.add(n);
+      numOfLeaves++;
+      synchronized (childrenStorageInfo) {
+        if (!childrenStorageInfo.containsKey(dnDescriptor.getName())) {
+          childrenStorageInfo.put(
+              dnDescriptor.getName(),
+              new EnumMap<StorageType, Integer>(StorageType.class));
+        }
+        for (StorageType st : dnDescriptor.getStorageTypes()) {
+          childrenStorageInfo.get(dnDescriptor.getName()).put(st, 1);
+        }
+      }
+      return true;
+    } else {
+      // find the next ancestor node
+      String parentName = getNextAncestorName(n);
+      InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
+      if (parentNode == null) {
+        // create a new InnerNode
+        parentNode = createParentNode(parentName);
+        children.add(parentNode);
+        childrenMap.put(parentNode.getName(), parentNode);
+      }
+      // add n to the subtree of the next ancestor node
+      if (parentNode.add(n)) {
+        numOfLeaves++;
+        synchronized (childrenStorageInfo) {
+          if (!childrenStorageInfo.containsKey(parentNode.getName())) {
+            childrenStorageInfo.put(
+                parentNode.getName(),
+                new EnumMap<StorageType, Integer>(StorageType.class));
+            for (StorageType st : dnDescriptor.getStorageTypes()) {
+              childrenStorageInfo.get(parentNode.getName()).put(st, 1);
+            }
+          } else {
+            EnumMap<StorageType, Integer> currentCount =
+                childrenStorageInfo.get(parentNode.getName());
+            for (StorageType st : dnDescriptor.getStorageTypes()) {
+              if (currentCount.containsKey(st)) {
+                currentCount.put(st, currentCount.get(st) + 1);
+              } else {
+                currentCount.put(st, 1);
+              }
+            }
+          }
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  HashMap <String, EnumMap<StorageType, Integer>> getChildrenStorageInfo() {
+    return childrenStorageInfo;
+  }
+
+
+  private DFSTopologyNodeImpl createParentNode(String parentName) {
+    return new DFSTopologyNodeImpl(
+        parentName, getPath(this), this, this.getLevel() + 1);
+  }
+
+  @Override
+  public boolean remove(Node n) {
+    if (!isAncestor(n)) {
+      throw new IllegalArgumentException(n.getName()
+          + ", which is located at " + n.getNetworkLocation()
+          + ", is not a descendant of " + getPath(this));
+    }
+    // In HDFS topology, the leaf node should always be DatanodeDescriptor
+    if (!(n instanceof DatanodeDescriptor)) {
+      throw new IllegalArgumentException("Unexpected node type "
+          + n.getClass().getName());
+    }
+    DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
+    if (isParent(n)) {
+      // this node is the parent of n; remove n directly
+      if (childrenMap.containsKey(n.getName())) {
+        for (int i=0; i<children.size(); i++) {
+          if (children.get(i).getName().equals(n.getName())) {
+            children.remove(i);
+            childrenMap.remove(n.getName());
+            synchronized (childrenStorageInfo) {
+              childrenStorageInfo.remove(dnDescriptor.getName());
+            }
+            numOfLeaves--;
+            n.setParent(null);
+            return true;
+          }
+        }
+      }
+      return false;
+    } else {
+      // find the next ancestor node: the parent node
+      String parentName = getNextAncestorName(n);
+      DFSTopologyNodeImpl parentNode =
+          (DFSTopologyNodeImpl)childrenMap.get(parentName);
+      if (parentNode == null) {
+        return false;
+      }
+      // remove n from the parent node
+      boolean isRemoved = parentNode.remove(n);
+      if (isRemoved) {
+        // if the parent node has no children, remove the parent node too
+        synchronized (childrenStorageInfo) {
+          EnumMap<StorageType, Integer> currentCount =
+              childrenStorageInfo.get(parentNode.getName());
+          EnumSet<StorageType> toRemove = EnumSet.noneOf(StorageType.class);
+          for (StorageType st : dnDescriptor.getStorageTypes()) {
+            int newCount = currentCount.get(st) - 1;
+            if (newCount == 0) {
+              toRemove.add(st);
+            }
+            currentCount.put(st, newCount);
+          }
+          for (StorageType st : toRemove) {
+            currentCount.remove(st);
+          }
+        }
+        if (parentNode.getNumOfChildren() == 0) {
+          for(int i=0; i < children.size(); i++) {
+            if (children.get(i).getName().equals(parentName)) {
+              children.remove(i);
+              childrenMap.remove(parentName);
+              childrenStorageInfo.remove(parentNode.getName());
+              break;
+            }
+          }
+        }
+        numOfLeaves--;
+      }
+      return isRemoved;
+    }
+  }
+}

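To make the R/A/B example in the childrenStorageInfo javadoc above concrete, here is a small stand-alone illustration using plain Java collections; StorageKind is a hypothetical stand-in for StorageType, and nothing here is part of the patch.

import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Stand-alone illustration of the bookkeeping described in the javadoc:
// keeping per-child counts lets the parent absorb a full-state update from a
// child by replacing that child's entry, with no need to know the delta.
public class StorageCountExample {
  enum StorageKind { X, Y }

  public static void main(String[] args) {
    Map<String, EnumMap<StorageKind, Integer>> childrenStorageInfo = new HashMap<>();
    childrenStorageInfo.put("A", counts(1, 1));   // A : X=1 Y=1
    childrenStorageInfo.put("B", counts(2, 2));   // B : X=2 Y=2
    System.out.println("total = " + total(childrenStorageInfo));  // {X=3, Y=3}

    // A now reports its full new state "X=2 Y=1"; the parent just replaces A's
    // entry and the recomputed total is still consistent.
    childrenStorageInfo.put("A", counts(2, 1));
    System.out.println("total = " + total(childrenStorageInfo));  // {X=4, Y=3}
  }

  static EnumMap<StorageKind, Integer> counts(int x, int y) {
    EnumMap<StorageKind, Integer> m = new EnumMap<>(StorageKind.class);
    m.put(StorageKind.X, x);
    m.put(StorageKind.Y, y);
    return m;
  }

  static EnumMap<StorageKind, Integer> total(
      Map<String, EnumMap<StorageKind, Integer>> info) {
    EnumMap<StorageKind, Integer> sum = new EnumMap<>(StorageKind.class);
    for (EnumMap<StorageKind, Integer> child : info.values()) {
      for (Map.Entry<StorageKind, Integer> e : child.entrySet()) {
        sum.merge(e.getKey(), e.getValue(), Integer::sum);
      }
    }
    return sum;
  }
}
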
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java

@@ -1292,7 +1292,7 @@ public class TestBlockStoragePolicy {
     }
 
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+        conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
     conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
         new File(baseDir, "name").getPath());

+ 260 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java

@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the correctness of storage type info stored in
+ * DFSNetworkTopology.
+ */
+public class TestDFSNetworkTopology {
+  private static final Log LOG =
+      LogFactory.getLog(TestDFSNetworkTopology.class);
+  private final static DFSNetworkTopology CLUSTER =
+      DFSNetworkTopology.getInstance(new Configuration());
+  private DatanodeDescriptor[] dataNodes;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30000);
+
+  @Before
+  public void setupDatanodes() {
+    final String[] racks = {
+        "/l1/d1/r1", "/l1/d1/r1", "/l1/d1/r2", "/l1/d1/r2", "/l1/d1/r2",
+
+        "/l1/d2/r3", "/l1/d2/r3", "/l1/d2/r3",
+
+        "/l2/d3/r1", "/l2/d3/r2", "/l2/d3/r3", "/l2/d3/r4", "/l2/d3/r5",
+
+        "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1",
+        "/l2/d4/r1", "/l2/d4/r1"};
+    final String[] hosts = {
+        "host1", "host2", "host3", "host4", "host5",
+        "host6", "host7", "host8", "host9", "host10",
+        "host11", "host12", "host13", "host14", "host15",
+        "host16", "host17", "host18", "host19", "host20"};
+    final StorageType[] types = {
+        StorageType.ARCHIVE, StorageType.DISK, StorageType.ARCHIVE,
+        StorageType.DISK, StorageType.DISK,
+
+        StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD,
+
+        StorageType.DISK, StorageType.RAM_DISK, StorageType.DISK,
+        StorageType.ARCHIVE, StorageType.ARCHIVE,
+
+        StorageType.DISK, StorageType.DISK, StorageType.RAM_DISK,
+        StorageType.RAM_DISK, StorageType.ARCHIVE, StorageType.ARCHIVE,
+        StorageType.SSD};
+    final DatanodeStorageInfo[] storages =
+        DFSTestUtil.createDatanodeStorageInfos(20, racks, hosts, types);
+    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
+    for (int i = 0; i < dataNodes.length; i++) {
+      CLUSTER.add(dataNodes[i]);
+    }
+    dataNodes[9].setDecommissioned();
+    dataNodes[10].setDecommissioned();
+  }
+
+  /**
+   * Test getting the storage type info of subtree.
+   * @throws Exception
+   */
+  @Test
+  public void testGetStorageTypeInfo() throws Exception {
+    // checking level = 2 nodes
+    DFSTopologyNodeImpl d1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
+    HashMap<String, EnumMap<StorageType, Integer>> d1info =
+        d1.getChildrenStorageInfo();
+    assertEquals(2, d1info.keySet().size());
+    assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2);
+    assertEquals(1, (int)d1info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
+
+    DFSTopologyNodeImpl d2 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d2");
+    HashMap<String, EnumMap<StorageType, Integer>> d2info =
+        d2.getChildrenStorageInfo();
+    assertEquals(1, d2info.keySet().size());
+    assertTrue(d2info.get("r3").size() == 3);
+    assertEquals(1, (int)d2info.get("r3").get(StorageType.DISK));
+    assertEquals(1, (int)d2info.get("r3").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)d2info.get("r3").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl d3 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d3");
+    HashMap<String, EnumMap<StorageType, Integer>> d3info =
+        d3.getChildrenStorageInfo();
+    assertEquals(5, d3info.keySet().size());
+    assertEquals(1, (int)d3info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)d3info.get("r2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)d3info.get("r3").get(StorageType.DISK));
+    assertEquals(1, (int)d3info.get("r4").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)d3info.get("r5").get(StorageType.ARCHIVE));
+
+    DFSTopologyNodeImpl d4 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d4");
+    HashMap<String, EnumMap<StorageType, Integer>> d4info =
+        d4.getChildrenStorageInfo();
+    assertEquals(1, d4info.keySet().size());
+    assertEquals(2, (int)d4info.get("r1").get(StorageType.DISK));
+    assertEquals(2, (int)d4info.get("r1").get(StorageType.RAM_DISK));
+    assertEquals(2, (int)d4info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)d4info.get("r1").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl l1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
+    HashMap<String, EnumMap<StorageType, Integer>> l1info =
+        l1.getChildrenStorageInfo();
+    assertEquals(2, l1info.keySet().size());
+    assertTrue(l1info.get("d1").size() == 2
+        && l1info.get("d2").size() == 3);
+    assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(3, (int)l1info.get("d1").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
+
+    // checking level = 1 nodes
+    DFSTopologyNodeImpl l2 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2");
+    HashMap<String, EnumMap<StorageType, Integer>> l2info =
+        l2.getChildrenStorageInfo();
+    assertTrue(l2info.get("d3").size() == 3
+        && l2info.get("d4").size() == 4);
+    assertEquals(2, l2info.keySet().size());
+    assertEquals(2, (int)l2info.get("d3").get(StorageType.DISK));
+    assertEquals(2, (int)l2info.get("d3").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)l2info.get("d3").get(StorageType.RAM_DISK));
+    assertEquals(2, (int)l2info.get("d4").get(StorageType.DISK));
+    assertEquals(2, (int)l2info.get("d4").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)l2info.get("d4").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l2info.get("d4").get(StorageType.SSD));
+  }
+
+  /**
+   * Test the correctness of storage type info when nodes are added and removed.
+   * @throws Exception
+   */
+  @Test
+  public void testAddAndRemoveTopology() throws Exception {
+    String[] newRack = {"/l1/d1/r1", "/l1/d1/r3", "/l1/d3/r3", "/l1/d3/r3"};
+    String[] newHost = {"nhost1", "nhost2", "nhost3", "nhost4"};
+    String[] newips = {"30.30.30.30", "31.31.31.31", "32.32.32.32",
+        "33.33.33.33"};
+    StorageType[] newTypes = {StorageType.DISK, StorageType.SSD,
+        StorageType.SSD, StorageType.SSD};
+    DatanodeDescriptor[] newDD = new DatanodeDescriptor[4];
+
+    for (int i = 0; i<4; i++) {
+      DatanodeStorageInfo dsi = DFSTestUtil.createDatanodeStorageInfo(
+          "s" + newHost[i], newips[i], newRack[i], newHost[i],
+          newTypes[i], null);
+      newDD[i] = dsi.getDatanodeDescriptor();
+      CLUSTER.add(newDD[i]);
+    }
+
+    DFSTopologyNodeImpl d1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
+    HashMap<String, EnumMap<StorageType, Integer>> d1info =
+        d1.getChildrenStorageInfo();
+    assertEquals(3, d1info.keySet().size());
+    assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2
+      && d1info.get("r3").size() == 1);
+    assertEquals(2, (int)d1info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)d1info.get("r3").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl d3 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d3");
+    HashMap<String, EnumMap<StorageType, Integer>> d3info =
+        d3.getChildrenStorageInfo();
+    assertEquals(1, d3info.keySet().size());
+    assertTrue(d3info.get("r3").size() == 1);
+    assertEquals(2, (int)d3info.get("r3").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl l1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
+    HashMap<String, EnumMap<StorageType, Integer>> l1info =
+        l1.getChildrenStorageInfo();
+    assertEquals(3, l1info.keySet().size());
+    assertTrue(l1info.get("d1").size() == 3 &&
+        l1info.get("d2").size() == 3 && l1info.get("d3").size() == 1);
+    assertEquals(4, (int)l1info.get("d1").get(StorageType.DISK));
+    assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)l1info.get("d1").get(StorageType.SSD));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
+    assertEquals(2, (int)l1info.get("d3").get(StorageType.SSD));
+
+
+    for (int i = 0; i<4; i++) {
+      CLUSTER.remove(newDD[i]);
+    }
+
+    // /d1/r3 should've been out, /d1/r1 should've been resumed
+    DFSTopologyNodeImpl nd1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
+    HashMap<String, EnumMap<StorageType, Integer>> nd1info =
+        nd1.getChildrenStorageInfo();
+    assertEquals(2, nd1info.keySet().size());
+    assertTrue(nd1info.get("r1").size() == 2 && nd1info.get("r2").size() == 2);
+    assertEquals(1, (int)nd1info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)nd1info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)nd1info.get("r2").get(StorageType.DISK));
+    assertEquals(1, (int)nd1info.get("r2").get(StorageType.ARCHIVE));
+
+    // /l1/d3 should've been out, and /l1/d1 should've been resumed
+    DFSTopologyNodeImpl nl1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
+    HashMap<String, EnumMap<StorageType, Integer>> nl1info =
+        nl1.getChildrenStorageInfo();
+    assertEquals(2, nl1info.keySet().size());
+    assertTrue(l1info.get("d1").size() == 2
+        && l1info.get("d2").size() == 3);
+    assertEquals(2, (int)nl1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(3, (int)nl1info.get("d1").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
+
+    assertNull(CLUSTER.getNode("/l1/d3"));
+  }
+}

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

@@ -47,7 +47,8 @@ import org.junit.rules.Timeout;
 
 public class TestNetworkTopology {
   private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
-  private final static NetworkTopology cluster = new NetworkTopology();
+  private final static NetworkTopology cluster =
+      NetworkTopology.getInstance(new Configuration());
   private DatanodeDescriptor dataNodes[];
 
   @Rule
@@ -101,7 +102,8 @@ public class TestNetworkTopology {
 
   @Test
   public void testCreateInvalidTopology() throws Exception {
-    NetworkTopology invalCluster = new NetworkTopology();
+    NetworkTopology invalCluster =
+        NetworkTopology.getInstance(new Configuration());
     DatanodeDescriptor invalDataNodes[] = new DatanodeDescriptor[] {
         DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
         DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),