Explorar o código

HDFS-4288. NN accepts incremental BR as IBR in safemode. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1440201 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee hai 12 anos
pai
achega
5a8832313f

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -11,6 +11,7 @@ Release 0.23.7 - UNRELEASED
   OPTIMIZATIONS
 
   BUG FIXES
+    HDFS-4288. NN accepts incremental BR as IBR in safemode (daryn via kihwal)
 
 Release 0.23.6 - UNRELEASED
 

+ 1 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java

@@ -77,11 +77,7 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
   DatanodeDescriptor getDatanode(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    DatanodeDescriptor node = (DatanodeDescriptor)triplets[index*3];
-    assert node == null || 
-        DatanodeDescriptor.class.getName().equals(node.getClass().getName()) : 
-              "DatanodeDescriptor is expected at " + index*3;
-    return node;
+    return (DatanodeDescriptor)triplets[index*3];
   }
 
   BlockInfo getPrevious(int index) {

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1394,7 +1395,7 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      if (namesystem.isInStartupSafeMode() && node.numBlocks() > 0) {
+      if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
         blockLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID.getName()
             + " because namenode still in startup phase");
@@ -1408,13 +1409,17 @@ public class BlockManager {
       } else {
         processReport(node, newReport);
       }
+      node.receivedBlockReport();
     } finally {
       endTime = Util.now();
       namesystem.writeUnlock();
     }
 
     // Log the block report processing stats from Namenode perspective
-    NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
+    final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+    if (metrics != null) {
+      metrics.addBlockReport((int) (endTime - startTime));
+    }
     blockLog.info("BLOCK* processReport: from "
         + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
         + ", processing time: " + (endTime - startTime) + " msecs");

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -132,6 +132,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
+  
+  /* Set to true after the first block report has been processed.  Reset to
+   * false when the node re-registers.  This enables a NN in safe-mode to
+   * reprocess the first block report in case the DN now reports different blocks.
+   */
+  private boolean processedBlockReport = false;
+  
   /** 
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode
@@ -575,6 +582,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * @param nodeReg DatanodeID to update registration for.
    */
   public void updateRegInfo(DatanodeID nodeReg) {
+    processedBlockReport = false; // must re-process IBR after re-registration
     super.updateRegInfo(nodeReg);
   }
 
@@ -592,5 +600,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.bandwidth = bandwidth;
   }
 
+  public void receivedBlockReport() {
+    processedBlockReport = true;
+  }
 
+  boolean isFirstBlockReport() {
+    return !processedBlockReport;
+  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -318,7 +318,7 @@ public class DatanodeManager {
   }
 
   /** Add a datanode. */
-  private void addDatanode(final DatanodeDescriptor node) {
+  void addDatanode(final DatanodeDescriptor node) {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -59,6 +59,10 @@ implements Writable, NodeRegistration {
     this("");
   }
   
+  public DatanodeRegistration(DatanodeID node) {
+    super(node);
+  }
+  
   /**
    * Create DatanodeRegistration
    */

+ 68 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -36,10 +37,12 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.net.NetworkTopology;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import static org.mockito.Mockito.*;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -442,4 +445,69 @@ public class TestBlockManager {
             new NumberReplicas(),
             UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
   }
+
+  @Test
+  public void testSafeModeIBR() throws Exception {
+    DatanodeDescriptor node = spy(nodes.get(0));
+    node.setStorageID("dummy-storage");
+    node.isAlive = true;
+
+    DatanodeRegistration nodeReg = new DatanodeRegistration(node);
+
+    // pretend to be in safemode
+    doReturn(true).when(fsn).isInStartupSafeMode();
+    
+    // register new node
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    bm.getDatanodeManager().addDatanode(node); // swap in spy    
+    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
+    assertTrue(node.isFirstBlockReport());
+    // send block report, should be processed
+    reset(node);
+    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    verify(node).receivedBlockReport();
+    assertFalse(node.isFirstBlockReport());
+    // send block report again, should NOT be processed
+    reset(node);
+    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    verify(node, never()).receivedBlockReport();
+    assertFalse(node.isFirstBlockReport());
+
+    // re-register as if node restarted, should update existing node
+    bm.getDatanodeManager().removeDatanode(node);
+    reset(node);
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    verify(node).updateRegInfo(nodeReg);
+    assertTrue(node.isFirstBlockReport()); // ready for report again
+    // send block report, should be processed after restart
+    reset(node);
+    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    verify(node).receivedBlockReport();
+    assertFalse(node.isFirstBlockReport());
+  }
+  
+  @Test
+  public void testSafeModeIBRAfterIncremental() throws Exception {
+    DatanodeDescriptor node = spy(nodes.get(0));
+    node.setStorageID("dummy-storage");
+    node.isAlive = true;
+
+    DatanodeRegistration nodeReg = new DatanodeRegistration(node);
+    BlockListAsLongs blockReport = new BlockListAsLongs(null, null);
+
+    // pretend to be in safemode
+    doReturn(true).when(fsn).isInStartupSafeMode();
+
+    // register new node
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    bm.getDatanodeManager().addDatanode(node); // swap in spy    
+    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
+    assertTrue(node.isFirstBlockReport());
+    // send block report while pretending to already have blocks
+    reset(node);
+    doReturn(1).when(node).numBlocks();
+    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
+    verify(node).receivedBlockReport();
+    assertFalse(node.isFirstBlockReport());
+  }
 }