
HDFS-9789. Correctly update DataNode's scheduled block size when writing small EC file. Contributed by Jing Zhao.

Jing Zhao 9 years ago
parent
commit
19adb2bc64
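In brief: when the NameNode allocates a striped block group, it increments a per-storage-type "blocks scheduled" counter on every target DataNode and normally decrements it once that DataNode reports the finished replica. For a file shorter than a full stripe, some data blocks of the group are never written, those DataNodes never report anything, and their counters leaked. A minimal, self-contained sketch of the arithmetic, assuming the default RS-6-3 schema with 64KB cells (the numbers are hypothetical; the bound mirrors what BlockInfoStriped#getRealDataBlockNum yields for a committed block):

// Hypothetical small-EC-file scenario; the values below are
// illustrative assumptions, not taken from the patch.
public class SmallEcFileExample {
  public static void main(String[] args) {
    final long numBytes = 100L * 1024; // final size of the block group
    final int cellSize = 64 * 1024;    // assumed 64KB cell size
    final int dataBlockNum = 6;        // RS-6-3: 6 data + 3 parity blocks

    // Data blocks that actually receive bytes:
    // ceil(numBytes / cellSize), capped at the schema width.
    final int realDataBlockNum =
        (int) Math.min(dataBlockNum, (numBytes - 1) / cellSize + 1);

    System.out.println("data blocks written:   " + realDataBlockNum); // 2
    System.out.println("counters left leaking: "
        + (dataBlockNum - realDataBlockNum));                         // 4
  }
}

The patch below repairs those leaked counters at block-commit time instead of letting them accumulate with every small EC file.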

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -421,6 +421,9 @@ Trunk (Unreleased)
     HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with
     length less than the first internal block. (jing9)

+    HDFS-9789. Correctly update DataNode's scheduled block size when writing
+    small EC file. (jing9)
+
     BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS

       HDFS-7347. Configurable erasure coding policy for individual files and

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -734,14 +734,20 @@ public class BlockManager implements BlockStatsMXBean {
     if(lastBlock.isComplete())
       return false; // already completed (e.g. by syncBlock)

-    final boolean b = commitBlock(lastBlock, commitBlock);
+    final boolean committed = commitBlock(lastBlock, commitBlock);
+    if (committed && lastBlock.isStriped()) {
+      // update scheduled size for DatanodeStorages that do not store any
+      // internal blocks
+      lastBlock.getUnderConstructionFeature()
+          .updateStorageScheduledSize((BlockInfoStriped) lastBlock);
+    }
     if (hasMinStorage(lastBlock)) {
-      if (b) {
+      if (committed) {
         addExpectedReplicasToPending(lastBlock, bc);
       }
       completeBlock(lastBlock, false);
     }
-    return b;
+    return committed;
   }

   /**
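Placement note: the new hook runs after commitBlock(), so the block's length is final when getRealDataBlockNum() is consulted, yet before completeBlock(), which converts the block and discards the BlockUnderConstructionFeature whose replica list the update walks (the assert inside updateStorageScheduledSize() relies on that feature still being attached).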

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java

@@ -123,6 +123,31 @@ public class BlockUnderConstructionFeature {
     return replicas == null ? 0 : replicas.length;
   }

+  /**
+   * When committing a striped block whose size is less than a stripe, we need
+   * to decrease the scheduled block size of the DataNodes that do not store
+   * any internal block.
+   */
+  void updateStorageScheduledSize(BlockInfoStriped storedBlock) {
+    assert storedBlock.getUnderConstructionFeature() == this;
+    if (replicas == null) {
+      return;
+    }
+    final int dataBlockNum = storedBlock.getDataBlockNum();
+    final int realDataBlockNum = storedBlock.getRealDataBlockNum();
+    if (realDataBlockNum < dataBlockNum) {
+      for (ReplicaUnderConstruction replica : replicas) {
+        int index = BlockIdManager.getBlockIndex(replica);
+        if (index >= realDataBlockNum && index < dataBlockNum) {
+          final DatanodeStorageInfo storage =
+              replica.getExpectedStorageLocation();
+          storage.getDatanodeDescriptor()
+              .decrementBlocksScheduled(storage.getStorageType());
+        }
+      }
+    }
+  }
+
   /**
    * Return the state of the block under construction.
    * @see BlockUCState
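Two details of updateStorageScheduledSize() are easy to miss. The upper bound index < dataBlockNum excludes parity indices on purpose: parity blocks are written even for a partial stripe, so their storages report replicas and clear their counters through the normal path. And the method leans on BlockInfoStriped#getRealDataBlockNum(), which this diff does not show; roughly, paraphrased from the surrounding tree (field and method names may differ in detail):

// Paraphrase of BlockInfoStriped#getRealDataBlockNum -- not part of
// this patch; consult the actual class for the authoritative code.
public int getRealDataBlockNum() {
  if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
    // Length is final: only ceil(numBytes / cellSize) data blocks can
    // hold bytes, capped at the schema's data block count.
    return (int) Math.min(getDataBlockNum(),
        (getNumBytes() - 1) / getCellSize() + 1);
  } else {
    // Still being written: assume the full stripe width.
    return getDataBlockNum();
  }
}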

+ 35 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java

@@ -37,9 +37,11 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -92,6 +94,39 @@ public class TestAddStripedBlocks {
     }
   }

+  /**
+   * Check if the scheduled block size on each DN storage is correctly updated
+   */
+  @Test
+  public void testBlockScheduledUpdate() throws Exception {
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final Path foo = new Path("/foo");
+    try (FSDataOutputStream out = dfs.create(foo, true)) {
+      DFSStripedOutputStream sout = (DFSStripedOutputStream) out.getWrappedStream();
+      writeAndFlushStripedOutputStream(sout, DFS_BYTES_PER_CHECKSUM_DEFAULT);
+
+      // make sure the scheduled block size has been updated for each DN storage
+      // in NN
+      final List<DatanodeDescriptor> dnList = new ArrayList<>();
+      fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null, false);
+      for (DatanodeDescriptor dn : dnList) {
+        Assert.assertEquals(1, dn.getBlocksScheduled());
+      }
+    }
+
+    // the file is complete; force the DNs to report their blocks so the
+    // NameNode clears the remaining scheduled counters
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+
+    // check the scheduled block size again
+    final List<DatanodeDescriptor> dnList = new ArrayList<>();
+    fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null, false);
+    for (DatanodeDescriptor dn : dnList) {
+      Assert.assertEquals(0, dn.getBlocksScheduled());
+    }
+  }
+
   /**
    * Make sure the IDs of striped blocks do not conflict
    */