فهرست منبع

HDFS-15359. EC: Allow closing a file with committed blocks. Contributed by Ayush Saxena.

Ayush Saxena 4 سال پیش
والد
کامیت
2326123705

+ 11 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1158,12 +1158,19 @@ public class BlockManager implements BlockStatsMXBean {
   /**
    * If IBR is not sent from expected locations yet, add the datanodes to
    * pendingReconstruction in order to keep RedundancyMonitor from scheduling
-   * the block.
+   * the block. In case of erasure coding blocks, adds only in case there
+   * isn't any missing node.
    */
   public void addExpectedReplicasToPending(BlockInfo blk) {
-    if (!blk.isStriped()) {
-      DatanodeStorageInfo[] expectedStorages =
-          blk.getUnderConstructionFeature().getExpectedStorageLocations();
+    boolean addForStriped = false;
+    DatanodeStorageInfo[] expectedStorages =
+        blk.getUnderConstructionFeature().getExpectedStorageLocations();
+    if (blk.isStriped()) {
+      BlockInfoStriped blkStriped = (BlockInfoStriped) blk;
+      addForStriped =
+          blkStriped.getRealTotalBlockNum() == expectedStorages.length;
+    }
+    if (!blk.isStriped() || addForStriped) {
       if (expectedStorages.length - blk.numNodes() > 0) {
         ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
         for (DatanodeStorageInfo storage : expectedStorages) {

+ 16 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -374,17 +374,27 @@ public class INodeFile extends INodeWithAdditionalFields
     if (state == BlockUCState.COMPLETE) {
       return null;
     }
-    if (b.isStriped() || i < blocks.length - numCommittedAllowed) {
+    if (i < blocks.length - numCommittedAllowed) {
       return b + " is " + state + " but not COMPLETE";
     }
     if (state != BlockUCState.COMMITTED) {
       return b + " is " + state + " but neither COMPLETE nor COMMITTED";
     }
-    final int numExpectedLocations
-        = b.getUnderConstructionFeature().getNumExpectedLocations();
-    if (numExpectedLocations <= minReplication) {
-      return b + " is " + state + " but numExpectedLocations = "
-          + numExpectedLocations + " <= minReplication = " + minReplication;
+
+    if (b.isStriped()) {
+      BlockInfoStriped blkStriped = (BlockInfoStriped) b;
+      if (b.getUnderConstructionFeature().getNumExpectedLocations()
+          != blkStriped.getRealTotalBlockNum()) {
+        return b + " is a striped block in " + state + " with less then "
+            + "required number of blocks.";
+      }
+    } else {
+      final int numExpectedLocations =
+          b.getUnderConstructionFeature().getNumExpectedLocations();
+      if (numExpectedLocations <= minReplication) {
+        return b + " is " + state + " but numExpectedLocations = "
+            + numExpectedLocations + " <= minReplication = " + minReplication;
+      }
     }
     return null;
   }

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4903,7 +4903,9 @@
   <description>
     Normally a file can only be closed with all its blocks are committed.
     When this value is set to a positive integer N, a file can be closed
-    when N blocks are committed and the rest complete.
+    when N blocks are committed and the rest complete. In case of Erasure Coded
+    blocks, the committed block shall be allowed only when the block group is
+    complete. i.e no missing/lost block in the blockgroup.
   </description>
 </property>
 

+ 39 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
@@ -2105,4 +2107,41 @@ public class TestDistributedFileSystem {
       assertFalse(result.isSupported());
     }
   }
+
+  @Test
+  public void testECCloseCommittedBlock() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path("/dir");
+      dfs.mkdirs(dir);
+      dfs.enableErasureCodingPolicy("XOR-2-1-1024k");
+      dfs.setErasureCodingPolicy(dir, "XOR-2-1-1024k");
+
+      try (FSDataOutputStream str = dfs.create(new Path("/dir/file"));) {
+        for (int i = 0; i < 1024 * 1024 * 4; i++) {
+          str.write(i);
+        }
+        DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(0));
+        DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(1));
+      }
+      DataNodeTestUtils.resumeIBR(cluster.getDataNodes().get(0));
+      DataNodeTestUtils.resumeIBR(cluster.getDataNodes().get(1));
+
+      // Check if the blockgroup isn't complete then file close shouldn't be
+      // success with block in committed state.
+      cluster.getDataNodes().get(0).shutdown();
+      FSDataOutputStream str = dfs.create(new Path("/dir/file1"));
+
+      for (int i = 0; i < 1024 * 1024 * 4; i++) {
+        str.write(i);
+      }
+      DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(1));
+      DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(2));
+      LambdaTestUtils.intercept(IOException.class, "", () -> str.close());
+    }
+  }
 }