Prechádzať zdrojové kódy

HDFS-4350. Make enabling of stale marking on read and write paths independent. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1441823 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 rokov pred
rodič
commit
0ac72082f4

+ 3 - 0
CHANGES.txt

@@ -4,6 +4,9 @@ Release 1.2.0 - unreleased
 
   INCOMPATIBLE CHANGES
 
+    HDFS-4350. Make enabling of stale marking on read and write paths
+    independent. (Andrew Wang via suresh)
+
   NEW FEATURES
 
     MAPREDUCE-4660. Update task placement policy for network topology

+ 10 - 13
src/hdfs/hdfs-default.xml

@@ -642,17 +642,14 @@ creations/deletions), or "all".</description>
 </property>
 
 <property>
-  <name>dfs.namenode.check.stale.datanode</name>
+  <name>dfs.namenode.avoid.read.stale.datanode</name>
   <value>false</value>
   <description>
-    Indicate whether or not to check "stale" datanodes whose 
+    Indicate whether or not to avoid reading from &quot;stale&quot; datanodes whose
     heartbeat messages have not been received by the namenode 
-    for more than a specified time interval. If this configuration 
-    parameter is set as true, the system will keep track 
-    of the number of stale datanodes. The stale datanodes will be 
+    for more than a specified time interval. Stale datanodes will be
     moved to the end of the node list returned for reading. See
-    dfs.namenode.avoid.write.stale.datanode for details on how this 
-    affects writes.
+    dfs.namenode.avoid.write.stale.datanode for a similar setting for writes.
   </description>
 </property>
 
@@ -660,13 +657,13 @@ creations/deletions), or "all".</description>
   <name>dfs.namenode.avoid.write.stale.datanode</name>
   <value>false</value>
   <description>
-    Indicate whether or not to avoid writing to "stale" datanodes whose 
+    Indicate whether or not to avoid writing to &quot;stale&quot; datanodes whose 
     heartbeat messages have not been received by the namenode 
-    for more than a specified time interval. If this configuration 
-    parameter and dfs.namenode.check.stale.datanode are both set as true, 
-    the writing will avoid using stale datanodes unless a high number 
-    of datanodes are marked as stale. See 
-    dfs.namenode.write.stale.datanode.ratio for details.
+    for more than a specified time interval. Writes will avoid using 
+    stale datanodes unless more than a configured ratio 
+    (dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as 
+    stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting
+    for reads.
   </description>
 </property>
 

+ 6 - 6
src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -150,10 +150,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
   public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
   
-  // Whether to enable datanode's stale state detection and usage
-  public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
-  public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
-  // Whether to enable datanode's stale state detection and usage
+  // Whether to enable datanode's stale state detection and usage for reads
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
+  // Whether to enable datanode's stale state detection and usage for writes
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
   // The default value of the time interval for marking datanodes as stale
@@ -163,8 +163,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // This value uses the times of heartbeat interval to define the minimum value for stale interval.  
   public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
   public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
-  // When the number stale datanodes marked as stale reached this certian ratio, 
-  // stop avoiding writing to stale nodes so as to prevent causing hotspots.
+  // When the percentage of stale datanodes reaches this ratio,
+  // allow writing to stale nodes to prevent hotspots.
   public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
   public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java

@@ -180,7 +180,7 @@ public class DFSUtil {
      * Constructor of StaleComparator
      * 
      * @param interval
-     *          The time invertal for marking datanodes as stale is passed from
+     *          The time interval for marking datanodes as stale is passed from
      *          outside, since the interval may be changed dynamically
      */
     public StaleComparator(long interval) {

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java

@@ -141,7 +141,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
       
     boolean avoidStaleNodes = (stats != null && stats
-        .isAvoidingStaleDataNodesForWrite());
+        .shouldAvoidStaleDataNodesForWrite());
     DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
         excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
       

+ 2 - 2
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java

@@ -38,6 +38,6 @@ public interface FSClusterStats {
    * @return True if the cluster is currently avoiding using stale DataNodes for
    *         writing targets, and false otherwise.
    */
-  public boolean isAvoidingStaleDataNodesForWrite();
+  public boolean shouldAvoidStaleDataNodesForWrite();
 }
-    
+    

+ 56 - 78
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -365,22 +365,27 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   // precision of access times.
   private long accessTimePrecision = 0;
   private String nameNodeHostName;
-  
-  /** Whether or not to check stale DataNodes for read/write */
-  private boolean checkForStaleDataNodes;
   /** The interval for judging stale DataNodes for read/write */
   private long staleInterval;
-  /** Whether or not to avoid using stale DataNodes for writing */
-  private volatile boolean avoidStaleDataNodesForWrite;
-  private boolean initialAvoidWriteStaleNodes;
-  /** The number of stale DataNodes */
-  private volatile int numStaleNodes = 0;
+  /** Whether or not to avoid using stale DataNodes for reading */
+  private boolean avoidStaleDataNodesForRead;
+
+  /**
+   * Whether or not to avoid using stale DataNodes for writing.
+   * Note that, even if this is configured, the policy may be
+   * temporarily disabled when a high percentage of the nodes
+   * are marked as stale.
+   */
+  private boolean avoidStaleDataNodesForWrite;
+
   /**
    * When the ratio of stale datanodes reaches this number, stop avoiding
    * writing to stale datanodes, i.e., continue using stale nodes for writing.
    */
   private float ratioUseStaleDataNodesForWrite;
-  
+  /** The number of stale DataNodes */
+  private volatile int numStaleNodes = 0;
+
   /**
    * FSNamesystem constructor.
    */
@@ -593,15 +598,23 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         + " min(s)");
     
     // set the value of stale interval based on configuration
-    checkForStaleDataNodes = conf.getBoolean(
-        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
-    staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
-    avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
-        checkForStaleDataNodes);
-    initialAvoidWriteStaleNodes = avoidStaleDataNodesForWrite;
-    ratioUseStaleDataNodesForWrite = 
+    this.avoidStaleDataNodesForRead = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
+    this.avoidStaleDataNodesForWrite = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+    this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    this.ratioUseStaleDataNodesForWrite = 
         getRatioUseStaleNodesForWriteFromConf(conf);
+
+    if (avoidStaleDataNodesForWrite && staleInterval < heartbeatRecheckInterval) {
+      this.heartbeatRecheckInterval = staleInterval;
+      LOG.info("Setting heartbeat recheck interval to " + staleInterval
+          + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
+          + " is less than "
+          + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
+    }
   }
   
   private static float getRatioUseStaleNodesForWriteFromConf(Configuration conf) {
@@ -658,22 +671,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     return staleInterval;
   }
 
-  static boolean getAvoidStaleForWriteFromConf(Configuration conf,
-      boolean checkForStale) {
-    boolean avoid = conf.getBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
-    boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
-    if (!checkForStale && avoid) {
-      LOG.warn("Cannot set "
-          + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
-          + " as false while setting "
-          + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
-          + " as true");
-    }
-    return avoidStaleDataNodesForWrite;
-  }
-
   /**
    * Return the default path permission when upgrading from releases with no
    * permissions (<=0.15) to releases with permissions (>=0.16)
@@ -1047,13 +1044,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       }   
 
       DFSUtil.StaleComparator comparator = null;
-      if (checkForStaleDataNodes) {
+      if (avoidStaleDataNodesForRead) {
         comparator = new DFSUtil.StaleComparator(staleInterval);
       }
       // Note: the last block is also included and sorted
       for (LocatedBlock b : blocks.getLocatedBlocks()) {
         clusterMap.pseudoSortByDistance(client, b.getLocations());
-        if (checkForStaleDataNodes) {
+        if (avoidStaleDataNodesForRead) {
           Arrays.sort(b.getLocations(), comparator);
         }
       }
@@ -3753,29 +3750,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
           if (dead == null && isDatanodeDead(nodeInfo)) {
             foundDead = true;
             dead = nodeInfo;
-            if (!this.checkForStaleDataNodes) {
-              break;
-            }
           }
-          if (this.checkForStaleDataNodes
-              && nodeInfo.isStale(this.staleInterval)) {
+          if (nodeInfo.isStale(this.staleInterval)) {
             numOfStaleNodes++;
           }
         }
 
-        // Change whether to avoid using stale datanodes for writing
-        // based on proportion of stale datanodes
-        if (this.checkForStaleDataNodes) {
-          this.numStaleNodes = numOfStaleNodes;
-          if (numOfStaleNodes > heartbeats.size()
-              * this.ratioUseStaleDataNodesForWrite) {
-            this.avoidStaleDataNodesForWrite = false;
-          } else {
-            if (this.initialAvoidWriteStaleNodes) {
-              this.avoidStaleDataNodesForWrite = true;
-            }
-          }
-        }
+        setNumStaleNodes(numOfStaleNodes);
       }
 
       // acquire the fsnamesystem lock, and then remove the dead node.
@@ -6409,41 +6390,38 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   public String toString() {
     return getClass().getSimpleName() + ": " + host2DataNodeMap;
   }
-    
-  /**
-   * @return Return the current number of stale DataNodes (detected by
-   *         HeartbeatMonitor).
-   */
-  public int getNumStaleNodes() {
-    return this.numStaleNodes;
-  }
 
   /**
-   * @return whether or not to avoid writing to stale datanodes
+   * Set the number of stale DataNodes, based on DataNodes' heartbeats.
+   *
+   * @param numStaleNodes
+   *          The number of stale DataNodes to be set.
    */
-  @Override // FSClusterStats
-  public boolean isAvoidingStaleDataNodesForWrite() {
-    return avoidStaleDataNodesForWrite;
+  void setNumStaleNodes(int numStaleNodes) {
+    this.numStaleNodes = numStaleNodes;
   }
 
   /**
-   * @return The interval used to judge whether or not a DataNode is stale
+   * @return Return the current number of stale DataNodes (detected by
+   *         HeartbeatMonitor).
    */
-  public long getStaleInterval() {
-    return this.staleInterval;
+  public int getNumStaleNodes() {
+    return this.numStaleNodes;
   }
 
-  /**
-   * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}. The
-   * HeartbeatManager disable avoidStaleDataNodesForWrite when more than half of
-   * the DataNodes are marked as stale.
+  /*
+   * Whether stale datanodes should be avoided as targets on the write path.
+   * The result of this function may change if the number of stale datanodes
+   * eclipses a configurable threshold.
    * 
-   * @param avoidStaleDataNodesForWrite
-   *          The value to set to
-   *          {@link DatanodeManager#avoidStaleDataNodesForWrite}
-   */
-  void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
-    this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+   * @return whether stale datanodes should be avoided on the write path
+   */
+  public boolean shouldAvoidStaleDataNodesForWrite() {
+    // If # stale exceeds maximum staleness ratio, disable stale
+    // datanode avoidance on the write path
+    return avoidStaleDataNodesForWrite &&
+        (numStaleNodes <= heartbeats.size()
+            * ratioUseStaleDataNodesForWrite);
   }
   
   /**

+ 1 - 1
src/test/org/apache/hadoop/hdfs/TestGetBlocks.java

@@ -66,7 +66,7 @@ public class TestGetBlocks extends TestCase {
    */
   public void testReadSelectNonStaleDatanode() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     // DataNode will send out heartbeat every 15 minutes
     // In this way, when we have set a datanode as stale,
     // its heartbeat will not come to refresh its state

+ 20 - 13
src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 
 import java.io.IOException;
@@ -71,18 +72,24 @@ public class TestReplicationPolicy extends TestCase {
     try {
       FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
       CONF.set("dfs.http.address", "0.0.0.0:0");
+      CONF.setBoolean(
+          DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
       NameNode.format(CONF);
       namenode = new NameNode(CONF);
     } catch (IOException e) {
       e.printStackTrace();
       throw (RuntimeException)new RuntimeException().initCause(e);
     }
+    // Override fsNamesystem to always avoid stale datanodes
     FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
     replicator = fsNamesystem.replicator;
     cluster = fsNamesystem.clusterMap;
+    ArrayList<DatanodeDescriptor> heartbeats = fsNamesystem.heartbeats;
     // construct network topology
     for(int i=0; i<NUM_OF_DATANODES; i++) {
+      dataNodes[i].isAlive = true;
       cluster.add(dataNodes[i]);
+      heartbeats.add(dataNodes[i]);
     }
     for(int i=0; i<NUM_OF_DATANODES; i++) {
       dataNodes[i].updateHeartbeat(
@@ -353,10 +360,10 @@ public class TestReplicationPolicy extends TestCase {
   public void testChooseTargetWithMoreThanAvailableNodesWithStaleness()
       throws Exception {
     try {
-      namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
+      namenode.getNamesystem().setNumStaleNodes(NUM_OF_DATANODES);
       testChooseTargetWithMoreThanAvailableNodes();
     } finally {
-      namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
+      namenode.getNamesystem().setNumStaleNodes(0);
     }
   }
   
@@ -424,10 +431,10 @@ public class TestReplicationPolicy extends TestCase {
   }
 
   public void testChooseTargetWithStaleNodes() throws Exception {
-    // Enable avoiding writing to stale DataNodes
-    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
     // Set dataNodes[0] as stale
     dataNodes[0].setLastUpdate(System.currentTimeMillis() - staleInterval - 1);
+    namenode.getNamesystem().heartbeatCheck();
+    assertTrue(namenode.getNamesystem().shouldAvoidStaleDataNodesForWrite());
 
     DatanodeDescriptor[] targets;
     // We set the dataNodes[0] as stale, thus should choose dataNodes[1] since
@@ -445,8 +452,8 @@ public class TestReplicationPolicy extends TestCase {
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
 
     // reset
-    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
     dataNodes[0].setLastUpdate(System.currentTimeMillis());
+    namenode.getNamesystem().heartbeatCheck();
   }
 
   /**
@@ -458,13 +465,12 @@ public class TestReplicationPolicy extends TestCase {
    * @throws Exception
    */
   public void testChooseTargetWithHalfStaleNodes() throws Exception {
-    // Enable stale datanodes checking
-    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
     // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
     for (int i = 0; i < 3; i++) {
       dataNodes[i]
           .setLastUpdate(System.currentTimeMillis() - staleInterval - 1);
     }
+    namenode.getNamesystem().heartbeatCheck();
 
     DatanodeDescriptor[] targets;
     // We set the datanode[0~2] as stale, thus should not choose them
@@ -490,11 +496,11 @@ public class TestReplicationPolicy extends TestCase {
     assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
 
     // reset
-    namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
     for (int i = 0; i < dataNodes.length; i++) {
       dataNodes[i].setLastUpdate(System.currentTimeMillis());
     }
-  }   
+    namenode.getNamesystem().heartbeatCheck();
+  }
   
   /**
    * This testcase tests re-replication, when dataNodes[0] is already chosen.
@@ -601,7 +607,8 @@ public class TestReplicationPolicy extends TestCase {
   
   public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     conf.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
     // DataNode will send out heartbeat every 15 minutes
@@ -640,7 +647,7 @@ public class TestReplicationPolicy extends TestCase {
           .getNumStaleNodes();
       assertEquals(numStaleNodes, 2);
       assertTrue(miniCluster.getNameNode().getNamesystem()
-          .isAvoidingStaleDataNodesForWrite());
+          .shouldAvoidStaleDataNodesForWrite());
       // Check metrics
       assertGauge("StaleDataNodes", numStaleNodes, miniCluster.getNameNode()
           .getNamesystem());
@@ -670,7 +677,7 @@ public class TestReplicationPolicy extends TestCase {
       // According to our strategy, stale datanodes will be included for writing
       // to avoid hotspots
       assertFalse(miniCluster.getNameNode().getNamesystem()
-          .isAvoidingStaleDataNodesForWrite());
+          .shouldAvoidStaleDataNodesForWrite());
       // Check metrics
       assertGauge("StaleDataNodes", numStaleNodes, miniCluster.getNameNode()
           .getNamesystem());
@@ -693,7 +700,7 @@ public class TestReplicationPolicy extends TestCase {
           .getNumStaleNodes();
       assertEquals(numStaleNodes, 2);
       assertTrue(miniCluster.getNameNode().getNamesystem()
-          .isAvoidingStaleDataNodesForWrite());
+          .shouldAvoidStaleDataNodesForWrite());
       // Check metrics
       assertGauge("StaleDataNodes", numStaleNodes, miniCluster.getNameNode()
           .getNamesystem());