Browse Source

HADOOP-3707. NameNode keeps a count of number of blocks scheduled
to be written to a datanode and uses it to avoid allocating more
blocks than a datanode can hold. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@677380 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 năm trước cách đây
mục cha
commit
644357371d

+ 4 - 0
CHANGES.txt

@@ -762,6 +762,10 @@ Release 0.17.2 - Unreleased
 
   BUG FIXES
 
+    HADOOP-3707. NameNode keeps a count of number of blocks scheduled
+    to be written to a datanode and uses it to avoid allocating more
+    blocks than a datanode can hold. (rangadi)
+
     HADOOP-3760. Fix a bug with HDFS file close() mistakenly introduced
     by HADOOP-3681. (Lohit Vijayarenu via rangadi)
 

+ 50 - 0
src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -93,6 +93,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   boolean processedBlockReport = false;
   
+  /* Variables for maintaning number of blocks scheduled to be written to
+   * this datanode. This count is approximate and might be slightly higger
+   * in case of errors (e.g. datanode does not report if an error occurs 
+   * while writing the block).
+   */
+  private int currApproxBlocksScheduled = 0;
+  private int prevApproxBlocksScheduled = 0;
+  private long lastBlocksScheduledRollTime = 0;
+  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
+  
   /** Default constructor */
   public DatanodeDescriptor() {}
   
@@ -211,6 +221,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.remaining = remaining;
     this.lastUpdate = System.currentTimeMillis();
     this.xceiverCount = xceiverCount;
+    rollBlocksScheduled(lastUpdate);
   }
 
   /**
@@ -396,4 +407,43 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.hostName = Text.readString(in);
     setAdminState(WritableUtils.readEnum(in, AdminStates.class));
   }
+  
+  /**
+   * @return Approximate number of blocks currently scheduled to be written 
+   * to this datanode.
+   */
+  public int getBlocksScheduled() {
+    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+  }
+  
+  /**
+   * Increments counter for number of blocks scheduled. 
+   */
+  void incBlocksScheduled() {
+    currApproxBlocksScheduled++;
+  }
+  
+  /**
+   * Decrements counter for number of blocks scheduled.
+   */
+  void decBlocksScheduled() {
+    if (prevApproxBlocksScheduled > 0) {
+      prevApproxBlocksScheduled--;
+    } else if (currApproxBlocksScheduled > 0) {
+      currApproxBlocksScheduled--;
+    } 
+    // its ok if both counters are zero.
+  }
+  
+  /**
+   * Adjusts curr and prev number of blocks scheduled every few minutes.
+   */
+  private void rollBlocksScheduled(long now) {
+    if ((now - lastBlocksScheduledRollTime) > 
+        BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+      prevApproxBlocksScheduled = currApproxBlocksScheduled;
+      currApproxBlocksScheduled = 0;
+      lastBlocksScheduledRollTime = now;
+    }
+  }
 }

+ 14 - 0
src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1130,6 +1130,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       // allocate new block record block locations in INode.
       newBlock = allocateBlock(src, pendingFile);
       pendingFile.setTargets(targets);
+      
+      for (DatanodeDescriptor dn : targets) {
+        dn.incBlocksScheduled();
+      }      
     }
         
     // Create next block
@@ -2334,6 +2338,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         srcNode.addBlockToBeReplicated(block, targets);
         scheduledReplicationCount++;
 
+        for (DatanodeDescriptor dn : targets) {
+          dn.incBlocksScheduled();
+        }
+        
         // Move the block-replication into a "pending" state.
         // The reason we use 'pending' is so we can retry
         // replications that fail after an appropriate amount of time.
@@ -2500,6 +2508,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                  getReplication(timedOutItems[i]));
         }
       }
+      /* If we know the the target datanodes where the replication timedout,
+       * we could invoke decBlocksScheduled() on it. Its ok for now.
+       */
     }
   }
 
@@ -3119,6 +3130,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       throw new DisallowedDatanodeException(node);
     }
 
+    // decrement number of blocks scheduled to this datanode.
+    node.decBlocksScheduled();
+    
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
     if(delHint!=null && delHint.length()!=0) {

+ 3 - 1
src/hdfs/org/apache/hadoop/dfs/ReplicationTargetChooser.java

@@ -398,8 +398,10 @@ class ReplicationTargetChooser {
       return false;
     }
 
+    long remaining = node.getRemaining() - 
+                     (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
-    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) {
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
       logr.debug("Node "+NodeBase.getPath(node)+
                 " is not chosen because the node does not have enough space");
       return false;

+ 63 - 0
src/test/org/apache/hadoop/dfs/TestBlocksScheduledCounter.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+/**
+ * This class tests DatanodeDescriptor.getBlocksScheduled() at the
+ * NameNode. This counter is supposed to keep track of blocks currently
+ * scheduled to a datanode.
+ */
+public class TestBlocksScheduledCounter extends TestCase {
+
+  public void testBlocksScheduledCounter() throws IOException {
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, 
+                                                true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    
+    //open a file an write a few bytes:
+    FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter"));
+    for (int i=0; i<1024; i++) {
+      out.write(i);
+    }
+    // flush to make sure a block is allocated.
+    ((DFSOutputStream)(out.getWrappedStream())).sync();
+    
+    ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
+    cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList);
+    DatanodeDescriptor dn = dnList.get(0);
+    
+    assertEquals(1, dn.getBlocksScheduled());
+   
+    // close the file and the counter should go to zero.
+    out.close();   
+    assertEquals(0, dn.getBlocksScheduled());
+  }
+}

+ 2 - 1
src/webapps/dfs/dfshealth.jsp

@@ -106,7 +106,8 @@
 	      JspHelper.percentageGraph( (int)Double.parseDouble(percentUsed) , 100) +
 	      "<td class=\"size\">" +
               FsShell.limitDecimalTo2(d.getRemaining()*1.0/diskBytes) +
-          "<td class=\"blocks\">" + d.numBlocks() + "\n");
+              "<td title=" + "\"blocks scheduled : " + d.getBlocksScheduled() + 
+              "\" class=\"blocks\">" + d.numBlocks() + "\n");
   }
 
   public void generateDFSHealthReport(JspWriter out,