
HDFS-5168. Add cross node dependency support to BlockPlacementPolicy. Contributed by Nikola Vujic

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1592179 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 11 years ago
parent
commit
b2f65c276d
19 changed files with 644 additions and 37 deletions
  1. + 2 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  2. + 56 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMappingWithDependency.java
  3. + 16 - 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java
  4. + 178 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMappingWithDependency.java
  5. + 86 - 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestScriptBasedMappingWithDependency.java
  6. + 3 - 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  7. + 19 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
  8. + 1 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
  9. + 2 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  10. + 5 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
  11. + 7 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  12. + 35 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
  13. + 59 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  14. + 38 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
  15. + 4 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  16. + 32 - 9
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
  17. + 6 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
  18. + 88 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
  19. + 7 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

+ 2 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -78,6 +78,8 @@ public class CommonConfigurationKeysPublic {
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY =
     "net.topology.table.file.name";
+  public static final String NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY = 
+    "net.topology.dependency.script.file.name";
 
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  FS_TRASH_CHECKPOINT_INTERVAL_KEY =
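
The new key, "net.topology.dependency.script.file.name", names the script that the dependency-aware topology mapping invokes. A minimal sketch (not part of this change) of setting it programmatically, assuming hadoop-common is on the classpath; the script path is a hypothetical example and would normally be configured in core-site.xml alongside net.topology.script.file.name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class DependencyScriptConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical site-specific script that prints the dependent hosts
    // of each host name passed to it as an argument.
    conf.set(CommonConfigurationKeysPublic.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology-dependency.sh");
    System.out.println(conf.get(
        CommonConfigurationKeysPublic.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY));
  }
}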

+ 56 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMappingWithDependency.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface that must be implemented to allow pluggable
+ * DNS-name/IP-address to RackID resolvers.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+public interface DNSToSwitchMappingWithDependency extends DNSToSwitchMapping {
+  /**
+   * Get a list of dependent DNS-names for a given DNS-name/IP-address.
+   * Dependent DNS-names fall into the same fault domain which must be
+   * taken into account when placing replicas. This is intended to be used for
+   * cross node group dependencies when node groups are not sufficient to 
+   * distinguish data nodes by fault domains. In practice, this is needed when
+   * a compute server runs VMs which use shared storage (as opposite to 
+   * directly attached storage). In this case data nodes fall in two different
+   * fault domains. One fault domain is defined by a compute server and 
+   * the other is defined by storage. With node groups we can group data nodes
+   * either by server fault domain or by storage fault domain. However one of
+   * the fault domains cannot be handled and there we need to define cross node
+   * group dependencies. These dependencies are applied in block placement 
+   * polices which ensure that no two replicas will be on two dependent nodes. 
+   * @param name - host name or IP address of a data node. Input host name 
+   * parameter must take a value of dfs.datanode.hostname config value if this
+   * config property is set. Otherwise FQDN of the data node is used.
+   * @return list of dependent host names. If dfs.datanode.hostname config
+   * property is set, then its value must be returned.
+   * Otherwise, FQDN is returned. 
+   */
+  public List<String> getDependency(String name);
+}
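
The javadoc above defines the contract: getDependency returns the hosts that share a fault domain with the given host, so a block placement policy can avoid putting two replicas on dependent nodes. A caller-side sketch of consuming the interface, mirroring the instanceof check that DatanodeManager performs later in this change; the class and method names here are illustrative only:

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMappingWithDependency;

public class DependencyLookupExample {
  /** Dependent hosts of hostname, or an empty list if the mapping has no dependency support. */
  static List<String> dependenciesOf(DNSToSwitchMapping mapping, String hostname) {
    if (mapping instanceof DNSToSwitchMappingWithDependency) {
      List<String> deps =
          ((DNSToSwitchMappingWithDependency) mapping).getDependency(hostname);
      return deps != null ? deps : Collections.<String>emptyList();
    }
    return Collections.emptyList();
  }
}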

+ 16 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java

@@ -45,7 +45,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
+public class ScriptBasedMapping extends CachedDNSToSwitchMapping {
 
   /**
    * Minimum number of arguments: {@value}
@@ -63,6 +63,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
    */
   static final String SCRIPT_FILENAME_KEY = 
                      CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY ;
+
   /**
    * key to the argument count that the script supports
    * {@value}
@@ -84,7 +85,15 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
    *
    */
   public ScriptBasedMapping() {
-    super(new RawScriptBasedMapping());
+    this(new RawScriptBasedMapping());
+  }
+
+  /**
+   * Create an instance from the given raw mapping
+   * @param rawMap raw DNSTOSwithMapping
+   */
+  public ScriptBasedMapping(DNSToSwitchMapping rawMap) {
+    super(rawMap);
   }
 
   /**
@@ -132,7 +141,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
    * This is the uncached script mapping that is fed into the cache managed
    * by the superclass {@link CachedDNSToSwitchMapping}
    */
-  private static final class RawScriptBasedMapping
+  protected static class RawScriptBasedMapping
       extends AbstractDNSToSwitchMapping {
     private String scriptName;
     private int maxArgs; //max hostnames per call of the script
@@ -176,7 +185,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
         return m;
       }
 
-      String output = runResolveCommand(names);
+      String output = runResolveCommand(names, scriptName);
       if (output != null) {
         StringTokenizer allSwitchInfo = new StringTokenizer(output);
         while (allSwitchInfo.hasMoreTokens()) {
@@ -208,7 +217,8 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
      * @return null if the number of arguments is out of range,
      * or the output of the command.
      */
-    private String runResolveCommand(List<String> args) {
+    protected String runResolveCommand(List<String> args, 
+        String commandScriptName) {
       int loopCount = 0;
       if (args.size() == 0) {
         return null;
@@ -225,7 +235,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
       while (numProcessed != args.size()) {
         int start = maxArgs * loopCount;
         List<String> cmdList = new ArrayList<String>();
-        cmdList.add(scriptName);
+        cmdList.add(commandScriptName);
         for (numProcessed = start; numProcessed < (start + maxArgs) &&
             numProcessed < args.size(); numProcessed++) {
           cmdList.add(args.get(numProcessed));

+ 178 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMappingWithDependency.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+
+
+/**
+ * This class extends ScriptBasedMapping class and implements 
+ * the {@link DNSToSwitchMappingWithDependency} interface using 
+ * a script configured via the 
+ * {@link CommonConfigurationKeys#NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY} option.
+ * <p/>
+ * It contains a static class <code>RawScriptBasedMappingWithDependency</code>
+ * that performs the getDependency work.
+ * <p/>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ScriptBasedMappingWithDependency  extends ScriptBasedMapping 
+    implements DNSToSwitchMappingWithDependency {
+  /**
+   * key to the dependency script filename {@value}
+   */
+  static final String DEPENDENCY_SCRIPT_FILENAME_KEY =
+      CommonConfigurationKeys.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY;
+
+  private Map<String, List<String>> dependencyCache = 
+      new ConcurrentHashMap<String, List<String>>();
+
+  /**
+   * Create an instance with the default configuration.
+   * </p>
+   * Calling {@link #setConf(Configuration)} will trigger a
+   * re-evaluation of the configuration settings and so be used to
+   * set up the mapping script.
+   */
+  public ScriptBasedMappingWithDependency() {
+    super(new RawScriptBasedMappingWithDependency());
+  }
+
+  /**
+   * Get the cached mapping and convert it to its real type
+   * @return the inner raw script mapping.
+   */
+  private RawScriptBasedMappingWithDependency getRawMapping() {
+    return (RawScriptBasedMappingWithDependency)rawMapping;
+  }
+
+  @Override
+  public String toString() {
+    return "script-based mapping with " + getRawMapping().toString();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p/>
+   * This will get called in the superclass constructor, so a check is needed
+   * to ensure that the raw mapping is defined before trying to relaying a null
+   * configuration.
+   * @param conf
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    getRawMapping().setConf(conf);
+  }
+
+  /**
+   * Get dependencies in the topology for a given host
+   * @param name - host name for which we are getting dependency
+   * @return a list of hosts dependent on the provided host name
+   */
+  @Override
+  public List<String> getDependency(String name) {
+    //normalize all input names to be in the form of IP addresses
+    name = NetUtils.normalizeHostName(name);
+
+    if (name==null) {
+      return Collections.emptyList();
+    }
+
+    List<String> dependencies = dependencyCache.get(name);
+    if (dependencies == null) {
+      //not cached
+      dependencies = getRawMapping().getDependency(name);
+      if(dependencies != null) {
+        dependencyCache.put(name, dependencies);
+      }
+    }
+
+    return dependencies;
+}
+
+  /**
+   * This is the uncached script mapping that is fed into the cache managed
+   * by the superclass {@link CachedDNSToSwitchMapping}
+   */
+  private static final class RawScriptBasedMappingWithDependency
+      extends ScriptBasedMapping.RawScriptBasedMapping 
+      implements DNSToSwitchMappingWithDependency {
+    private String dependencyScriptName;
+
+    /**
+     * Set the configuration and extract the configuration parameters of interest
+     * @param conf the new configuration
+     */
+    @Override
+    public void setConf (Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        dependencyScriptName = conf.get(DEPENDENCY_SCRIPT_FILENAME_KEY);
+      } else {
+        dependencyScriptName = null;
+      }
+    }
+
+    /**
+     * Constructor. The mapping is not ready to use until
+     * {@link #setConf(Configuration)} has been called
+     */
+    public RawScriptBasedMappingWithDependency() {}
+
+    @Override
+    public List<String> getDependency(String name) {
+      if (name==null || dependencyScriptName==null) {
+        return Collections.emptyList();
+      }
+
+      List <String> m = new LinkedList<String>();
+      List <String> args = new ArrayList<String>(1);
+      args.add(name);
+  
+      String output = runResolveCommand(args,dependencyScriptName);
+      if (output != null) {
+        StringTokenizer allSwitchInfo = new StringTokenizer(output);
+        while (allSwitchInfo.hasMoreTokens()) {
+          String switchInfo = allSwitchInfo.nextToken();
+          m.add(switchInfo);
+        }
+      } else {
+        // an error occurred. return null to signify this.
+        // (exn was already logged in runResolveCommand)
+        return null;
+      }
+
+      return m;
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + ", " + dependencyScriptName != null ?
+          ("dependency script " + dependencyScriptName) : NO_SCRIPT;
+    }
+  }
+}
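
A hedged usage sketch of the new class: configure both the rack script and the dependency script, then ask for the dependencies of a host. The paths and the host name below are hypothetical; with no real scripts behind the paths, resolution fails and getDependency returns null, so this only illustrates the wiring and the per-host caching:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.ScriptBasedMappingWithDependency;

public class ScriptDependencyMappingExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Rack resolution script (hypothetical path).
    conf.set(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology.sh");
    // Dependency resolution script (hypothetical path).
    conf.set(CommonConfigurationKeys.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology-dependency.sh");

    ScriptBasedMappingWithDependency mapping = new ScriptBasedMappingWithDependency();
    mapping.setConf(conf);

    // The first call per host runs the dependency script; a non-null result is
    // cached so repeated lookups do not re-invoke the script.
    List<String> deps = mapping.getDependency("dn1.example.com");
    System.out.println("Hosts dependent on dn1.example.com: " + deps);
  }
}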

+ 86 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestScriptBasedMappingWithDependency.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+import org.junit.Test;
+
+public class TestScriptBasedMappingWithDependency extends TestCase {
+
+  
+  public TestScriptBasedMappingWithDependency() {
+
+  }
+
+  @Test
+  public void testNoArgsMeansNoResult() {
+    Configuration conf = new Configuration();
+    conf.setInt(ScriptBasedMapping.SCRIPT_ARG_COUNT_KEY,
+                ScriptBasedMapping.MIN_ALLOWABLE_ARGS - 1);
+    conf.set(ScriptBasedMapping.SCRIPT_FILENAME_KEY, "any-filename-1");
+    conf.set(ScriptBasedMappingWithDependency.DEPENDENCY_SCRIPT_FILENAME_KEY, 
+        "any-filename-2");
+    conf.setInt(ScriptBasedMapping.SCRIPT_ARG_COUNT_KEY, 10);
+
+    ScriptBasedMappingWithDependency mapping = createMapping(conf);
+    List<String> names = new ArrayList<String>();
+    names.add("some.machine.name");
+    names.add("other.machine.name");
+    List<String> result = mapping.resolve(names);
+    assertNull("Expected an empty list for resolve", result);
+    result = mapping.getDependency("some.machine.name");
+    assertNull("Expected an empty list for getDependency", result);
+  }
+
+  @Test
+  public void testNoFilenameMeansSingleSwitch() throws Throwable {
+    Configuration conf = new Configuration();
+    ScriptBasedMapping mapping = createMapping(conf);
+    assertTrue("Expected to be single switch", mapping.isSingleSwitch());
+    assertTrue("Expected to be single switch",
+               AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
+  }
+
+  @Test
+  public void testFilenameMeansMultiSwitch() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(ScriptBasedMapping.SCRIPT_FILENAME_KEY, "any-filename");
+    ScriptBasedMapping mapping = createMapping(conf);
+    assertFalse("Expected to be multi switch", mapping.isSingleSwitch());
+    mapping.setConf(new Configuration());
+    assertTrue("Expected to be single switch", mapping.isSingleSwitch());
+  }
+
+  @Test
+  public void testNullConfig() throws Throwable {
+    ScriptBasedMapping mapping = createMapping(null);
+    assertTrue("Expected to be single switch", mapping.isSingleSwitch());
+  }
+
+  private ScriptBasedMappingWithDependency createMapping(Configuration conf) {
+    ScriptBasedMappingWithDependency mapping = 
+        new ScriptBasedMappingWithDependency();
+    mapping.setConf(conf);
+    return mapping;
+  }
+}

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -267,6 +267,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6281. Provide option to use the NFS Gateway without having to use the
     Hadoop portmapper. (atm)
 
+    HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.
+    (Nikola Vujic via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-6007. Update documentation about short-circuit local reads (iwasakims

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
 
@@ -50,6 +52,8 @@ public class DatanodeInfo extends DatanodeID implements Node {
   private int xceiverCount;
   private String location = NetworkTopology.DEFAULT_RACK;
   private String softwareVersion;
+  private List<String> dependentHostNames = new LinkedList<String>();
+  
   
   // Datanode administrative states
   public enum AdminStates {
@@ -274,6 +278,21 @@ public class DatanodeInfo extends DatanodeID implements Node {
   public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }
+  
+  /** Add a hostname to a list of network dependencies */
+  public void addDependentHostName(String hostname) {
+    dependentHostNames.add(hostname);
+  }
+  
+  /** List of Network dependencies */
+  public List<String> getDependentHostNames() {
+    return dependentHostNames;
+  }
+  
+  /** Sets the network dependencies */
+  public void setDependentHostNames(List<String> dependencyList) {
+    dependentHostNames = dependencyList;
+  }
     
   /** A formatted string for reporting the status of the DataNode. */
   public String getDatanodeReport() {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -842,7 +842,7 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof 
         BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -267,7 +267,8 @@ public class BlockManager {
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, stats, datanodeManager.getNetworkTopology());
+        conf, stats, datanodeManager.getNetworkTopology(), 
+        datanodeManager.getHost2DatanodeMap());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java

@@ -139,7 +139,8 @@ public abstract class BlockPlacementPolicy {
    * @param clusterMap cluster topology
    */
   abstract protected void initialize(Configuration conf,  FSClusterStats stats, 
-                                     NetworkTopology clusterMap);
+                                     NetworkTopology clusterMap, 
+                                     Host2NodesMap host2datanodeMap);
     
   /**
    * Get an instance of the configured Block Placement Policy based on the
@@ -153,14 +154,15 @@ public abstract class BlockPlacementPolicy {
    */
   public static BlockPlacementPolicy getInstance(Configuration conf, 
                                                  FSClusterStats stats,
-                                                 NetworkTopology clusterMap) {
+                                                 NetworkTopology clusterMap,
+                                                 Host2NodesMap host2datanodeMap) {
     final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
         BlockPlacementPolicy.class);
     final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
         replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap);
+    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
     return replicator;
   }
   

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   protected boolean considerLoad; 
   private boolean preferLocalNode = true;
   protected NetworkTopology clusterMap;
+  protected Host2NodesMap host2datanodeMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
   private long staleInterval;   // interval used to identify stale DataNodes
@@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   protected int tolerateHeartbeatMultiplier;
 
   protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
-                           NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+                           NetworkTopology clusterMap, 
+                           Host2NodesMap host2datanodeMap) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyDefault() {
@@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
-                         NetworkTopology clusterMap) {
+                         NetworkTopology clusterMap, 
+                         Host2NodesMap host2datanodeMap) {
     this.considerLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.host2datanodeMap = host2datanodeMap;
     this.heartbeatInterval = conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;

+ 35 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java

@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
 public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
 
   protected BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
-      NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+      NetworkTopology clusterMap, DatanodeManager datanodeManager) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyWithNodeGroup() {
@@ -56,8 +56,9 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
 
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
-          NetworkTopology clusterMap) {
-    super.initialize(conf, stats, clusterMap);
+          NetworkTopology clusterMap, 
+          Host2NodesMap host2datanodeMap) {
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   /** choose local node of localMachine as the target.
@@ -241,6 +242,36 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
         countOfExcludedNodes++;
       }
     }
+    
+    countOfExcludedNodes += addDependentNodesToExcludedNodes(
+        chosenNode, excludedNodes);
+    return countOfExcludedNodes;
+  }
+  
+  /**
+   * Add all nodes from a dependent nodes list to excludedNodes.
+   * @return number of new excluded nodes
+   */
+  private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
+      Set<Node> excludedNodes) {
+    if (this.host2datanodeMap == null) {
+      return 0;
+    }
+    int countOfExcludedNodes = 0;
+    for(String hostname : chosenNode.getDependentHostNames()) {
+      DatanodeDescriptor node =
+          this.host2datanodeMap.getDataNodeByHostName(hostname);
+      if(node!=null) {
+        if (excludedNodes.add(node)) {
+          countOfExcludedNodes++;
+        }
+      } else {
+        LOG.warn("Not able to find datanode " + hostname
+            + " which has dependency with datanode "
+            + chosenNode.getHostName());
+      }
+    }
+    
     return countOfExcludedNodes;
   }
 

+ 59 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -373,6 +373,11 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the Host2NodesMap */
+  public Host2NodesMap getHost2DatanodeMap() {
+    return this.host2DatanodeMap;
+  }
+
   /**
    * Given datanode address or host name, returns the DatanodeDescriptor for the
    * same, or if it doesn't find the datanode, it looks for a machine local and
@@ -677,6 +682,52 @@ public class DatanodeManager {
     return networkLocation;
   }
 
+  /**
+   * Resolve a node's dependencies in the network. If the DNS to switch 
+   * mapping fails then this method returns empty list of dependencies 
+   * @param node to get dependencies for
+   * @return List of dependent host names
+   */
+  private List<String> getNetworkDependenciesWithDefault(DatanodeInfo node) {
+    List<String> dependencies;
+    try {
+      dependencies = getNetworkDependencies(node);
+    } catch (UnresolvedTopologyException e) {
+      LOG.error("Unresolved dependency mapping for host " + 
+          node.getHostName() +". Continuing with an empty dependency list");
+      dependencies = Collections.emptyList();
+    }
+    return dependencies;
+  }
+  
+  /**
+   * Resolves a node's dependencies in the network. If the DNS to switch 
+   * mapping fails to get dependencies, then this method throws 
+   * UnresolvedTopologyException. 
+   * @param node to get dependencies for
+   * @return List of dependent host names 
+   * @throws UnresolvedTopologyException if the DNS to switch mapping fails
+   */
+  private List<String> getNetworkDependencies(DatanodeInfo node)
+      throws UnresolvedTopologyException {
+    List<String> dependencies = Collections.emptyList();
+
+    if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) {
+      //Get dependencies
+      dependencies = 
+          ((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency(
+              node.getHostName());
+      if(dependencies == null) {
+        LOG.error("The dependency call returned null for host " + 
+            node.getHostName());
+        throw new UnresolvedTopologyException("The dependency call returned " + 
+            "null for host " + node.getHostName());
+      }
+    }
+
+    return dependencies;
+  }
+
   /**
    * Remove an already decommissioned data node who is neither in include nor
    * exclude hosts lists from the the list of live or dead nodes.  This is used
@@ -869,12 +920,14 @@ public class DatanodeManager {
           nodeS.setDisallowed(false); // Node is in the include list
 
           // resolve network location
-          if(this.rejectUnresolvedTopologyDN)
-          {
-            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));  
+          if(this.rejectUnresolvedTopologyDN) {
+            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+            nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
           } else {
             nodeS.setNetworkLocation(
                 resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
+            nodeS.setDependentHostNames(
+                getNetworkDependenciesWithDefault(nodeS));
           }
           getNetworkTopology().add(nodeS);
             
@@ -900,9 +953,12 @@ public class DatanodeManager {
         // resolve network location
         if(this.rejectUnresolvedTopologyDN) {
           nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+          nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
         } else {
           nodeDescr.setNetworkLocation(
               resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
+          nodeDescr.setDependentHostNames(
+              getNetworkDependenciesWithDefault(nodeDescr));
         }
         networktopology.add(nodeDescr);
         nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

+ 38 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 class Host2NodesMap {
+  private HashMap<String, String> mapHost = new HashMap<String, String>();
   private final HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();
   private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
@@ -69,6 +70,10 @@ class Host2NodesMap {
       }
       
       String ipAddr = node.getIpAddr();
+      String hostname = node.getHostName();
+      
+      mapHost.put(hostname, ipAddr);
+      
       DatanodeDescriptor[] nodes = map.get(ipAddr);
       DatanodeDescriptor[] newNodes;
       if (nodes==null) {
@@ -95,6 +100,7 @@ class Host2NodesMap {
     }
       
     String ipAddr = node.getIpAddr();
+    String hostname = node.getHostName();
     hostmapLock.writeLock().lock();
     try {
 
@@ -105,6 +111,8 @@ class Host2NodesMap {
       if (nodes.length==1) {
         if (nodes[0]==node) {
           map.remove(ipAddr);
+          //remove hostname key since last datanode is removed
+          mapHost.remove(hostname);
           return true;
         } else {
           return false;
@@ -188,12 +196,40 @@ class Host2NodesMap {
     }
   }
 
+  
+
+  /** get a data node by its hostname. This should be used if only one 
+   * datanode service is running on a hostname. If multiple datanodes
+   * are running on a hostname then use methods getDataNodeByXferAddr and
+   * getDataNodeByHostNameAndPort.
+   * @return DatanodeDescriptor if found; otherwise null.
+   */
+  DatanodeDescriptor getDataNodeByHostName(String hostname) {
+    if(hostname == null) {
+      return null;
+    }
+    
+    hostmapLock.readLock().lock();
+    try {
+      String ipAddr = mapHost.get(hostname);
+      if(ipAddr == null) {
+        return null;
+      } else {  
+        return getDatanodeByHost(ipAddr);
+      }
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
         .append("[");
-    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
-      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    for(Map.Entry<String, String> host: mapHost.entrySet()) {
+      DatanodeDescriptor[] e = map.get(host.getValue());
+      b.append("\n  " + host.getKey() + " => "+host.getValue() + " => " 
+          + Arrays.asList(e));
     }
     return b.append("\n]").toString();
   }

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -172,8 +172,10 @@ public class NamenodeFsck {
     this.minReplication = minReplication;
     this.remoteAddress = remoteAddress;
     this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
-        networktopology);
-
+        networktopology,
+        namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .getHost2DatanodeMap());
+    
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       if (key.equals("path")) { this.path = pmap.get("path")[0]; }

+ 32 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -897,29 +897,47 @@ public class DFSTestUtil {
     return getDatanodeDescriptor(ipAddr, DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
         rackLocation);
   }
+  
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, String hostname) {
+    return getDatanodeDescriptor(ipAddr, 
+        DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
+  }
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip) {
-    return createDatanodeStorageInfo(storageID, ip, "defaultRack");
+    return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host");
   }
+  
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
-    return createDatanodeStorageInfos(racks.length, racks);
+    return createDatanodeStorageInfos(racks, null);
+  }
+  
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks, String[] hostnames) {
+    return createDatanodeStorageInfos(racks.length, racks, hostnames);
+  }
+  
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
+    return createDatanodeStorageInfos(n, null, null);
   }
-  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... racks) {
+    
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(
+      int n, String[] racks, String[] hostnames) {
     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
     for(int i = storages.length; i > 0; ) {
       final String storageID = "s" + i;
       final String ip = i + "." + i + "." + i + "." + i;
       i--;
-      final String rack = i < racks.length? racks[i]: "defaultRack";
-      storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
+      final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack";
+      final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host";
+      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname);
     }
     return storages;
   }
   public static DatanodeStorageInfo createDatanodeStorageInfo(
-      String storageID, String ip, String rack) {
+      String storageID, String ip, String rack, String hostname) {
     final DatanodeStorage storage = new DatanodeStorage(storageID);
-    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage);
+    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname);
     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
   }
   public static DatanodeDescriptor[] toDatanodeDescriptor(
@@ -932,8 +950,8 @@ public class DFSTestUtil {
   }
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
-      int port, String rackLocation) {
-    DatanodeID dnId = new DatanodeID(ipAddr, "host",
+      int port, String rackLocation, String hostname) {
+    DatanodeID dnId = new DatanodeID(ipAddr, hostname,
         UUID.randomUUID().toString(), port,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
@@ -941,6 +959,11 @@ public class DFSTestUtil {
     return new DatanodeDescriptor(dnId, rackLocation);
   }
   
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      int port, String rackLocation) {
+    return getDatanodeDescriptor(ipAddr, port, rackLocation, "host");
+  }
+  
   public static DatanodeRegistration getLocalDatanodeRegistration() {
     return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo(
         NodeType.DATA_NODE), new ExportedBlockKeys(), VersionInfo.getVersion());

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

@@ -236,8 +236,13 @@ public class BlockManagerTestUtil {
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
       String rackLocation, DatanodeStorage storage) {
+    return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host");
+  }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, DatanodeStorage storage, String hostname) {
       DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
-          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
       if (storage != null) {
         dn.updateStorage(storage);
       }

+ 88 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java

@@ -47,11 +47,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
   private static final int NUM_OF_DATANODES_BOUNDARY = 6;
   private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
+  private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
   private final Configuration CONF = new HdfsConfiguration();
   private NetworkTopology cluster;
   private NameNode namenode;
@@ -113,7 +115,33 @@ public class TestReplicationPolicyWithNodeGroup {
 
   private final static DatanodeDescriptor NODE = 
       new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
-
+  
+  private static final DatanodeStorageInfo[] storagesForDependencies;
+  private static final DatanodeDescriptor[]  dataNodesForDependencies;
+  static {
+    final String[] racksForDependencies = {
+        "/d1/r1/n1",
+        "/d1/r1/n1",
+        "/d1/r1/n2",
+        "/d1/r1/n2",
+        "/d1/r1/n3",
+        "/d1/r1/n4"
+    };
+    final String[] hostNamesForDependencies = {
+        "h1",
+        "h2",
+        "h3",
+        "h4",
+        "h5",
+        "h6"
+    };
+    
+    storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos(
+        racksForDependencies, hostNamesForDependencies);
+    dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
+    
+  };
+  
   @Before
   public void setUp() throws Exception {
     FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
@@ -720,5 +748,63 @@ public class TestReplicationPolicyWithNodeGroup {
     assertEquals(targets.length, 6);
   }
 
-
+  @Test
+  public void testChooseTargetWithDependencies() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+    
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+    
+    Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
+        .getBlockManager()
+        .getDatanodeManager().getHost2DatanodeMap();
+    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      cluster.add(dataNodesForDependencies[i]);
+      host2DatanodeMap.add(dataNodesForDependencies[i]);
+    }
+    
+    //add dependencies (node1 <-> node2, and node3<->node4)
+    dataNodesForDependencies[1].addDependentHostName(
+        dataNodesForDependencies[2].getHostName());
+    dataNodesForDependencies[2].addDependentHostName(
+        dataNodesForDependencies[1].getHostName());
+    dataNodesForDependencies[3].addDependentHostName(
+        dataNodesForDependencies[4].getHostName());
+    dataNodesForDependencies[4].addDependentHostName(
+        dataNodesForDependencies[3].getHostName());
+    
+    //Update heartbeat
+    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      updateHeartbeatWithUsage(dataNodesForDependencies[i],
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+    }
+    
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    
+    DatanodeStorageInfo[] targets;
+    Set<Node> excludedNodes = new HashSet<Node>();
+    excludedNodes.add(dataNodesForDependencies[5]);
+    
+    //try to select three targets as there are three node groups
+    targets = chooseTarget(3, dataNodesForDependencies[1], chosenNodes, excludedNodes);
+    
+    //Even there are three node groups, verify that 
+    //only two targets are selected due to dependencies
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], storagesForDependencies[1]);
+    assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4]));
+    
+    //verify that all data nodes are in the excluded list
+    assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
+    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
+    }
+  }
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -68,6 +68,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
@@ -981,10 +983,15 @@ public class TestFsck {
     PrintWriter out = new PrintWriter(result, true);
     InetAddress remoteAddress = InetAddress.getLocalHost();
     FSNamesystem fsName = mock(FSNamesystem.class);
+    BlockManager blockManager = mock(BlockManager.class);
+    DatanodeManager dnManager = mock(DatanodeManager.class);
+    
     when(namenode.getNamesystem()).thenReturn(fsName);
     when(fsName.getBlockLocations(anyString(), anyLong(), anyLong(),
         anyBoolean(), anyBoolean(), anyBoolean())).
         thenThrow(new FileNotFoundException()) ;
+    when(fsName.getBlockManager()).thenReturn(blockManager);
+    when(blockManager.getDatanodeManager()).thenReturn(dnManager);
 
     NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
         NUM_REPLICAS, (short)1, remoteAddress);