1
0
Kaynağa Gözat

Revert changes for revision 674645 related to HADOOP-3002.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@674671 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 17 yıl önce
ebeveyn
işleme
4d2c2a3f86

+ 0 - 2
CHANGES.txt

@@ -831,8 +831,6 @@ Release 0.17.1 - Unreleased
     HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
     the number of xceiver threads in a data-node. (shv)
 
-    HADOOP-3002. HDFS should not remove blocks while in safemode. (shv)
-
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 3 - 5
src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java

@@ -350,8 +350,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   void reportDiff(BlocksMap blocksMap,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,
-                  Collection<Block> toRemove,
-                  Collection<Block> toInvalidate) {
+                  Collection<Block> toRemove) {
     // place a deilimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -367,9 +366,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
       iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
                newReport.getBlockGenStamp(i));
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
+      if(storedBlock == null) { // Brand new block
+        toAdd.add(new Block(iblk));
         continue;
       }
       if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN

+ 80 - 43
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2072,36 +2072,37 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
         
-        // If the datanode has (just) been resolved and we haven't ever processed 
-        // a block report from it yet, ask for one now.
-        if (!blockReportProcessed(nodeReg)) {
-          // If we never processed a block report from this datanode, we shouldn't
-          // have any work for that as well
-          assert(cmd == null);
-          if (isResolved(nodeReg)) {
-            return DatanodeCommand.BLOCKREPORT;
-          }
-        }
-
-        if (isInSafeMode()) {
-          //check distributed upgrade
-          return getDistributedUpgradeCommand();
-        }
-
-        // All other commands are not allowed in safe mode. 
         //check lease recovery
-        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (cmd != null)
-          return cmd;
+        if (cmd == null) {
+          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        }
         //check pending replication
-        cmd = nodeinfo.getReplicationCommand(
-            maxReplicationStreams - xmitsInProgress);
-        if (cmd != null)
-          return cmd;
+        if (cmd == null) {
+          cmd = nodeinfo.getReplicationCommand(
+              maxReplicationStreams - xmitsInProgress);
+        }
         //check block invalidation
-        return nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        if (cmd == null) {
+          cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        }
+      }
+    }
+
+    // If the datanode has (just) been resolved and we haven't ever processed 
+    // a block report from it yet, ask for one now.
+    if (!blockReportProcessed(nodeReg)) {
+      // If we never processed a block report from this datanode, we shouldn't
+      // have any work for that as well
+      assert(cmd == null);
+      if (isResolved(nodeReg)) {
+        return DatanodeCommand.BLOCKREPORT;
       }
     }
+    //check distributed upgrade
+    if (cmd == null) {
+      cmd = getDistributedUpgradeCommand();
+    }
+    return cmd;
   }
 
   private void updateStats(DatanodeDescriptor node, boolean isAdded) {
@@ -2184,9 +2185,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     int workFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
-    // blocks should not be replicated or removed if safe mode is on
-    if (isInSafeMode())
-      return workFound;
     synchronized(heartbeats) {
       blocksToProcess = (int)(heartbeats.size() 
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
@@ -2229,6 +2227,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   private synchronized int computeReplicationWork(
                                   int blocksToProcess) throws IOException {
     int scheduledReplicationCount = 0;
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return scheduledReplicationCount;
 
     synchronized(neededReplications) {
       // # of blocks to process equals either twice the number of live 
@@ -2604,9 +2605,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * The given node is reporting all its blocks.  Use this info to 
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public synchronized void processReport(DatanodeID nodeID, 
-                                         BlockListAsLongs newReport
-                                        ) throws IOException {
+  public synchronized Block[] processReport(DatanodeID nodeID, 
+                                            BlockListAsLongs newReport
+                                            ) throws IOException {
     long startTime = now();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2628,7 +2629,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
       LOG.info("Ignoring block report from " + nodeID.getName() + 
           " because rack location for this datanode is still to be resolved."); 
-      return; //drop the block report if the dn hasn't been resolved
+      return null; //drop the block report if the dn hasn't been resolved
     }
 
     node.setBlockReportProcessed(true);
@@ -2638,8 +2639,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     //
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
+    node.reportDiff(blocksMap, newReport, toAdd, toRemove);
         
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -2647,13 +2647,41 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     for (Block b : toAdd) {
       addStoredBlock(b, node, null);
     }
-    for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block " 
-          + b + " on " + node.getName() + " size " + b.getNumBytes()
-          + " does not belong to any file.");
-      addToInvalidates(b, node);
+        
+    //
+    // We've now completely updated the node's block report profile.
+    // We now go through all its blocks and find which ones are invalid,
+    // no longer pending, or over-replicated.
+    //
+    // (Note it's not enough to just invalidate blocks at lease expiry 
+    // time; datanodes can go down before the client's lease on 
+    // the failed file expires and miss the "expire" event.)
+    //
+    // This function considers every block on a datanode, and thus
+    // should only be invoked infrequently.
+    //
+    Collection<Block> obsolete = new ArrayList<Block>();
+    for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
+      Block b = it.next();
+
+      // 
+      // A block report can only send BLOCK_INVALIDATE_CHUNK number of
+      // blocks to be deleted. If there are more blocks to be deleted, 
+      // they are added to recentInvalidateSets and will be sent out
+      // thorugh succeeding heartbeat responses.
+      //
+      if (!isValidBlock(b)) {
+        if (obsolete.size() > blockInvalidateLimit) {
+          addToInvalidates(b, node);
+        } else {
+          obsolete.add(b);
+        }
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
+                                      +"ask "+nodeID.getName()+" to delete "+b);
+      }
     }
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+    return obsolete.toArray(new Block[obsolete.size()]);
   }
 
   /**
@@ -2665,6 +2693,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                     DatanodeDescriptor node,
                                     DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    INodeFile fileINode = null;
     boolean added = false;
     if(storedBlock == null) { // block is not in the blocksMaps
       // add block to the blocksMap and to the data-node
@@ -2676,6 +2705,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     }
     assert storedBlock != null : "Block must be stored by now";
 
+    fileINode = storedBlock.getINode();
     if (block != storedBlock) {
       if (block.getNumBytes() > 0) {
         long cursize = storedBlock.getNumBytes();
@@ -2750,8 +2780,17 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                    + block + " on " + node.getName()
                                    + " size " + block.getNumBytes());
     }
-
-    assert isValidBlock(storedBlock) : "Trying to add an invalid block";
+    //
+    // If this block does not belong to anyfile, then we are done.
+    //
+    if (fileINode == null) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+                                   + "addStoredBlock request received for " 
+                                   + block + " on " + node.getName()
+                                   + " size " + block.getNumBytes()
+                                   + " But it does not belong to any file.");
+      return block;
+    }
 
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(storedBlock);
@@ -2766,8 +2805,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     // if file is being actively written to, then do not check 
     // replication-factor here. It will be checked when the file is closed.
     //
-    INodeFile fileINode = null;
-    fileINode = storedBlock.getINode();
     if (fileINode.isUnderConstruction()) {
       return block;
     }

+ 3 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -620,7 +620,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
            +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks");
 
-    namesystem.processReport(nodeReg, blist);
+    Block blocksToDelete[] = namesystem.processReport(nodeReg, blist);
+    if (blocksToDelete != null && blocksToDelete.length > 0)
+      return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocksToDelete);
     if (getFSImage().isUpgradeFinalized())
       return DatanodeCommand.FINALIZE;
     return null;