
svn merge -c 1342512 from trunk for HDFS-3368. Missing blocks due to bad DataNodes coming up and down.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1342749 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
parent
commit
4303c38361

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -116,6 +116,8 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-3058. HA: Bring BookKeeperJournalManager up to date with HA changes.
     (Ivan Kelly via umamahesh)
 
+    HDFS-3368. Missing blocks due to bad DataNodes coming up and down. (shv)
+
 Release 2.0.0-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -107,6 +107,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
   public static final String  DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
   public static final int     DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
+  public static final String  DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";
+  public static final int     DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4;
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource";
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
   public static final String  DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
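
The new dfs.namenode.tolerate.heartbeat.multiplier key works together with the
existing dfs.heartbeat.interval: a DataNode whose last heartbeat is older than
interval * multiplier becomes the preferred candidate when an excess replica has
to be removed. A minimal sketch of overriding and inspecting that window,
assuming the stock 3-second heartbeat interval (the class name and the value 8
are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ToleranceConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Tolerate 8 missed heartbeats instead of the default 4 before a stale
    // node is preferred for replica deletion.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 8);
    long intervalMs = conf.getLong(
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
    int multiplier = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
    // With a 3 s interval and multiplier 8 this prints a 24000 ms window.
    System.out.println("Tolerated heartbeat window: "
        + intervalMs * multiplier + " ms");
  }
}

In a real deployment the property would normally be set in hdfs-site.xml on the
NameNode rather than programmatically.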

+ 31 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -25,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -49,13 +52,19 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private static final String enableDebugLogging =
+    "For more information, please enable DEBUG log level on "
+    + ((Log4JLogger)LOG).getLogger().getName();
+
   private boolean considerLoad; 
   private boolean preferLocalNode = true;
   private NetworkTopology clusterMap;
   private FSClusterStats stats;
-  static final String enableDebugLogging = "For more information, please enable"
-    + " DEBUG level logging on the "
-    + "org.apache.hadoop.hdfs.server.namenode.FSNamesystem logger.";
+  private long heartbeatInterval;   // interval for DataNode heartbeats
+  /**
+   * Number of missed heartbeats tolerated before a node is preferred for replica deletion.
+   */
+  private int tolerateHeartbeatMultiplier;
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -71,6 +80,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.heartbeatInterval = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
+    this.tolerateHeartbeatMultiplier = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
   }
 
   private ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -551,24 +566,33 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                                  short replicationFactor,
                                                  Collection<DatanodeDescriptor> first, 
                                                  Collection<DatanodeDescriptor> second) {
+    long oldestHeartbeat =
+      now() - heartbeatInterval * tolerateHeartbeatMultiplier;
+    DatanodeDescriptor oldestHeartbeatNode = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor cur = null;
+    DatanodeDescriptor minSpaceNode = null;
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
     Iterator<DatanodeDescriptor> iter =
           first.isEmpty() ? second.iterator() : first.iterator();
 
-    // pick node with least free space
+    // Pick the node with the oldest heartbeat or with the least free space,
+    // if all heartbeats are within the tolerable heartbeat interval
     while (iter.hasNext() ) {
       DatanodeDescriptor node = iter.next();
       long free = node.getRemaining();
+      long lastHeartbeat = node.getLastUpdate();
+      if(lastHeartbeat < oldestHeartbeat) {
+        oldestHeartbeat = lastHeartbeat;
+        oldestHeartbeatNode = node;
+      }
       if (minSpace > free) {
         minSpace = free;
-        cur = node;
+        minSpaceNode = node;
       }
     }
-    return cur;
+    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
   }
   
   @VisibleForTesting
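
The change above makes staleness take precedence over free space: a replica is
removed from the node whose last heartbeat is older than
heartbeatInterval * tolerateHeartbeatMultiplier, and only when every candidate
has heartbeated within that window does the policy fall back to the node with
the least remaining space. A self-contained sketch of the same rule, with
hypothetical node names and values (Candidate, ChooseReplicaToDeleteSketch and
the dn1/dn2/dn3 figures are illustrative, not part of the patch):

import java.util.Arrays;
import java.util.List;

public class ChooseReplicaToDeleteSketch {

  // Illustrative stand-in for DatanodeDescriptor: last heartbeat and free space.
  static class Candidate {
    final String name;
    final long lastUpdate;   // last heartbeat time, ms since epoch
    final long remaining;    // free space, bytes
    Candidate(String name, long lastUpdate, long remaining) {
      this.name = name;
      this.lastUpdate = lastUpdate;
      this.remaining = remaining;
    }
  }

  static Candidate choose(List<Candidate> nodes, long now,
                          long heartbeatInterval, int tolerateMultiplier) {
    long oldestHeartbeat = now - heartbeatInterval * tolerateMultiplier;
    Candidate oldestHeartbeatNode = null;
    long minSpace = Long.MAX_VALUE;
    Candidate minSpaceNode = null;
    for (Candidate node : nodes) {
      // A heartbeat older than the tolerated window takes precedence.
      if (node.lastUpdate < oldestHeartbeat) {
        oldestHeartbeat = node.lastUpdate;
        oldestHeartbeatNode = node;
      }
      // Track the least-free-space node as the fallback.
      if (node.remaining < minSpace) {
        minSpace = node.remaining;
        minSpaceNode = node;
      }
    }
    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long interval = 3000L;   // assumed 3 s heartbeat interval, in ms
    int multiplier = 4;      // dfs.namenode.tolerate.heartbeat.multiplier default
    List<Candidate> nodes = Arrays.asList(
        new Candidate("dn1", now - 2000, 10L * 1024 * 1024 * 1024),
        new Candidate("dn2", now - 20000, 500L * 1024 * 1024 * 1024),
        new Candidate("dn3", now - 1000, 5L * 1024 * 1024 * 1024));
    // Prints dn2: its heartbeat is 20 s old, beyond the 12 s tolerated window,
    // so staleness wins even though dn3 has the least free space.
    System.out.println(choose(nodes, now, interval, multiplier).name);
  }
}

Keeping the least-free-space fallback preserves the previous behaviour whenever
all DataNodes are heartbeating within the tolerated window.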

+ 77 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java

@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
 import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,11 +35,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -116,6 +122,77 @@ public class TestOverReplicatedBlocks {
       cluster.shutdown();
     }
   }
+
+  static final long SMALL_BLOCK_SIZE =
+    DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+  static final long SMALL_FILE_LENGTH = SMALL_BLOCK_SIZE * 4;
+
+  /**
+   * The test verifies that the replica chosen for deletion is on the node
+   * with the oldest heartbeat, when that heartbeat is older than the
+   * tolerable heartbeat interval.
+   * It creates a file with several blocks and replication 4.
+   * The last DN is configured to send heartbeats rarely.
+   * 
+   * The test waits until the tolerable heartbeat interval expires, and then
+   * reduces the replication of the file. All replica deletions should be
+   * scheduled for the last node. No replicas will actually be deleted, since
+   * the last DN doesn't send heartbeats.
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws IOException {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK_SIZE);
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      fs = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 300);
+      cluster.startDataNodes(conf, 1, true, null, null, null);
+      DataNode lastDN = cluster.getDataNodes().get(3);
+      DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
+          lastDN, namesystem.getBlockPoolId());
+      String lastDNid = dnReg.getStorageID();
+
+      final Path fileName = new Path("/foo2");
+      DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L);
+      DFSTestUtil.waitReplication(fs, fileName, (short)4);
+
+      // Wait for tolerable number of heartbeats plus one
+      DatanodeDescriptor nodeInfo = null;
+      long lastHeartbeat = 0;
+      long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 *
+        (DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1);
+      do {
+        nodeInfo = 
+          namesystem.getBlockManager().getDatanodeManager().getDatanode(dnReg);
+        lastHeartbeat = nodeInfo.getLastUpdate();
+      } while(now() - lastHeartbeat < waitTime);
+      fs.setReplication(fileName, (short)3);
+
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
+
+      // All replicas for deletion should be scheduled on lastDN.
+      // And should not actually be deleted, because lastDN does not heartbeat.
+      namesystem.readLock();
+      Collection<Block> dnBlocks = 
+        namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
+      assertEquals("Replicas on node " + lastDNid + " should be scheduled for deletion",
+          SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
+      namesystem.readUnlock();
+      for(BlockLocation location : locs)
+        assertEquals("Block should still have 4 replicas",
+            4, location.getNames().length);
+    } finally {
+      if(fs != null) fs.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+
   /**
    * Test over replicated block should get invalidated when decreasing the
    * replication for a partial block.