Browse Source

HADOOP-923. Move replication computation to a separate thread, to improve heartbeat processing time. Contributed by Dhruba.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@506778 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
ed441b01f6

+ 4 - 0
CHANGES.txt

@@ -42,6 +42,10 @@ Trunk (unreleased changes)
 12. HADOOP-1010.  Add Reporter.NULL, a Reporter implementation that
     does nothing.  (Runping Qi via cutting)
 
+13. HADOOP-923.  In HDFS NameNode, move replication computation to a
+    separate thread, to improve heartbeat processing time.
+    (Dhruba Borthakur via cutting) 
+
 
 Release 0.11.1 - 2007-02-09
 

+ 125 - 0
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -42,9 +42,19 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
+  //
+  // List of blocks to be replicated by this datanode
+  // Also, a list of datanodes per block to indicate the target
+  // datanode of this replication.
+  //
+  List<Block> replicateBlocks;
+  List<DatanodeDescriptor[]> replicateTargetSets;
+  List<Block> invalidateBlocks;
+  
   /** Default constructor */
   public DatanodeDescriptor() {
     super();
+    initWorkLists();
   }
   
   /** DatanodeDescriptor constructor
@@ -76,6 +86,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
                       int xceiverCount ) {
     super( nodeID );
     updateHeartbeat(capacity, remaining, xceiverCount);
+    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -93,6 +104,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
                               int xceiverCount ) {
     super( nodeID, networkLocation );
     updateHeartbeat( capacity, remaining, xceiverCount);
+    initWorkLists();
+  }
+
+  /*
+   * initialize list of blocks that store work for the datanodes
+   */
+  private void initWorkLists() {
+    replicateBlocks = new ArrayList<Block>();
+    replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
+    invalidateBlocks = new ArrayList<Block>();
   }
 
   /**
@@ -137,4 +158,108 @@ public class DatanodeDescriptor extends DatanodeInfo {
   Iterator<Block> getBlockIterator() {
     return blocks.iterator();
   }
+
+  /*
+   * Store block replication work.
+   */
+  void addBlocksToBeReplicated(Block[] blocklist, 
+                               DatanodeDescriptor[][] targets) {
+    assert(blocklist != null && targets != null);
+    assert(blocklist.length > 0 && targets.length > 0);
+    synchronized (replicateBlocks) {
+      assert(blocklist.length == targets.length);
+      for (int i = 0; i < blocklist.length; i++) {
+        replicateBlocks.add(blocklist[i]);
+        replicateTargetSets.add(targets[i]);
+      }
+    }
+  }
+
+  /*
+   * Store block invalidation work.
+   */
+  void addBlocksToBeInvalidated(Block[] blocklist) {
+    assert(blocklist != null && blocklist.length > 0);
+    synchronized (invalidateBlocks) {
+      for (int i = 0; i < blocklist.length; i++) {
+        invalidateBlocks.add(blocklist[i]);
+      }
+    }
+  }
+
+  /*
+   * The number of work items that are pending to be replicated
+   */
+  int getNumberOfBlocksToBeReplicated() {
+    synchronized (replicateBlocks) {
+      return replicateBlocks.size();
+    }
+  }
+
+  /*
+   * The number of block invalidattion items that are pending to 
+   * be sent to the datanode
+   */
+  int getNumberOfBlocksToBeInvalidated() {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.size();
+    }
+  }
+
+  /**
+   * Remove the specified number of target sets
+   */
+  void getReplicationSets(int maxNumTransfers, Object[] xferResults) {
+    assert(xferResults.length == 2);
+    assert(xferResults[0] == null && xferResults[1] == null);
+
+    synchronized (replicateBlocks) {
+      assert(replicateBlocks.size() == replicateTargetSets.size());
+
+      if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
+        return;
+      }
+      int numTransfers = 0;
+      int numBlocks = 0;
+      int i;
+      for (i = 0; i < replicateTargetSets.size() && 
+           numTransfers < maxNumTransfers; i++) {
+        numTransfers += replicateTargetSets.get(i).length;
+      }
+      numBlocks = i;
+      Block[] blocklist = new Block[numBlocks];
+      DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
+
+      for (i = 0; i < numBlocks; i++) {
+        blocklist[i] = replicateBlocks.get(0);
+        targets[i] = replicateTargetSets.get(0);
+        replicateBlocks.remove(0);
+        replicateTargetSets.remove(0);
+      }
+      xferResults[0] = blocklist;
+      xferResults[1] = targets;
+      assert(blocklist.length > 0 && targets.length > 0);
+    }
+  }
+
+  /**
+   * Remove the specified number of blocks to be invalidated
+   */
+  void getInvalidateBlocks(int maxblocks, Object[] xferResults) {
+    assert(xferResults[0] == null);
+
+    synchronized (invalidateBlocks) {
+      if (maxblocks <= 0 || invalidateBlocks.size() == 0) {
+        return;
+      }
+      int outnum = Math.min(maxblocks, invalidateBlocks.size());
+      Block[] blocklist = new Block[outnum];
+      for (int i = 0; i < outnum; i++) {
+        blocklist[i] = invalidateBlocks.get(0);
+        invalidateBlocks.remove(0);
+      }
+      assert(blocklist.length > 0);
+      xferResults[0] = blocklist;
+    }
+  }
 }

+ 159 - 10
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -172,6 +172,7 @@ class FSNamesystem implements FSConstants {
     Daemon hbthread = null;   // HeartbeatMonitor thread
     Daemon lmthread = null;   // LeaseMonitor thread
     Daemon smmthread = null;  // SafeModeMonitor thread
+    Daemon replthread = null;  // Replication thread
     boolean fsRunning = true;
     long systemStart = 0;
 
@@ -186,6 +187,10 @@ class FSNamesystem implements FSConstants {
     // heartbeatExpireInterval is how long namenode waits for datanode to report
     // heartbeat
     private long heartbeatExpireInterval;
+    //replicationRecheckInterval is how often namenode checks for new replication work
+    private long replicationRecheckInterval;
+    static int replIndex = 0; // last datanode used for replication work
+    static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
     public static FSNamesystem fsNamesystemObject;
     private String localMachine;
@@ -228,6 +233,7 @@ class FSNamesystem implements FSConstants {
         this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
         this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
             10 * heartbeatInterval;
+        this.replicationRecheckInterval = 3 * 1000; //  3 second
 
         this.localMachine = hostname;
         this.port = port;
@@ -237,8 +243,10 @@ class FSNamesystem implements FSConstants {
         setBlockTotal();
         this.hbthread = new Daemon(new HeartbeatMonitor());
         this.lmthread = new Daemon(new LeaseMonitor());
+        this.replthread = new Daemon(new ReplicationMonitor());
         hbthread.start();
         lmthread.start();
+        replthread.start();
         this.systemStart = now();
         this.startTime = new Date(systemStart); 
 
@@ -280,6 +288,7 @@ class FSNamesystem implements FSConstants {
         try {
             infoServer.stop();
             hbthread.join(3000);
+            replthread.join(3000);
         } catch (InterruptedException ie) {
         } finally {
           // using finally to ensure we also wait for lease daemon
@@ -435,6 +444,14 @@ class FSNamesystem implements FSConstants {
             int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
             int curPri = getPriority(block, curReplicas, curExpectedReplicas);
             int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+            NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
+                               block +
+                               " curReplicas " + curReplicas +
+                               " curExpectedReplicas " + curExpectedReplicas +
+                               " oldReplicas " + oldReplicas +
+                               " oldExpectedReplicas  " + oldExpectedReplicas +
+                               " curPri  " + curPri +
+                               " oldPri  " + oldPri);
             if( oldPri != LEVEL && oldPri != curPri ) {
                 remove(block, oldPri);
             }
@@ -1575,7 +1592,10 @@ class FSNamesystem implements FSConstants {
     public boolean gotHeartbeat( DatanodeID nodeID,
                                  long capacity, 
                                  long remaining,
-                                 int xceiverCount
+                                 int xceiverCount,
+                                 int xmitsInProgress,
+                                 Object[] xferResults,
+                                 Object deleteList[]
                                  ) throws IOException {
       synchronized (heartbeats) {
         synchronized (datanodeMap) {
@@ -1595,6 +1615,16 @@ class FSNamesystem implements FSConstants {
               updateStats(nodeinfo, false);
               nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
               updateStats(nodeinfo, true);
+              //
+              // Extract pending replication work or block invalidation
+              // work from the datanode descriptor
+              //
+              nodeinfo.getReplicationSets(this.maxReplicationStreams - 
+                                          xmitsInProgress, xferResults); 
+              if (xferResults[0] == null) {
+                nodeinfo.getInvalidateBlocks(FSConstants.BLOCK_INVALIDATE_CHUNK,
+                                             deleteList);
+              }
               return false;
           }
         }
@@ -1633,6 +1663,130 @@ class FSNamesystem implements FSConstants {
         }
     }
 
+    /**
+     * Periodically calls computeReplicationWork().
+     */
+    class ReplicationMonitor implements Runnable {
+      public void run() {
+        while (fsRunning) {
+          try {
+            computeDatanodeWork();
+            Thread.sleep(replicationRecheckInterval);
+          } catch (InterruptedException ie) {
+          } catch (IOException ie) {
+            LOG.warn("ReplicationMonitor thread received exception. " + ie);
+          }
+        }
+      }
+    }
+
+    /**
+     * Look at a few datanodes and compute any replication work that 
+     * can be scheduled on them. The datanode will be infomed of this
+     * work at the next heartbeat.
+     */
+    void computeDatanodeWork() throws IOException {
+      int numiter = 0;
+      int foundwork = 0;
+      int hsize = 0;
+
+      while (true) {
+        DatanodeDescriptor node = null;
+
+        //
+        // pick the datanode that was the last one in the
+        // previous invocation of this method.
+        //
+        synchronized (heartbeats) {
+          hsize = heartbeats.size();
+          if (numiter++ >= hsize) {
+            break;
+          }
+          if (replIndex >= hsize) {
+            replIndex = 0;
+          }
+          node = heartbeats.get(replIndex);
+          replIndex++;
+        }
+
+        //
+        // Is there replication work to be computed for this datanode?
+        //
+        int precomputed = node.getNumberOfBlocksToBeReplicated();
+        int needed = this.maxReplicationStreams - precomputed;
+        boolean doReplication = false;
+        boolean doInvalidation = false;
+        if (needed > 0) {
+          //
+          // Compute replication work and store work into the datanode
+          //
+          Object replsets[] = pendingTransfers(node, needed);
+          if (replsets != null) {
+            doReplication = true;
+            addBlocksToBeReplicated(node, (Block[])replsets[0], 
+                                   (DatanodeDescriptor[][])replsets[1]);
+          }
+        }
+        if (!doReplication) {
+          //
+          // Determine if block deletion is pending for this datanode
+          //
+          Block blocklist[] = blocksToInvalidate(node);
+          if (blocklist != null) {
+            doInvalidation = true;
+            addBlocksToBeInvalidated(node, blocklist);
+          }
+        }
+        if (doReplication || doInvalidation) {
+          //
+          // If we have already computed work for a predefined
+          // number of datanodes in this iteration, then relax
+          //
+          if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
+            break;
+          }
+          foundwork++;
+        } else {
+          //
+          // See if the decommissioned node has finished moving all
+          // its datablocks to another replica. This is a loose
+          // heuristic to determine when a decommission is really over.
+          //
+          checkDecommissionState(node);
+        }
+      }
+    }
+
+    /**
+     * Add more replication work for this datanode.
+     */
+    synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, 
+                                 Block[] blocklist,
+                                 DatanodeDescriptor[][] targets) 
+                                 throws IOException {
+      //
+      // Find the datanode with the FSNamesystem lock held.
+      //
+      DatanodeDescriptor n = getDatanode(node);
+      if (n != null) {
+        n.addBlocksToBeReplicated(blocklist, targets);
+      }
+    }
+
+    /**
+     * Add more block invalidation work for this datanode.
+     */
+    synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node, 
+                                 Block[] blocklist) throws IOException {
+      //
+      // Find the datanode with the FSNamesystem lock held.
+      //
+      DatanodeDescriptor n = getDatanode(node);
+      if (n != null) {
+        n.addBlocksToBeInvalidated(blocklist);
+      }
+    }
+
     /**
      * remove a datanode descriptor
      * @param nodeID datanode ID
@@ -2405,7 +2559,7 @@ class FSNamesystem implements FSConstants {
      *
      */
     public synchronized Object[] pendingTransfers(DatanodeID srcNode,
-                                                  int xmitsInProgress) {
+                                                  int needed) {
     // Ask datanodes to perform block replication  
     // only if safe mode is off.
     if( isInSafeMode() )
@@ -2413,7 +2567,6 @@ class FSNamesystem implements FSConstants {
     
     synchronized (neededReplications) {
       Object results[] = null;
-      int scheduledXfers = 0;
 
       if (neededReplications.size() > 0) {
         //
@@ -2426,13 +2579,9 @@ class FSNamesystem implements FSConstants {
         List<DatanodeDescriptor[]> replicateTargetSets;
         replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
         for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
-          //
-          // We can only reply with 'maxXfers' or fewer blocks
-          //
-          if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) {
+          if (needed <= 0) {
             break;
           }
-
           Block block = it.next();
           long blockSize = block.getNumBytes();
           FSDirectory.INode fileINode = dir.getFileByBlock(block);
@@ -2453,7 +2602,7 @@ class FSNamesystem implements FSConstants {
               int numCurrentReplica = nodes.size();
               DatanodeDescriptor targets[] = replicator.chooseTarget(
                   Math.min( fileINode.getReplication() - numCurrentReplica,
-                            this.maxReplicationStreams - xmitsInProgress),
+                            needed),
                   datanodeMap.get(srcNode.getStorageID()),
                   nodes, null, blockSize);
               if (targets.length > 0) {
@@ -2461,7 +2610,7 @@ class FSNamesystem implements FSConstants {
                 replicateBlocks.add(block);
                 numCurrentReplicas.add(new Integer(numCurrentReplica));
                 replicateTargetSets.add(targets);
-                scheduledXfers += targets.length;
+                needed -= targets.length;
               }
             }
           }

+ 15 - 15
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -579,18 +579,27 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
                                       long remaining,
                                       int xmitsInProgress,
                                       int xceiverCount) throws IOException {
+        Object xferResults[] = new Object[2];
+        xferResults[0] = xferResults[1] = null;
+        Object deleteList[] = new Object[1];
+        deleteList[0] = null; 
+
         verifyRequest( nodeReg );
-        if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount )) {
+        if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, 
+                                     xceiverCount, 
+                                     xmitsInProgress,
+                                     xferResults,
+                                     deleteList)) {
           // request block report from the datanode
+          assert(xferResults[0] == null && deleteList[0] == null);
           return new BlockCommand( DataNodeAction.DNA_REGISTER );
         }
         
         //
         // Ask to perform pending transfers, if any
         //
-        Object xferResults[] = namesystem.pendingTransfers( nodeReg,
-                                                            xmitsInProgress );
-        if (xferResults != null) {
+        if (xferResults[0] != null) {
+            assert(deleteList[0] == null);
             return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
         }
 
@@ -600,18 +609,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         // a block report.  This is just a small fast removal of blocks that have
         // just been removed.
         //
-        Block blocks[] = namesystem.blocksToInvalidate( nodeReg );
-        if (blocks != null) {
-            return new BlockCommand(blocks);
+        if (deleteList[0] != null) {
+            return new BlockCommand((Block[]) deleteList[0]);
         }
-        //
-        // See if the decommissioned node has finished moving all
-        // its datablocks to another replica. This is a loose
-        // heuristic to determine when a decommission is really over.
-        // We can probbaly do it in a seperate thread rather than making
-        // the heartbeat thread do this.
-        //
-        namesystem.checkDecommissionState(nodeReg);
         return null;
     }