
HDFS-3368. Missing blocks due to bad DataNodes coming up and down. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1342521 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko, 13 years ago
commit 6754548083

+ 2 - 0
hdfs/CHANGES.txt

@@ -33,6 +33,8 @@ Release 0.22.1 - Unreleased
     HDFS-2991. Fix case where OP_ADD would not be logged in append().
     (Todd Lipcon via shv)
 
+    HDFS-3368. Missing blocks due to bad DataNodes coming up and down. (shv)
+
 Release 0.22.0 - 2011-11-29
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -69,6 +69,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT = 00777;
   public static final String  DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
   public static final int     DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
+  public static final String  DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";
+  public static final int     DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4;
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource";
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
   public static final String  DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
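For orientation, the new key behaves like any other HDFS setting and combines with the existing heartbeat interval. Below is a minimal sketch of reading it from a Configuration; the value 8 is purely illustrative, and the class name is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ToleratedHeartbeatWindow {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Illustrative override: tolerate 8 missed heartbeats instead of the default 4.
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 8);

        long heartbeatMs = conf.getLong(
            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
        int multiplier = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
            DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
        // Replicas on nodes whose last heartbeat is older than this window become
        // the preferred candidates for deletion (see BlockPlacementPolicyDefault below).
        System.out.println("Tolerated heartbeat window: " + heartbeatMs * multiplier + " ms");
      }
    }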

+ 25 - 5
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.commons.logging.*;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -43,6 +43,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   private boolean considerLoad; 
   private NetworkTopology clusterMap;
   private FSClusterStats stats;
+  private long heartbeatInterval;   // interval for DataNode heartbeats
+  /**
+   * Missing this many heartbeats is tolerated by the replica deletion policy.
+   */
+  private int tolerateHeartbeatMultiplier;
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -58,6 +63,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.heartbeatInterval = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
+    this.tolerateHeartbeatMultiplier = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
   }
 
   /** {@inheritDoc} */
@@ -501,24 +512,33 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                                  short replicationFactor,
                                                  Collection<DatanodeDescriptor> first, 
                                                  Collection<DatanodeDescriptor> second) {
+    long oldestHeartbeat =
+      now() - heartbeatInterval * tolerateHeartbeatMultiplier;
+    DatanodeDescriptor oldestHeartbeatNode = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor cur = null;
+    DatanodeDescriptor minSpaceNode = null;
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
     Iterator<DatanodeDescriptor> iter =
           first.isEmpty() ? second.iterator() : first.iterator();
 
-    // pick node with least free space
+    // Pick the node with the oldest heartbeat or with the least free space,
+    // if all heartbeats are within the tolerable heartbeat interval
     while (iter.hasNext() ) {
       DatanodeDescriptor node = iter.next();
       long free = node.getRemaining();
+      long lastHeartbeat = node.getLastUpdate();
+      if(lastHeartbeat < oldestHeartbeat) {
+        oldestHeartbeat = lastHeartbeat;
+        oldestHeartbeatNode = node;
+      }
       if (minSpace > free) {
         minSpace = free;
-        cur = node;
+        minSpaceNode = node;
       }
     }
-    return cur;
+    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
   }
 }
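Restated outside the NameNode for clarity: the revised selection rule first looks for a replica on a node whose heartbeat is older than the tolerated window, and only then falls back to the least free space. A minimal, self-contained sketch is below; the Candidate class is a hypothetical stand-in for DatanodeDescriptor, and only the selection logic mirrors the patch:

    import java.util.Collection;
    import java.util.Iterator;

    class ReplicaDeletionSketch {
      /** Hypothetical stand-in for DatanodeDescriptor. */
      static class Candidate {
        final long remainingBytes;   // free space, as getRemaining() would report
        final long lastUpdateMillis; // last heartbeat time, as getLastUpdate() would report
        Candidate(long remainingBytes, long lastUpdateMillis) {
          this.remainingBytes = remainingBytes;
          this.lastUpdateMillis = lastUpdateMillis;
        }
      }

      /** Prefer the node with the oldest heartbeat beyond the tolerated window;
       *  otherwise pick the node with the least free space. */
      static Candidate choose(Collection<Candidate> first, Collection<Candidate> second,
                              long heartbeatIntervalMs, int tolerateMultiplier) {
        long oldestHeartbeat =
            System.currentTimeMillis() - heartbeatIntervalMs * tolerateMultiplier;
        Candidate oldestHeartbeatNode = null;
        long minSpace = Long.MAX_VALUE;
        Candidate minSpaceNode = null;

        // Mirror the patch: consider the first set unless it is empty.
        Iterator<Candidate> iter =
            first.isEmpty() ? second.iterator() : first.iterator();
        while (iter.hasNext()) {
          Candidate node = iter.next();
          if (node.lastUpdateMillis < oldestHeartbeat) {
            oldestHeartbeat = node.lastUpdateMillis;
            oldestHeartbeatNode = node;
          }
          if (node.remainingBytes < minSpace) {
            minSpace = node.remainingBytes;
            minSpaceNode = node;
          }
        }
        return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
      }
    }

Because oldestHeartbeat is seeded at the current time minus the window, a stale node always wins over the least-free-space fallback, which is what keeps a flapping DataNode from ending up holding the only surviving replicas.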
 

+ 75 - 1
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java

@@ -17,19 +17,23 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 import junit.framework.TestCase;
@@ -95,4 +99,74 @@ public class TestOverReplicatedBlocks extends TestCase {
       cluster.shutdown();
     }
   }
+
+  static final long SMALL_BLOCK_SIZE =
+    DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+  static final long SMALL_FILE_LENGTH = SMALL_BLOCK_SIZE * 4;
+
+  /**
+   * The test verifies that the replica chosen for deletion is on the node
+   * with the oldest heartbeat, when the time since that heartbeat exceeds
+   * the tolerable heartbeat interval.
+   * It creates a file with several blocks and replication 4.
+   * The last DN is configured to send heartbeats rarely.
+   * 
+   * The test waits until the tolerable heartbeat interval expires, then reduces
+   * the replication of the file. All replica deletions should be scheduled for
+   * the last node. No replicas will actually be deleted, since the last DN does
+   * not send heartbeats.
+   */
+  public void testChooseReplicaToDelete() throws IOException {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK_SIZE);
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      fs = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 300);
+      cluster.startDataNodes(conf, 1, true, null, null, null);
+      DataNode lastDN = cluster.getDataNodes().get(3);
+      String lastDNid = lastDN.getDatanodeRegistration().getStorageID();
+
+      final Path fileName = new Path("/foo2");
+      DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L);
+      DFSTestUtil.waitReplication(fs, fileName, (short)4);
+
+      // Wait for tolerable number of heartbeats plus one
+      DatanodeDescriptor nodeInfo = null;
+      long lastHeartbeat = 0;
+      long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 *
+        (DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1);
+      do {
+        synchronized (namesystem.heartbeats) {
+          synchronized (namesystem.datanodeMap) {
+            nodeInfo = namesystem.getDatanode(lastDN.getDatanodeRegistration());
+            lastHeartbeat = nodeInfo.getLastUpdate();
+          }
+        }
+      } while(now() - lastHeartbeat < waitTime);
+      fs.setReplication(fileName, (short)3);
+
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
+
+      // All replicas for deletion should be scheduled on lastDN.
+      // And should not actually be deleted, because lastDN does not heartbeat.
+      namesystem.readLock();
+      Collection<Block> dnBlocks = 
+        namesystem.blockManager.excessReplicateMap.get(lastDNid);
+      assertEquals("Replicas on node " + lastDNid + " should have been deleted",
+          SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
+      namesystem.readUnlock();
+      for(BlockLocation location : locs)
+        assertEquals("Block should still have 4 replicas",
+            4, location.getNames().length);
+    } finally {
+      if(fs != null) fs.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
 }
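For orientation, the busy-wait in the test above reduces to simple arithmetic; the sketch below spells it out, assuming the stock 3-second heartbeat default and the multiplier default of 4 added in this patch:

    public class WaitTimeSketch {
      public static void main(String[] args) {
        long heartbeatMs = 3L * 1000;                   // DFS_HEARTBEAT_INTERVAL_DEFAULT, in ms
        int tolerated = 4;                              // DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT
        long waitTime = heartbeatMs * (tolerated + 1);  // 15,000 ms
        // Once now() - lastHeartbeat >= waitTime, the last DataNode's heartbeat is
        // older than the tolerated window, so setReplication((short)3) should schedule
        // every excess replica of /foo2 on that node.
        System.out.println("Test waits at least " + waitTime + " ms before reducing replication");
      }
    }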