HADOOP-3176. Restructure internal namenode methods that process
heartbeats to use well-defined BlockCommand object(s) instead of
using the base java Object. (Tsz Wo (Nicholas), SZE via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@648380 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 years ago
parent
commit
0f7c3f83ff
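
The heart of the change: heartbeat processing used to hand results back through untyped Object[] out-parameter slots, and now returns a single typed DatanodeCommand (or null when there is no work). A standalone sketch of the two shapes, using made-up class names rather than the actual Hadoop types:

    // Stand-alone sketch of the pattern this commit replaces and the one it adopts.
    // None of these names are real Hadoop classes; they only mirror the shape.
    class HeartbeatContractSketch {

      // Old shape: results come back through untyped Object[] slots, and the
      // boolean return only means "please re-register".
      static boolean oldStyle(Object[] xferResults, Object[] deleteList) {
        xferResults[0] = null;   // slot 0: Block[] to transfer (by convention)
        xferResults[1] = null;   // slot 1: DatanodeInfo[][] targets (by convention)
        deleteList[0]  = null;   // slot 0: Block[] to delete (by convention)
        return false;
      }

      // New shape: a single typed command, or null when there is nothing to do.
      static Command newStyle(Command replicationWork, Command invalidationWork) {
        Command cmd = replicationWork;
        if (cmd == null) {
          cmd = invalidationWork;
        }
        return cmd;
      }

      static class Command {               // stand-in for DatanodeCommand
        final int action;
        Command(int action) { this.action = action; }
      }
    }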

+ 4 - 0
CHANGES.txt

@@ -27,6 +27,10 @@ Trunk (unreleased changes)
 
     Increment ClientProtocol.versionID missed by HADOOP-2585. (shv)
 
+    HADOOP-3176. Restructure internal namenode methods that process
+    heartbeats to use well-defined BlockCommand object(s) instead of 
+    using the base java Object. (Tsz Wo (Nicholas), SZE via dhruba)
+
 Release 0.17.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 4 - 4
src/java/org/apache/hadoop/dfs/BlockCommand.java

@@ -81,11 +81,11 @@ class BlockCommand extends DatanodeCommand {
   }
 
   /**
-   * Create BlockCommand for block invalidation
-   * @param blocks  blocks to invalidate
+   * Create BlockCommand for the given action
+   * @param blocks blocks related to the action
    */
-  public BlockCommand(Block blocks[]) {
-    super(DatanodeProtocol.DNA_INVALIDATE);
+  public BlockCommand(int action, Block blocks[]) {
+    super(action);
     this.blocks = blocks;
     this.targets = new DatanodeInfo[0][];
   }
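
Because the action is now a constructor argument, one BlockCommand constructor serves any block-oriented action instead of being hard-wired to invalidation. A rough caller-side sketch, assuming it sits in the org.apache.hadoop.dfs package next to these classes; the helper method itself is made up:

    // Hypothetical helper, shown only to illustrate the new call shape.
    BlockCommand buildInvalidateCommand(Block[] staleBlocks) {
      if (staleBlocks == null || staleBlocks.length == 0) {
        return null;                                 // nothing to invalidate
      }
      // Old call:  new BlockCommand(staleBlocks)    -- the action was implicit
      // New call:  the action is named at the call site
      return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, staleBlocks);
    }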

+ 21 - 21
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -48,6 +48,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   List<DatanodeDescriptor[]> replicateTargetSets;
   Set<Block> invalidateBlocks;
   boolean processedBlockReport = false;
+
   
   /** Default constructor */
   public DatanodeDescriptor() {
@@ -274,15 +275,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Remove the specified number of target sets
    */
-  void getReplicationSets(int maxNumTransfers, Object[] xferResults) {
-    assert(xferResults.length == 2);
-    assert(xferResults[0] == null && xferResults[1] == null);
-
+  BlockCommand getReplicationCommand(int maxNumTransfers) {
     synchronized (replicateBlocks) {
       assert(replicateBlocks.size() == replicateTargetSets.size());
 
       if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
-        return;
+        return null;
       }
       int numTransfers = 0;
       int numBlocks = 0;
@@ -301,32 +299,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
         replicateBlocks.remove(0);
         replicateTargetSets.remove(0);
       }
-      xferResults[0] = blocklist;
-      xferResults[1] = targets;
       assert(blocklist.length > 0 && targets.length > 0);
+      return new BlockCommand(blocklist, targets);
     }
   }
 
   /**
    * Remove the specified number of blocks to be invalidated
    */
-  void getInvalidateBlocks(int maxblocks, Object[] xferResults) {
-    assert(xferResults[0] == null);
+  BlockCommand getInvalidateBlocks(int maxblocks) {
+    Block[] deleteList = getBlockArray(invalidateBlocks, maxblocks); 
+    return deleteList == null? 
+        null: new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList);
+  }
 
-    synchronized (invalidateBlocks) {
-      if (maxblocks <= 0 || invalidateBlocks.size() == 0) {
-        return;
-      }
-      int outnum = Math.min(maxblocks, invalidateBlocks.size());
-      Block[] blocklist = new Block[outnum];
-      Iterator<Block> iter = invalidateBlocks.iterator();
-      for (int i = 0; i < outnum; i++) {
-        blocklist[i] = iter.next();
-        iter.remove();
+  static private Block[] getBlockArray(Collection<Block> blocks, int max) {
+    Block[] blockarray = null;
+    synchronized(blocks) {
+      int n = blocks.size();
+      if (max > 0 && n > 0) {
+        if (max < n) {
+          n = max;
+        }
+        blockarray = blocks.toArray(new Block[n]);
+        blocks.clear();
+        assert(blockarray.length > 0);
       }
-      assert(blocklist.length > 0);
-      xferResults[0] = blocklist;
     }
+    return blockarray;
   }
 
   void reportDiff(BlocksMap blocksMap,
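
Both accessors now lean on the new getBlockArray helper, whose job is to move a bounded batch of blocks out of a shared collection while holding its lock and hand the batch back as an array. A generic, standalone version of that drain pattern (illustration only, not the Hadoop helper itself):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;

    // Standalone illustration: drain at most `max` elements from a shared
    // collection while synchronized on it, returning what was removed.
    final class DrainSketch {
      static <T> List<T> drainUpTo(Collection<T> source, int max) {
        List<T> out = new ArrayList<T>();
        synchronized (source) {
          Iterator<T> it = source.iterator();
          while (it.hasNext() && out.size() < max) {
            out.add(it.next());
            it.remove();
          }
        }
        return out;
      }
    }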

+ 41 - 33
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -2136,55 +2136,63 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * If a substantial amount of time passed since the last datanode 
    * heartbeat then request an immediate block report.  
    * 
-   * @return true if registration is required or false otherwise.
+   * @return a datanode command 
    * @throws IOException
    */
-  public boolean gotHeartbeat(DatanodeID nodeID,
-                              long capacity,
-                              long dfsUsed,
-                              long remaining,
-                              int xceiverCount,
-                              int xmitsInProgress,
-                              Object[] xferResults,
-                              Object deleteList[]
-                              ) throws IOException {
+  DatanodeCommand handleHeartbeat(DatanodeRegistration nodeReg,
+      long capacity, long dfsUsed, long remaining,
+      int xceiverCount, int xmitsInProgress) throws IOException {
+    DatanodeCommand cmd = null;
     synchronized (heartbeats) {
       synchronized (datanodeMap) {
-        DatanodeDescriptor nodeinfo;
+        DatanodeDescriptor nodeinfo = null;
         try {
-          nodeinfo = getDatanode(nodeID);
-          if (nodeinfo == null) {
-            return true;
-          }
+          nodeinfo = getDatanode(nodeReg);
         } catch(UnregisteredDatanodeException e) {
-          return true;
+          return new DatanodeCommand(DatanodeProtocol.DNA_REGISTER);
         }
           
         // Check if this datanode should actually be shutdown instead. 
-        if (shouldNodeShutdown(nodeinfo)) {
+        if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
           setDatanodeDead(nodeinfo);
           throw new DisallowedDatanodeException(nodeinfo);
         }
 
-        if (!nodeinfo.isAlive) {
-          return true;
-        } else {
-          updateStats(nodeinfo, false);
-          nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
-          updateStats(nodeinfo, true);
-          //
-          // Extract pending replication work or block invalidation
-          // work from the datanode descriptor
-          //
-          nodeinfo.getReplicationSets(this.maxReplicationStreams - 
-                                      xmitsInProgress, xferResults); 
-          if (xferResults[0] == null) {
-            nodeinfo.getInvalidateBlocks(blockInvalidateLimit, deleteList);
-          }
-          return false;
+        if (nodeinfo == null || !nodeinfo.isAlive) {
+          return new DatanodeCommand(DatanodeProtocol.DNA_REGISTER);
+        }
+
+        updateStats(nodeinfo, false);
+        nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
+        updateStats(nodeinfo, true);
+        
+        //check pending replication
+        if (cmd == null) {
+          cmd = nodeinfo.getReplicationCommand(
+              maxReplicationStreams - xmitsInProgress);
+        }
+        //check block invalidation
+        if (cmd == null) {
+          cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
         }
       }
     }
+
+    // If the datanode has (just) been resolved and we haven't ever processed 
+    // a block report from it yet, ask for one now.
+    if (!blockReportProcessed(nodeReg)) {
+      // If we never processed a block report from this datanode, we shouldn't
+      // have any work for that as well
+      assert(cmd == null);
+      if (isResolved(nodeReg)) {
+        return new DatanodeCommand(DatanodeProtocol.DNA_BLOCKREPORT);
+      }
+    }
+    //check distributed upgrade
+    if (cmd == null) {
+      cmd = getDistributedUpgradeCommand();
+    }
+    return cmd;
   }
 
   private void updateStats(DatanodeDescriptor node, boolean isAdded) {
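
handleHeartbeat now issues at most one command per heartbeat, trying each source of work in a fixed order and stopping at the first that has something to do. The precedence, restated, with a tiny generic helper that shows only the "first non-null wins" mechanic (names here are made up):

    // handleHeartbeat's order, restated:
    //   1. re-register (unknown or dead datanode)
    //   2. pending replication transfers
    //   3. pending block invalidations
    //   4. request a block report (node resolved, no report processed yet)
    //   5. distributed upgrade command
    final class CommandPrioritySketch {
      static <T> T firstNonNull(T[] candidatesInPriorityOrder) {
        for (T candidate : candidatesInPriorityOrder) {
          if (candidate != null) {
            return candidate;
          }
        }
        return null;
      }
    }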

+ 4 - 50
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -537,7 +537,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       
     return nodeReg;
   }
-    
+
   /**
    * Data node notify the name node that it is alive 
    * Return a block-oriented command for the datanode to execute.
@@ -549,55 +549,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
                                        long remaining,
                                        int xmitsInProgress,
                                        int xceiverCount) throws IOException {
-    Object xferResults[] = new Object[2];
-    xferResults[0] = xferResults[1] = null;
-    Object deleteList[] = new Object[1];
-    deleteList[0] = null; 
-
     verifyRequest(nodeReg);
-    if (namesystem.gotHeartbeat(nodeReg, capacity, dfsUsed, remaining, 
-                                xceiverCount, 
-                                xmitsInProgress,
-                                xferResults,
-                                deleteList)) {
-      // request block report from the datanode
-      assert(xferResults[0] == null && deleteList[0] == null);
-      return new DatanodeCommand(DatanodeProtocol.DNA_REGISTER);
-    }
-    //
-    // If the datanode has (just) been resolved and we haven't ever processed 
-    // a block report from it yet, ask for one now.
-    //
-    if (!namesystem.blockReportProcessed(nodeReg)) {
-      // If we never processed a block report from this datanode, we shouldn't
-      // have any work for that as well
-      assert(xferResults[0] == null && deleteList[0] == null);
-      if (namesystem.isResolved(nodeReg)) {
-        return new DatanodeCommand(DatanodeProtocol.DNA_BLOCKREPORT);
-      }
-    }
-        
-    //
-    // Ask to perform pending transfers, if any
-    //
-    if (xferResults[0] != null) {
-      assert(deleteList[0] == null);
-      return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
-    }
-
-    //
-    // If there are no transfers, check for recently-deleted blocks that
-    // should be removed.  This is not a full-datanode sweep, as is done during
-    // a block report.  This is just a small fast removal of blocks that have
-    // just been removed.
-    //
-    if (deleteList[0] != null) {
-      return new BlockCommand((Block[]) deleteList[0]);
-    }
-    
-    // check whether a distributed upgrade need to be done
-    // and send a request to start one if required
-    return namesystem.getDistributedUpgradeCommand();
+    return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
+        xceiverCount, xmitsInProgress);
   }
 
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
@@ -609,7 +563,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
 
     Block blocksToDelete[] = namesystem.processReport(nodeReg, blist);
     if (blocksToDelete != null && blocksToDelete.length > 0)
-      return new BlockCommand(blocksToDelete);
+      return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocksToDelete);
     if (getFSImage().isUpgradeFinalized())
       return new DatanodeCommand(DatanodeProtocol.DNA_FINALIZE);
     return null;
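
From the datanode's side, sendHeartbeat and blockReport now always answer with a single DatanodeCommand (or null). A rough sketch of how a receiver could dispatch on the action codes that appear in this commit; only the DNA_* names come from DatanodeProtocol, while the constant values and the Handler interface are invented for the sketch:

    // Hypothetical receiver-side dispatch; not the datanode's actual code.
    final class CommandDispatchSketch {
      static final int DNA_REGISTER    = 1;  // re-register with the namenode
      static final int DNA_INVALIDATE  = 2;  // delete the listed blocks
      static final int DNA_BLOCKREPORT = 3;  // send a full block report now
      static final int DNA_FINALIZE    = 4;  // finalize the previous upgrade

      interface Handler {
        void register();
        void invalidate();
        void blockReport();
        void finalizeUpgrade();
      }

      static void dispatch(int action, Handler h) {
        switch (action) {
          case DNA_REGISTER:    h.register();        break;
          case DNA_INVALIDATE:  h.invalidate();      break;
          case DNA_BLOCKREPORT: h.blockReport();     break;
          case DNA_FINALIZE:    h.finalizeUpgrade(); break;
          default:                                   break;  // unknown: ignore
        }
      }
    }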