
HADOOP-3707. NameNode keeps a count of number of blocks scheduled
to be written to a datanode and uses it to avoid allocating more
blocks than a datanode can hold. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@676668 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi, 17 years ago
parent commit ed29940dd4
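In outline: between heartbeats the NameNode's view of a datanode's free space is stale, so it could keep picking the same node as a write target long after that node's remaining space was effectively spoken for. The patch adds an approximate per-datanode count of in-flight block writes: FSNamesystem increments it once per target whenever a block is allocated or a replication is scheduled, and decrements it when the datanode reports a block received; ReplicationTargetChooser then subtracts the in-flight bytes from the node's reported free space before selecting it.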

+ 4 - 0
CHANGES.txt

@@ -4,6 +4,10 @@ Release 0.17.2 - Unreleased
 
   BUG FIXES
 
+    HADOOP-3707. NameNode keeps a count of number of blocks scheduled 
+    to be written to a datanode and uses it to avoid allocating more
+    blocks than a datanode can hold. (rangadi)
+
     HADOOP-3681. DFSClient can get into an infinite loop while closing
     a file if there are some errors. (Lohit Vijayarenu via rangadi)
 

+ 50 - 0
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -49,6 +49,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
   Set<Block> invalidateBlocks;
   boolean processedBlockReport = false;
   
+  /* Variables for maintaining the number of blocks scheduled to be written
+   * to this datanode. This count is approximate and might be slightly higher
+   * in case of errors (e.g. the datanode does not report if an error occurs
+   * while writing the block).
+   */
+  private int currApproxBlocksScheduled = 0;
+  private int prevApproxBlocksScheduled = 0;
+  private long lastBlocksScheduledRollTime = 0;
+  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 300 * 1000; // 5 min
+  
   /** Default constructor */
   public DatanodeDescriptor() {
     super();
@@ -181,6 +191,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.remaining = remaining;
     this.lastUpdate = System.currentTimeMillis();
     this.xceiverCount = xceiverCount;
+    rollBlocksScheduled(lastUpdate);
   }
 
   /**
@@ -367,4 +378,43 @@ public class DatanodeDescriptor extends DatanodeInfo {
       toRemove.add(it.next());
     this.removeBlock(delimiter);
   }
+  
+  /**
+   * @return Approximate number of blocks currently scheduled to be written 
+   * to this datanode.
+   */
+  int getBlocksScheduled() {
+    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+  }
+  
+  /**
+   * Increments counter for number of blocks scheduled. 
+   */
+  void incBlocksScheduled() {
+    currApproxBlocksScheduled++;
+  }
+  
+  /**
+   * Decrements counter for number of blocks scheduled.
+   */
+  void decBlocksScheduled() {
+    if (prevApproxBlocksScheduled > 0) {
+      prevApproxBlocksScheduled--;
+    } else if (currApproxBlocksScheduled > 0) {
+      currApproxBlocksScheduled--;
+    } 
+    // it's OK if both counters are zero.
+  }
+  
+  /**
+   * Adjusts the curr and prev counts of blocks scheduled every few minutes.
+   */
+  private void rollBlocksScheduled(long now) {
+    if ((now - lastBlocksScheduledRollTime) > 
+        BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+      prevApproxBlocksScheduled = currApproxBlocksScheduled;
+      currApproxBlocksScheduled = 0;
+      lastBlocksScheduledRollTime = now;
+    }
+  }
 }
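The two counters implement a coarse aging scheme: a count that is never decremented (a failed write never produces a block-received report) survives at most two roll intervals, roughly ten minutes, before it drops out of the total returned by getBlocksScheduled(). A minimal standalone sketch of the same technique (the class name and the main() driver are mine, not Hadoop's):

public class RollingScheduledCounter {
  private int curr = 0;            // scheduled in the current interval
  private int prev = 0;            // carried over from the previous interval
  private long lastRollTime = 0;
  private static final long ROLL_INTERVAL = 300 * 1000; // 5 min, as in the patch

  int get()  { return curr + prev; }
  void inc() { curr++; }

  void dec() {
    if (prev > 0) {                // drain the oldest bucket first
      prev--;
    } else if (curr > 0) {
      curr--;
    }                              // both zero: a report for an already-aged block
  }

  void roll(long now) {
    if (now - lastRollTime > ROLL_INTERVAL) {
      prev = curr;                 // anything still here ages out on the next roll
      curr = 0;
      lastRollTime = now;
    }
  }

  public static void main(String[] args) {
    RollingScheduledCounter c = new RollingScheduledCounter();
    c.inc(); c.inc();              // two blocks scheduled
    c.dec();                       // one reported back; the other write "failed"
    c.roll(301 * 1000L);           // the stray count moves to prev; get() is still 1
    c.roll(602 * 1000L);           // second roll overwrites prev; get() is now 0
    System.out.println(c.get());   // prints 0
  }
}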

+ 11 - 0
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1147,6 +1147,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
                             minReplication);
     }
 
+    for (DatanodeDescriptor dn : targets) {
+      dn.incBlocksScheduled();
+    }
+    
     // Allocate a new block and record it in the INode. 
     synchronized (this) {
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
@@ -2402,6 +2406,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
         srcNode.addBlockToBeReplicated(block, targets);
         scheduledReplicationCount++;
 
+        for (DatanodeDescriptor dn : targets) {
+          dn.incBlocksScheduled();
+        }
+        
         // Move the block-replication into a "pending" state.
         // The reason we use 'pending' is so we can retry
         // replications that fail after an appropriate amount of time.
@@ -3093,6 +3101,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       throw new DisallowedDatanodeException(node);
     }
 
+    // decrement number of blocks scheduled to this datanode.
+    node.decBlocksScheduled();
+    
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
     if(delHint!=null && delHint.length()!=0) {
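Note the asymmetry in these three hooks: the counter goes up once per chosen target at scheduling time, but only comes down when a block-received report actually arrives, so a failed write leaves the count permanently inflated. That drift is exactly what the five-minute roll in DatanodeDescriptor ages out.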

+ 3 - 1
src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java

@@ -398,8 +398,10 @@ class ReplicationTargetChooser {
       return false;
     }
 
+    long remaining = node.getRemaining() - 
+                     (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
-    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) {
+    if (blockSize * FSConstants.MIN_BLOCKS_FOR_WRITE > remaining) {
       logr.debug("Node "+NodeBase.getPath(node)+
                 " is not chosen because the node does not have enough space");
       return false;
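To see what the new check buys, take some illustrative numbers (not from the patch): a 64 MB block size, a node reporting 1 GB free, and 12 blocks already scheduled to it. The old check saw the full 1 GB and accepted the node; the new one sees 1024 MB - 12 * 64 MB = 256 MB, and if MIN_BLOCKS_FOR_WRITE is 5, the required headroom of 5 * 64 MB = 320 MB exceeds that, so the node is passed over instead of being over-committed.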

+ 63 - 0
src/test/org/apache/hadoop/dfs/TestBlocksScheduledCounter.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+/**
+ * This class tests DatanodeDescriptor.getBlocksScheduled() at the
+ * NameNode. This counter is supposed to keep track of blocks currently
+ * scheduled to a datanode.
+ */
+public class TestBlocksScheduledCounter extends TestCase {
+
+  public void testBlocksScheduledCounter() throws IOException {
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, 
+                                                true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    
+    // open a file and write a few bytes:
+    FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter"));
+    for (int i=0; i<1024; i++) {
+      out.write(i);
+    }
+    // flush to make sure a block is allocated.
+    ((DFSOutputStream)(out.getWrappedStream())).fsync();
+    
+    ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
+    cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList);
+    DatanodeDescriptor dn = dnList.get(0);
+    
+    assertEquals(1, dn.getBlocksScheduled());
+   
+    // close the file and the counter should go to zero.
+    out.close();   
+    assertEquals(0, dn.getBlocksScheduled());
+  }
+}
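The test hinges on two behaviors: fsync() forces the NameNode to allocate a block before the datanode has reported anything back, so getBlocksScheduled() is observably 1 mid-write, and close() completes the pipeline, at which point the datanode's block-received report drives the counter back to 0.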

+ 2 - 1
src/webapps/dfs/dfshealth.jsp

@@ -106,7 +106,8 @@
 	      JspHelper.percentageGraph( (int)Double.parseDouble(percentUsed) , 100) +
 	      "<td class=\"size\">" +
               FsShell.limitDecimalTo2(d.getRemaining()*1.0/diskBytes) +
-          "<td class=\"blocks\">" + d.numBlocks() + "\n");
+              "<td title=" + "\"blocks scheduled : " + d.getBlocksScheduled() + 
+              "\" class=\"blocks\">" + d.numBlocks() + "\n");
   }
 
   public void generateDFSHealthReport(JspWriter out,