HADOOP-3050. DataNode sends one and only one block report after it registers with the namenode. Contributed by Hairong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@644947 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang committed 17 years ago
commit 22d9113b5d
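
For orientation, the following is a minimal sketch of the DataNode-side flow this commit introduces, written as a simplified standalone class. The field and method names waitForFirstBlockReportRequest, scheduleBlockReport, lastBlockReport, and blockReportInterval follow the diff below; everything else is illustrative scaffolding, not the actual org.apache.hadoop.dfs.DataNode code.

    import java.util.Random;

    // Illustrative sketch of the block report scheduling introduced by HADOOP-3050.
    class BlockReportSchedulingSketch {
      private final Random r = new Random();
      private long blockReportInterval = 60L * 60 * 1000;   // dfs.blockreport.intervalMsec (example value)
      private long initialBlockReportDelay = 10L * 1000;    // dfs.blockreport.initialDelay in ms (example value)
      private long lastHeartbeat = 0;
      // Long.MAX_VALUE disallows sending a block report before the namenode asks for one.
      private long lastBlockReport = Long.MAX_VALUE;
      private boolean waitForFirstBlockReportRequest = false;

      void register() {
        // ...handshake with the namenode elided...
        waitForFirstBlockReportRequest = true;               // arm the one-shot trigger
      }

      void handleBlockReportCommand() {                      // DatanodeProtocol.DNA_BLOCKREPORT
        if (waitForFirstBlockReportRequest) {                // honor only the first request
          waitForFirstBlockReportRequest = false;            // later requests are dropped
          scheduleBlockReport(initialBlockReportDelay);      // random short delay scatters BRs across DNs
        }
      }

      void scheduleBlockReport(long delay) {
        if (delay > 0) {                                     // send BR after a random delay
          lastBlockReport = System.currentTimeMillis()
              - (blockReportInterval - r.nextInt((int) delay));
        } else {                                             // send at the next heartbeat
          lastBlockReport = lastHeartbeat - blockReportInterval;
        }
      }

      boolean blockReportDue(long startTime) {               // checked on each offerService pass
        return startTime - lastBlockReport > blockReportInterval;
      }
    }

Because lastBlockReport starts at Long.MAX_VALUE and the DNA_BLOCKREPORT handler consumes the flag exactly once, the datanode schedules exactly one initial block report per registration; after that, only the regular periodic reports driven by blockReportInterval are sent.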

+ 3 - 0
CHANGES.txt

@@ -502,6 +502,9 @@ Trunk (unreleased changes)
     and make exception handling more promiscuous to catch this condition.
     (cdouglas)
 
+    HADOOP-3050. DataNode sends one and only one block report after
+    it registers with the namenode. (Hairong Kuang)
+
 Release 0.16.3 - Unreleased
 
   BUG FIXES

+ 20 - 23
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -103,10 +103,11 @@ public class DataNode implements FSConstants, Runnable {
   Daemon dataXceiveServer = null;
   ThreadGroup threadGroup = null;
   long blockReportInterval;
-  long lastBlockReport = 0;
+  //disallow the sending of BR before instructed to do so
+  long lastBlockReport = Long.MAX_VALUE;
   boolean resetBlockReportTime = true;
   long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
-  boolean mustReportBlocks = false;
+  private boolean waitForFirstBlockReportRequest = false;
   long lastHeartbeat = 0;
   long heartBeatInterval;
   private DataStorage storage = null;
@@ -123,6 +124,8 @@ public class DataNode implements FSConstants, Runnable {
   
   private DataBlockScanner blockScanner;
   private Daemon blockScannerThread;
+  
+  private static final Random R = new Random();
 
   /**
    * We need an estimate for block size to check if the disk partition has
@@ -268,13 +271,11 @@ public class DataNode implements FSConstants, Runnable {
     this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
     this.threadGroup.setDaemon(true); // auto destroy when empty
 
-    long blockReportIntervalBasis =
-      conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
     this.blockReportInterval =
-      blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
+      conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
     this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
                                             BLOCKREPORT_INITIAL_DELAY)* 1000L; 
-    if (this.initialBlockReportDelay >= blockReportIntervalBasis) {
+    if (this.initialBlockReportDelay >= blockReportInterval) {
       this.initialBlockReportDelay = 0;
       LOG.info("dfs.blockreport.initialDelay is greater than " +
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
@@ -417,7 +418,7 @@ public class DataNode implements FSConstants, Runnable {
       rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
     } catch (NoSuchAlgorithmException e) {
       LOG.warn("Could not use SecureRandom");
-      rand = (new Random()).nextInt(Integer.MAX_VALUE);
+      rand = R.nextInt(Integer.MAX_VALUE);
     }
     dnReg.storageID = "DS-" + rand + "-"+ ip + "-" + dnReg.getPort() + "-" + 
                       System.currentTimeMillis();
@@ -465,6 +466,7 @@ public class DataNode implements FSConstants, Runnable {
           + dnRegistration.getStorageID() 
           + ". Expecting " + storage.getStorageID());
     }
+    waitForFirstBlockReportRequest = true;
   }
 
   /**
@@ -648,14 +650,12 @@ public class DataNode implements FSConstants, Runnable {
         }
 
         // send block report
-        if (mustReportBlocks || 
-            startTime - lastBlockReport > blockReportInterval) {
+        if (startTime - lastBlockReport > blockReportInterval) {
           //
           // Send latest blockinfo report if timer has expired.
           // Get back a list of local block(s) that are obsolete
           // and can be safely GC'ed.
           //
-          mustReportBlocks = false;
           long brStartTime = now();
           Block[] bReport = data.getBlockReport();
           DatanodeCommand cmd = namenode.blockReport(dnRegistration,
@@ -669,7 +669,7 @@ public class DataNode implements FSConstants, Runnable {
           // time before we start the periodic block reports.
           //
           if (resetBlockReportTime) {
-            lastBlockReport = startTime - new Random().nextInt((int)(blockReportInterval));
+            lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
             resetBlockReportTime = false;
           } else {
             lastBlockReport = startTime;
@@ -747,8 +747,6 @@ public class DataNode implements FSConstants, Runnable {
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration - at start or if NN lost contact
       register();
-      // random short delay - helps scatter the BR from all DNs
-      scheduleBlockReport(initialBlockReportDelay);
       break;
     case DatanodeProtocol.DNA_FINALIZE:
       storage.finalizeUpgrade();
@@ -758,15 +756,14 @@ public class DataNode implements FSConstants, Runnable {
       processDistributedUpgradeCommand((UpgradeCommand)cmd);
       break;
     case DatanodeProtocol.DNA_BLOCKREPORT:
-      try {
-        if (initialBlockReportDelay != 0) {
-          //sleep for a random time upto the heartbeat interval 
-          //before sending the block report
-          Thread.sleep((long)(new Random().nextDouble() * heartBeatInterval));
-        }
-        mustReportBlocks = true;
-      } catch (InterruptedException ie) {}
-      return false;
+      // only send BR when receive request the 1st time
+      if (waitForFirstBlockReportRequest) {
+        // dropping all following BR requests
+        waitForFirstBlockReportRequest = false;
+        // random short delay - helps scatter the BR from all DNs
+        scheduleBlockReport(initialBlockReportDelay);
+      }
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -2813,7 +2810,7 @@ public class DataNode implements FSConstants, Runnable {
   public void scheduleBlockReport(long delay) {
     if (delay > 0) { // send BR after random delay
       lastBlockReport = System.currentTimeMillis()
-                            - ( blockReportInterval - new Random().nextInt((int)(delay)));
+                            - ( blockReportInterval - R.nextInt((int)(delay)));
     } else { // send at next heartbeat
       lastBlockReport = lastHeartbeat - blockReportInterval;
     }
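
As a quick worked check of the arithmetic in scheduleBlockReport: with lastBlockReport = now - (blockReportInterval - r) for some r in [0, delay), the send condition startTime - lastBlockReport > blockReportInterval (from the offerService hunk earlier in this diff) reduces to startTime > now + r, so the first report goes out within roughly delay milliseconds of being scheduled. A standalone sketch with illustrative names and values:

    import java.util.Random;

    // Standalone check of the scheduling arithmetic (illustrative names and values only).
    public class ScheduleCheck {
      public static void main(String[] args) {
        long blockReportInterval = 60L * 60 * 1000;            // example 1h report interval
        long delay = 10L * 1000;                               // example 10s initial delay window
        long now = System.currentTimeMillis();
        long r = new Random().nextInt((int) delay);
        long lastBlockReport = now - (blockReportInterval - r);
        long firstDue = lastBlockReport + blockReportInterval; // == now + r
        System.out.println("first block report due in " + (firstDue - now) + " ms");
      }
    }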

+ 4 - 3
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1948,7 +1948,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * @param nodeReg
    * @return true or false
    */
-  public synchronized boolean blockReportProcessed(DatanodeRegistration nodeReg)
+  synchronized boolean blockReportProcessed(DatanodeRegistration nodeReg)
   throws IOException {
     return getDatanode(nodeReg).getBlockReportProcessed();
   }
@@ -1956,7 +1956,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   /**
    * Has the datanode been resolved to a switch/rack
    */
-  public synchronized boolean isResolved(DatanodeRegistration dnReg) {
+  synchronized boolean isResolved(DatanodeRegistration dnReg) {
     try {
       return !getDatanode(dnReg).getNetworkLocation()
             .equals(NetworkTopology.UNRESOLVED);
@@ -2062,6 +2062,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       nodeS.updateRegInfo(nodeReg);
       nodeS.setHostName(hostName);
       nodeS.setNetworkLocation(NetworkTopology.UNRESOLVED);
+      nodeS.setBlockReportProcessed(false);
       addToResolutionQueue(nodeS);
         
       // also treat the registration message as a heartbeat
@@ -2711,6 +2712,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       return null; //drop the block report if the dn hasn't been resolved
     }
 
+    node.setBlockReportProcessed(true);
     //
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -2759,7 +2761,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
     }
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
-    node.setBlockReportProcessed(true);
     return obsolete.toArray(new Block[obsolete.size()]);
   }
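
On the namenode side, the commit tightens the per-datanode bookkeeping of whether a block report has been processed. A hedged sketch of that flag's lifecycle follows; the DatanodeStateSketch class is purely illustrative, only the three touch points visible in the diff above are confirmed, and the heartbeat-time query is an assumption about a call site outside this diff.

    // Illustrative only: lifecycle of the per-datanode blockReportProcessed flag.
    class DatanodeStateSketch {
      private boolean blockReportProcessed = false;

      // Cleared when an already-known datanode re-registers (registration hunk above),
      // so the namenode knows it still needs a fresh report from this node.
      synchronized void onReRegistration() { blockReportProcessed = false; }

      // Set as soon as a block report is accepted, before the (block --> datanode) map is
      // updated (moved earlier by this commit), so a long-running report is not re-requested.
      synchronized void onBlockReportAccepted() { blockReportProcessed = true; }

      // Exposed via FSNamesystem.blockReportProcessed(nodeReg); presumably consulted when
      // replying to heartbeats to decide whether to send a DNA_BLOCKREPORT command
      // (assumption -- that call site is not part of this diff).
      synchronized boolean getBlockReportProcessed() { return blockReportProcessed; }
    }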