HDFS-7225. Remove stale block invalidation work when DN re-registers with different UUID. (Zhe Zhang and Andrew Wang)

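When a DataNode is reformatted and re-registers with a new storage UUID,
the NameNode wipes the old entry from datanodeMap but previously left that
node's pending block invalidation work behind in InvalidateBlocks, where it
could never be dispatched. Clear the stale work when the node is wiped, and
have BlockManager#computeInvalidateWork drop work for any datanode that can
no longer be resolved.
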
(cherry picked from commit 406c09ad1150c4971c2b7675fcb0263d40517fbf)
(cherry picked from commit 2e15754a92c6589308ccbbb646166353cc2f2456)
(cherry picked from commit 014d07de2e9b39be4b6793f0e09fcf8548570ad5)
Andrew Wang committed 10 years ago
commit 0898c21e24

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -41,6 +41,9 @@ Release 2.6.1 - UNRELEASED
     HDFS-7263. Snapshot read can reveal future bytes for appended files.
     (Tao Luo via shv)
 
+    HDFS-7225. Remove stale block invalidation work when DN re-registers with
+    different UUID. (Zhe Zhang and Andrew Wang)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1111,6 +1111,18 @@ public class BlockManager {
     }
   }
 
+  /**
+   * Remove all block invalidation tasks under this datanode UUID;
+   * used when a datanode registers with a new UUID and the old one
+   * is wiped.
+   */
+  void removeFromInvalidates(final DatanodeInfo datanode) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
+    invalidateBlocks.remove(datanode);
+  }
+
   /**
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
@@ -3393,7 +3405,14 @@ public class BlockManager {
         return 0;
       }
       try {
-        toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+        DatanodeDescriptor dnDescriptor = datanodeManager.getDatanode(dn);
+        if (dnDescriptor == null) {
+          LOG.warn("DataNode " + dn + " cannot be found with UUID " +
+              dn.getDatanodeUuid() + ", removing block invalidation work.");
+          invalidateBlocks.remove(dn);
+          return 0;
+        }
+        toInvalidate = invalidateBlocks.invalidateWork(dnDescriptor);
         
         if (toInvalidate == null) {
           return 0;
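
For readers without the surrounding Hadoop sources, here is a minimal,
self-contained sketch of the failure mode that the null-descriptor guard
above closes. The class and maps below are invented stand-ins for
illustration, not Hadoop APIs: pending invalidation work is keyed by
datanode UUID, so once a node re-registers under a fresh UUID, its old
queue can only leak unless it is dropped explicitly.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of the bug (invented names, not Hadoop code): invalidation
 * work keyed by datanode UUID is orphaned when the node re-registers
 * under a fresh UUID, unless it is removed explicitly.
 */
public class StaleInvalidateSketch {
  /** Stand-in for InvalidateBlocks: pending block ids per datanode UUID. */
  private final Map<String, Set<Long>> pending = new HashMap<>();
  /** Stand-in for DatanodeManager#datanodeMap: UUID -> hostname. */
  private final Map<String, String> datanodeMap = new HashMap<>();

  void addToInvalidates(long blockId, String dnUuid) {
    pending.computeIfAbsent(dnUuid, k -> new HashSet<>()).add(blockId);
  }

  /** Mirrors the new guard: work for an unresolvable UUID is dropped. */
  int computeInvalidateWork(String dnUuid) {
    if (!datanodeMap.containsKey(dnUuid)) {
      pending.remove(dnUuid); // stale queue is garbage-collected here
      return 0;
    }
    Set<Long> work = pending.remove(dnUuid);
    return work == null ? 0 : work.size();
  }

  public static void main(String[] args) {
    StaleInvalidateSketch bm = new StaleInvalidateSketch();
    bm.datanodeMap.put("uuid-old", "dn1");
    bm.addToInvalidates(42L, "uuid-old");

    // The node reformats: the old entry is wiped, a new UUID registers.
    bm.datanodeMap.remove("uuid-old");
    bm.datanodeMap.put("uuid-new", "dn1");

    // Without the guard, the queue under "uuid-old" would linger forever
    // and the pending-deletion count would never drain.
    System.out.println(bm.computeInvalidateWork("uuid-old")); // prints 0
    System.out.println(bm.pending.isEmpty());                 // prints true
  }
}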

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -593,6 +593,8 @@ public class DatanodeManager {
     synchronized (datanodeMap) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
+    // Also remove all block invalidation tasks under this node
+    blockManager.removeFromInvalidates(new DatanodeInfo(node));
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
           + node + "): storage " + key 
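
Design note: hooking the cleanup into wipeDatanode means every path that
removes a datanodeMap entry also clears that node's pending invalidation
work, instead of relying solely on the guard in computeInvalidateWork; the
DatanodeInfo copy captures the node's identity, including the stale UUID,
at the moment the entry is wiped.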

+ 131 - 36
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java

@@ -17,66 +17,161 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.UUID;
+
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 /**
  * Test that block invalidation work is computed correctly
  */
 public class TestComputeInvalidateWork {
+
+  private Configuration conf;
+  private final int NUM_OF_DATANODES = 3;
+  private MiniDFSCluster cluster;
+  private FSNamesystem namesystem;
+  private BlockManager bm;
+  private DatanodeDescriptor[] nodes;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
+        .build();
+    cluster.waitActive();
+    namesystem = cluster.getNamesystem();
+    bm = namesystem.getBlockManager();
+    nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes();
+    assertEquals(nodes.length, NUM_OF_DATANODES);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
   /**
-   * Test if {@link FSNamesystem#computeInvalidateWork(int)}
+   * Test if {@link BlockManager#computeInvalidateWork(int)}
    * can schedule invalidate work correctly 
    */
-  @Test
+  @Test(timeout=120000)
   public void testCompInvalidate() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    final int NUM_OF_DATANODES = 3;
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
+    final int blockInvalidateLimit = bm.getDatanodeManager()
+        .blockInvalidateLimit;
+    namesystem.writeLock();
     try {
-      cluster.waitActive();
-      final FSNamesystem namesystem = cluster.getNamesystem();
-      final BlockManager bm = namesystem.getBlockManager();
-      final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
-      final DatanodeDescriptor[] nodes = bm.getDatanodeManager(
-          ).getHeartbeatManager().getDatanodes();
-      assertEquals(nodes.length, NUM_OF_DATANODES);
+      for (int i=0; i<nodes.length; i++) {
+        for(int j=0; j<3*blockInvalidateLimit+1; j++) {
+          Block block = new Block(i*(blockInvalidateLimit+1)+j, 0,
+              GenerationStamp.LAST_RESERVED_STAMP);
+          bm.addToInvalidates(block, nodes[i]);
+        }
+      }
       
+      assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
+          bm.computeInvalidateWork(NUM_OF_DATANODES+1));
+      assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
+          bm.computeInvalidateWork(NUM_OF_DATANODES));
+      assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
+          bm.computeInvalidateWork(NUM_OF_DATANODES-1));
+      int workCount = bm.computeInvalidateWork(1);
+      if (workCount == 1) {
+        assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
+      } else {
+        assertEquals(workCount, blockInvalidateLimit);
+        assertEquals(2, bm.computeInvalidateWork(2));
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Reformatted DataNodes will replace the original UUID in the
+   * {@link DatanodeManager#datanodeMap}. This tests if block
+   * invalidation work on the original DataNode can be skipped.
+   */
+  @Test(timeout=120000)
+  public void testDatanodeReformat() throws Exception {
+    namesystem.writeLock();
+    try {
+      Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP);
+      bm.addToInvalidates(block, nodes[0]);
+      // Change the datanode UUID to emulate a reformation
+      nodes[0].setDatanodeUuidForTesting("fortesting");
+      // Since UUID has changed, the invalidation work should be skipped
+      assertEquals(0, bm.computeInvalidateWork(1));
+      assertEquals(0, bm.getPendingDeletionBlocksCount());
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeReRegistration() throws Exception {
+    // Create a test file
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final Path path = new Path("/testRR");
+    // Create a file and shutdown the DNs, which populates InvalidateBlocks
+    DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(),
+        (short) NUM_OF_DATANODES, 0xED0ED0);
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.shutdown();
+    }
+    dfs.delete(path, false);
+    namesystem.writeLock();
+    InvalidateBlocks invalidateBlocks;
+    int expected = NUM_OF_DATANODES;
+    try {
+      invalidateBlocks = (InvalidateBlocks) Whitebox
+          .getInternalState(cluster.getNamesystem().getBlockManager(),
+              "invalidateBlocks");
+      assertEquals("Expected invalidate blocks to be the number of DNs",
+          (long) expected, invalidateBlocks.numBlocks());
+    } finally {
+      namesystem.writeUnlock();
+    }
+    // Re-register each DN and see that it wipes the invalidation work
+    for (DataNode dn : cluster.getDataNodes()) {
+      DatanodeID did = dn.getDatanodeId();
+      did.setDatanodeUuidForTesting(UUID.randomUUID().toString());
+      DatanodeRegistration reg = new DatanodeRegistration(did,
+          new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
+          new ExportedBlockKeys(),
+          VersionInfo.getVersion());
       namesystem.writeLock();
       try {
-        for (int i=0; i<nodes.length; i++) {
-          for(int j=0; j<3*blockInvalidateLimit+1; j++) {
-            Block block = new Block(i*(blockInvalidateLimit+1)+j, 0, 
-                GenerationStamp.LAST_RESERVED_STAMP);
-            bm.addToInvalidates(block, nodes[i]);
-          }
-        }
-        
-        assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, 
-            bm.computeInvalidateWork(NUM_OF_DATANODES+1));
-        assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, 
-            bm.computeInvalidateWork(NUM_OF_DATANODES));
-        assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1), 
-            bm.computeInvalidateWork(NUM_OF_DATANODES-1));
-        int workCount = bm.computeInvalidateWork(1);
-        if (workCount == 1) {
-          assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
-        } else {
-          assertEquals(workCount, blockInvalidateLimit);
-          assertEquals(2, bm.computeInvalidateWork(2));
-        }
+        bm.getDatanodeManager().registerDatanode(reg);
+        expected--;
+        assertEquals("Expected number of invalidate blocks to decrease",
+            (long) expected, invalidateBlocks.numBlocks());
       } finally {
-        namesystem.writeUnlock();
+        namesystem.writeUnlock();
       }
-    } finally {
-      cluster.shutdown();
     }
   }
 }
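
To run these tests in isolation (assuming a standard Hadoop source
checkout), Surefire can target the class from hadoop-hdfs-project/hadoop-hdfs,
e.g. mvn test -Dtest=TestComputeInvalidateWork.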