
HADOOP-3002. HDFS should not remove blocks while in safemode. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@674645 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 17 years ago
parent
commit
c107400da0

+ 2 - 0
CHANGES.txt

@@ -831,6 +831,8 @@ Release 0.17.1 - Unreleased
     HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
     the number of xceiver threads in a data-node. (shv)
 
+    HADOOP-3002. HDFS should not remove blocks while in safemode. (shv)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 5 - 3
src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java

@@ -350,7 +350,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   void reportDiff(BlocksMap blocksMap,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,
-                  Collection<Block> toRemove) {
+                  Collection<Block> toRemove,
+                  Collection<Block> toInvalidate) {
     // place a deilimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -366,8 +367,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
       iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
                newReport.getBlockGenStamp(i));
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) { // Brand new block
-        toAdd.add(new Block(iblk));
+      if(storedBlock == null) {
+        // If block is not in blocksMap it does not belong to any file
+        toInvalidate.add(new Block(iblk));
         continue;
       }
       if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
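
For orientation, the reworked reportDiff now splits a block report three ways instead of two: a reported block that is not in blocksMap at all belongs to no file and goes to toInvalidate, a known block not yet associated with this datanode goes to toAdd, and a block recorded on the node but absent from the report goes to toRemove. A minimal sketch of that classification, using plain sets of block ids instead of the real BlocksMap/BlockInfo structures (ReportDiffSketch and every name in it is illustrative only, not the committed code):

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

public class ReportDiffSketch {

  /** Three-way split of a block report, mirroring the new reportDiff contract. */
  static void classify(Set<Long> blocksMap,        // blocks belonging to some file
                       Set<Long> storedOnNode,     // blocks recorded on this datanode
                       Iterable<Long> newReport,   // block ids the datanode just reported
                       Collection<Long> toAdd,
                       Collection<Long> toRemove,
                       Collection<Long> toInvalidate) {
    Set<Long> reported = new HashSet<Long>();
    for (Long b : newReport) {
      reported.add(b);
      if (!blocksMap.contains(b)) {
        // Reported block that belongs to no file: ask the datanode to delete it
        // instead of treating it as brand new.
        toInvalidate.add(b);
      } else if (!storedOnNode.contains(b)) {
        // Known block, but not yet recorded on this datanode.
        toAdd.add(b);
      }
      // Otherwise the block is already recorded on this node: nothing to do.
    }
    for (Long b : storedOnNode) {
      if (!reported.contains(b)) {
        // Block the namenode expected here but the datanode no longer reports.
        toRemove.add(b);
      }
    }
  }

  public static void main(String[] args) {
    Set<Long> blocksMap = new HashSet<Long>(Arrays.asList(1L, 2L, 3L));
    Set<Long> onNode = new HashSet<Long>(Arrays.asList(1L, 3L));
    Collection<Long> toAdd = new LinkedList<Long>();
    Collection<Long> toRemove = new LinkedList<Long>();
    Collection<Long> toInvalidate = new LinkedList<Long>();
    classify(blocksMap, onNode, Arrays.asList(1L, 2L, 9L), toAdd, toRemove, toInvalidate);
    // Prints: add=[2] remove=[3] invalidate=[9]
    System.out.println("add=" + toAdd + " remove=" + toRemove + " invalidate=" + toInvalidate);
  }
}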

+ 43 - 80
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2072,37 +2072,36 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
         
-        //check lease recovery
-        if (cmd == null) {
-          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        }
-        //check pending replication
-        if (cmd == null) {
-          cmd = nodeinfo.getReplicationCommand(
-              maxReplicationStreams - xmitsInProgress);
+        // If the datanode has (just) been resolved and we haven't ever processed 
+        // a block report from it yet, ask for one now.
+        if (!blockReportProcessed(nodeReg)) {
+          // If we never processed a block report from this datanode, we shouldn't
+          // have any work for that as well
+          assert(cmd == null);
+          if (isResolved(nodeReg)) {
+            return DatanodeCommand.BLOCKREPORT;
+          }
         }
-        //check block invalidation
-        if (cmd == null) {
-          cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+
+        if (isInSafeMode()) {
+          //check distributed upgrade
+          return getDistributedUpgradeCommand();
         }
-      }
-    }
 
-    // If the datanode has (just) been resolved and we haven't ever processed 
-    // a block report from it yet, ask for one now.
-    if (!blockReportProcessed(nodeReg)) {
-      // If we never processed a block report from this datanode, we shouldn't
-      // have any work for that as well
-      assert(cmd == null);
-      if (isResolved(nodeReg)) {
-        return DatanodeCommand.BLOCKREPORT;
+        // All other commands are not allowed in safe mode. 
+        //check lease recovery
+        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (cmd != null)
+          return cmd;
+        //check pending replication
+        cmd = nodeinfo.getReplicationCommand(
+            maxReplicationStreams - xmitsInProgress);
+        if (cmd != null)
+          return cmd;
+        //check block invalidation
+        return nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
       }
     }
-    //check distributed upgrade
-    if (cmd == null) {
-      cmd = getDistributedUpgradeCommand();
-    }
-    return cmd;
   }
 
   private void updateStats(DatanodeDescriptor node, boolean isAdded) {
@@ -2185,6 +2184,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     int workFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return workFound;
     synchronized(heartbeats) {
       blocksToProcess = (int)(heartbeats.size() 
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
@@ -2227,9 +2229,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   private synchronized int computeReplicationWork(
                                   int blocksToProcess) throws IOException {
     int scheduledReplicationCount = 0;
-    // blocks should not be replicated or removed if safe mode is on
-    if (isInSafeMode())
-      return scheduledReplicationCount;
 
     synchronized(neededReplications) {
       // # of blocks to process equals either twice the number of live 
@@ -2605,9 +2604,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * The given node is reporting all its blocks.  Use this info to 
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public synchronized Block[] processReport(DatanodeID nodeID, 
-                                            BlockListAsLongs newReport
-                                            ) throws IOException {
+  public synchronized void processReport(DatanodeID nodeID, 
+                                         BlockListAsLongs newReport
+                                        ) throws IOException {
     long startTime = now();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2629,7 +2628,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
       LOG.info("Ignoring block report from " + nodeID.getName() + 
           " because rack location for this datanode is still to be resolved."); 
-      return null; //drop the block report if the dn hasn't been resolved
+      return; //drop the block report if the dn hasn't been resolved
     }
 
     node.setBlockReportProcessed(true);
@@ -2639,7 +2638,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     //
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
-    node.reportDiff(blocksMap, newReport, toAdd, toRemove);
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
         
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -2647,41 +2647,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     for (Block b : toAdd) {
       addStoredBlock(b, node, null);
     }
-        
-    //
-    // We've now completely updated the node's block report profile.
-    // We now go through all its blocks and find which ones are invalid,
-    // no longer pending, or over-replicated.
-    //
-    // (Note it's not enough to just invalidate blocks at lease expiry 
-    // time; datanodes can go down before the client's lease on 
-    // the failed file expires and miss the "expire" event.)
-    //
-    // This function considers every block on a datanode, and thus
-    // should only be invoked infrequently.
-    //
-    Collection<Block> obsolete = new ArrayList<Block>();
-    for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
-      Block b = it.next();
-
-      // 
-      // A block report can only send BLOCK_INVALIDATE_CHUNK number of
-      // blocks to be deleted. If there are more blocks to be deleted, 
-      // they are added to recentInvalidateSets and will be sent out
-      // thorugh succeeding heartbeat responses.
-      //
-      if (!isValidBlock(b)) {
-        if (obsolete.size() > blockInvalidateLimit) {
-          addToInvalidates(b, node);
-        } else {
-          obsolete.add(b);
-        }
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                      +"ask "+nodeID.getName()+" to delete "+b);
-      }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block " 
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
     }
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
-    return obsolete.toArray(new Block[obsolete.size()]);
   }
 
   /**
@@ -2693,7 +2665,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                     DatanodeDescriptor node,
                                     DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
-    INodeFile fileINode = null;
     boolean added = false;
     if(storedBlock == null) { // block is not in the blocksMaps
       // add block to the blocksMap and to the data-node
@@ -2705,7 +2676,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     }
     assert storedBlock != null : "Block must be stored by now";
 
-    fileINode = storedBlock.getINode();
     if (block != storedBlock) {
       if (block.getNumBytes() > 0) {
         long cursize = storedBlock.getNumBytes();
@@ -2780,17 +2750,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                    + block + " on " + node.getName()
                                    + " size " + block.getNumBytes());
     }
-    //
-    // If this block does not belong to anyfile, then we are done.
-    //
-    if (fileINode == null) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                   + "addStoredBlock request received for " 
-                                   + block + " on " + node.getName()
-                                   + " size " + block.getNumBytes()
-                                   + " But it does not belong to any file.");
-      return block;
-    }
+
+    assert isValidBlock(storedBlock) : "Trying to add an invalid block";
 
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(storedBlock);
@@ -2805,6 +2766,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     // if file is being actively written to, then do not check 
     // replication-factor here. It will be checked when the file is closed.
     //
+    INodeFile fileINode = null;
+    fileINode = storedBlock.getINode();
     if (fileINode.isUnderConstruction()) {
       return block;
     }
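
Taken together, the FSNamesystem changes gate datanode work on safe mode: handleHeartbeat first asks a freshly resolved datanode for a block report, returns only the distributed-upgrade command while the namenode is in safe mode, and only otherwise falls through to lease recovery, replication, and invalidation in that priority order; computeDatanodeWork likewise returns before scheduling anything when safe mode is on. A minimal sketch of that ordering, assuming a simplified DatanodeState interface (the names below are stand-ins, not the real DatanodeCommand API):

public class HeartbeatCommandSketch {

  /** Stand-ins for the real DatanodeCommand constants and helpers. */
  enum Command { BLOCKREPORT, DISTRIBUTED_UPGRADE, LEASE_RECOVERY, REPLICATE, INVALIDATE }

  interface DatanodeState {
    boolean blockReportProcessed();
    boolean resolved();
    Command leaseRecoveryCommand();   // may return null
    Command replicationCommand();     // may return null
    Command invalidateCommand();      // may return null
  }

  static Command selectCommand(DatanodeState node, boolean inSafeMode,
                               Command distributedUpgradeCommand) {
    // 1. A resolved datanode whose block report was never processed is asked for one.
    if (!node.blockReportProcessed() && node.resolved()) {
      return Command.BLOCKREPORT;
    }
    // 2. In safe mode only the distributed-upgrade command may be issued;
    //    lease recovery, replication and block invalidation are all suppressed.
    if (inSafeMode) {
      return distributedUpgradeCommand;
    }
    // 3. Otherwise hand out work in priority order; the first non-null command wins.
    Command cmd = node.leaseRecoveryCommand();
    if (cmd != null) {
      return cmd;
    }
    cmd = node.replicationCommand();
    if (cmd != null) {
      return cmd;
    }
    return node.invalidateCommand();
  }
}

In the committed change the same guard also moves from computeReplicationWork up into computeDatanodeWork, so that neither replication nor invalidation work is scheduled for any node while safe mode is on.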

+ 1 - 3
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -620,9 +620,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
            +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks");
 
-    Block blocksToDelete[] = namesystem.processReport(nodeReg, blist);
-    if (blocksToDelete != null && blocksToDelete.length > 0)
-      return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocksToDelete);
+    namesystem.processReport(nodeReg, blist);
     if (getFSImage().isUpgradeFinalized())
       return DatanodeCommand.FINALIZE;
     return null;