HDFS-5470. Add back trunk's reportDiff algorithm to the branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1539504 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 11 years ago
parent
commit
75777f1626

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt

@@ -79,3 +79,5 @@ IMPROVEMENTS:
     HDFS-5439. Fix TestPendingReplication. (Contributed by Junping Du, Arpit
     Agarwal)
 
+    HDFS-5470. Add back trunk's reportDiff algorithm to the branch.
+    (szetszwo)

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java

@@ -323,6 +323,27 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
     return head;
   }
 
+  /**
+   * Remove this block from the list of blocks related to the specified
+   * DatanodeStorageInfo and insert it at the head of that list.
+   *
+   * @return the new head of the list.
+   */
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
+      int curIndex, int headIndex) {
+    if (head == this) {
+      return this;
+    }
+    BlockInfo next = this.setNext(curIndex, head);
+    BlockInfo prev = this.setPrevious(curIndex, null);
+
+    head.setPrevious(headIndex, this);
+    prev.setNext(prev.findStorageInfo(storage), next);
+    if (next != null)
+      next.setPrevious(next.findStorageInfo(storage), prev);
+    return this;
+  }
+
   /**
    * BlockInfo represents a block that is not being constructed.
    * In order to start modifying the block, the BlockInfo should be converted

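For context: moveBlockToHead above is constant-time pointer surgery on the per-storage doubly linked list that BlockInfo threads through its triplets array, with curIndex and headIndex locating the right prev/next slots. A minimal sketch of the same operation on a plain doubly linked list (hypothetical ListNode and NodeList names, a single list instead of one per storage):

class ListNode {
  final long id;
  ListNode prev, next;   // BlockInfo keeps one such pair per storage it belongs to

  ListNode(long id) {
    this.id = id;
  }
}

class NodeList {
  ListNode head;

  // Unlink n from its current position and splice it in at the head;
  // a handful of pointer writes, no traversal.
  void moveToHead(ListNode n) {
    if (n == head) {
      return;                        // already at the head, nothing to do
    }
    if (n.prev != null) {
      n.prev.next = n.next;          // unlink from current position
    }
    if (n.next != null) {
      n.next.prev = n.prev;
    }
    n.prev = null;                   // splice in at the head
    n.next = head;
    if (head != null) {
      head.prev = n;
    }
    head = n;
  }
}
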
+ 18 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1816,13 +1816,15 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    dn.updateStorage(storage);
+    final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
 
-    // add all blocks to remove list
-    for(Iterator<BlockInfo> it = dn.getBlockIterator(storage.getStorageID());
-        it.hasNext(); ) {
-      toRemove.add(it.next());
-    }
+    // place a delimiter in the list which separates blocks 
+    // that have been reported from those that have not
+    BlockInfo delimiter = new BlockInfo(new Block(), 1);
+    boolean added = storageInfo.addBlock(delimiter);
+    assert added : "Delimiting block cannot be present in the node";
+    int headIndex = 0; // currently the delimiter is at the head of the list
+    int curIndex;
 
     if (newReport == null)
       newReport = new BlockListAsLongs();
@@ -1834,10 +1836,18 @@ public class BlockManager {
       BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
-      if (storedBlock != null) {
-        toRemove.remove(storedBlock);
+      // move block to the head of the list
+      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
       }
     }
+
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
+    while(it.hasNext())
+      toRemove.add(it.next());
+    storageInfo.removeBlock(delimiter);
   }
 
   /**

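The reportDiff change above restores the delimiter trick from trunk: a dummy BlockInfo is inserted at the head of the storage's block list, every block mentioned in the report is moved in front of it, and whatever still trails the delimiter afterwards was never reported, so it goes straight to toRemove with no extra lookup structure. A simplified sketch of the idea (hypothetical names; a java.util.LinkedList of block ids stands in for the intrusive list, so the moves here cost O(n) where the real moveBlockToHead is O(1)):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

class ReportDiffSketch {
  // Returns the stored blocks that do not appear in the report.
  static List<Long> unreported(LinkedList<Long> stored, Set<Long> report) {
    final Long DELIMITER = -1L;        // sentinel, like the delimiter BlockInfo
    stored.addFirst(DELIMITER);
    for (Long b : report) {
      if (stored.remove(b)) {          // block is known: move it to the front,
        stored.addFirst(b);            // i.e. ahead of the delimiter
      }
    }
    int d = stored.indexOf(DELIMITER); // everything after the delimiter
    List<Long> toRemove =              // was never mentioned in the report
        new ArrayList<>(stored.subList(d + 1, stored.size()));
    stored.remove(DELIMITER);          // take the delimiter back out
    return toRemove;
  }

  public static void main(String[] args) {
    LinkedList<Long> stored = new LinkedList<>(List.of(1L, 2L, 3L, 4L));
    Set<Long> report = new HashSet<>(List.of(2L, 4L));
    System.out.println(unreported(stored, report)); // prints [1, 3]
  }
}
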
+ 23 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java

@@ -74,13 +74,11 @@ public class DatanodeStorageInfo {
   /**
    * Iterates over the list of blocks belonging to the data-node.
    */
-  static class BlockIterator implements Iterator<BlockInfo> {
+  class BlockIterator implements Iterator<BlockInfo> {
     private BlockInfo current;
-    private DatanodeStorageInfo node;
 
-    BlockIterator(BlockInfo head, DatanodeStorageInfo dn) {
+    BlockIterator(BlockInfo head) {
       this.current = head;
-      this.node = dn;
     }
 
     public boolean hasNext() {
@@ -89,7 +87,7 @@ public class DatanodeStorageInfo {
 
     public BlockInfo next() {
       BlockInfo res = current;
-      current = current.getNext(current.findStorageInfo(node));
+      current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
       return res;
     }
 
@@ -233,7 +231,26 @@ public class DatanodeStorageInfo {
   }
   
   Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(this.blockList, this);
+    return new BlockIterator(blockList);
+  }
+
+  /**
+   * Move block to the head of the list of blocks belonging to the data-node.
+   * @return the index of the head of the blockList
+   */
+  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
+    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
+    return curIndex;
+  }
+
+  /**
+   * Used for testing only
+   * @return the head of the blockList
+   */
+  @VisibleForTesting
+  protected BlockInfo getHead() {
+    return blockList;
   }
 
   public void updateState(StorageReport r) {

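One note on the BlockIterator change above: dropping static makes it an inner class, so every iterator instance implicitly holds a reference to its enclosing DatanodeStorageInfo, and the explicit node field becomes redundant. It is also what lets BlockManager construct one with the qualified storageInfo.new BlockIterator(...) syntax. A tiny sketch of the pattern with hypothetical Bag and BagIterator names:

import java.util.Iterator;

class Bag {
  private final int[] items = {1, 2, 3};

  // Inner (non-static) class: each instance is bound to an enclosing Bag.
  class BagIterator implements Iterator<Integer> {
    private int pos;                  // no explicit Bag field needed

    @Override
    public boolean hasNext() {
      return pos < Bag.this.items.length;  // enclosing instance is implicit
    }

    @Override
    public Integer next() {
      return Bag.this.items[pos++];
    }
  }

  Iterator<Integer> iterator() {
    return new BagIterator();         // bound to this Bag automatically
  }
}

// From outside the class, the enclosing instance must be named explicitly,
// which is the storageInfo.new BlockIterator(...) form seen in BlockManager:
//   Bag bag = new Bag();
//   Iterator<Integer> it = bag.new BagIterator();
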
+ 124 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java

@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.junit.Test;
+
+/**
+ * This class provides tests for the BlockInfo class, which is used in
+ * BlocksMap. The test covers DatanodeStorageInfo.moveBlockToHead, used for
+ * faster block report processing in BlockManager.reportDiff.
+ */
+
+public class TestBlockInfo {
+
+  private static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.hdfs.TestBlockInfo");
+
+  @Test
+  public void testBlockListMoveToHead() throws Exception {
+    LOG.info("BlockInfo moveToHead tests...");
+
+    final int MAX_BLOCKS = 10;
+
+    DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
+    ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
+    ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
+    int headIndex;
+    int curIndex;
+
+    LOG.info("Building block list...");
+    for (int i = 0; i < MAX_BLOCKS; i++) {
+      blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
+      blockInfoList.add(new BlockInfo(blockList.get(i), 3));
+      dd.addBlock(blockInfoList.get(i));
+
+      // index of the storage should be 0
+      assertEquals("Storage index should be 0", 0, blockInfoList.get(i)
+          .findStorageInfo(dd));
+    }
+
+    // list length should be equal to the number of blocks we inserted
+    LOG.info("Checking list length...");
+    assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
+    Iterator<BlockInfo> it = dd.getBlockIterator();
+    int len = 0;
+    while (it.hasNext()) {
+      it.next();
+      len++;
+    }
+    assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
+
+    headIndex = dd.getHead().findStorageInfo(dd);
+
+    LOG.info("Moving each block to the head of the list...");
+    for (int i = 0; i < MAX_BLOCKS; i++) {
+      curIndex = blockInfoList.get(i).findStorageInfo(dd);
+      headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
+      // the moved element must be at the head of the list
+      assertEquals("Block should be at the head of the list now.",
+          blockInfoList.get(i), dd.getHead());
+    }
+
+    // move head of the list to the head - this should not change the list
+    LOG.info("Moving head to the head...");
+
+    BlockInfo temp = dd.getHead();
+    curIndex = 0;
+    headIndex = 0;
+    dd.moveBlockToHead(temp, curIndex, headIndex);
+    assertEquals(
+        "Moving head to the head of the list should not change the list",
+        temp, dd.getHead());
+
+    // check all elements of the list against the original blockInfoList
+    LOG.info("Checking elements of the list...");
+    temp = dd.getHead();
+    assertNotNull("Head should not be null", temp);
+    int c = MAX_BLOCKS - 1;
+    while (temp != null) {
+      assertEquals("Expected element is not on the list",
+          blockInfoList.get(c--), temp);
+      temp = temp.getNext(0);
+    }
+
+    LOG.info("Moving random blocks to the head of the list...");
+    headIndex = dd.getHead().findStorageInfo(dd);
+    Random rand = new Random();
+    for (int i = 0; i < MAX_BLOCKS; i++) {
+      int j = rand.nextInt(MAX_BLOCKS);
+      curIndex = blockInfoList.get(j).findStorageInfo(dd);
+      headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
+      // the moved element must be at the head of the list
+      assertEquals("Block should be at the head of the list now.",
+          blockInfoList.get(j), dd.getHead());
+    }
+  }
+}