Browse Source

HDFS-10810. setreplication removing block from underconstruction temporarily. Contributed by Brahma Reddy Battula

(cherry picked from commit d963ecb9185b8fcfba86bec8420a8daf7140a071)
Mingliang Liu 8 years ago
parent
commit
11ea5a59c2

+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3558,13 +3558,15 @@ public class BlockManager implements BlockStatsMXBean {
         return;
       }
       NumberReplicas repl = countNodes(block);
+      int pendingNum = pendingReplications.getNumReplicas(block);
       int curExpectedReplicas = getReplication(block);
-      if (isNeededReplication(block, repl.liveReplicas())) {
-        neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
-            repl.decommissionedAndDecommissioning(), curExpectedReplicas,
-            curReplicasDelta, expectedReplicasDelta);
+      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
+                    curExpectedReplicas)) {
+        neededReplications.update(block, repl.liveReplicas() + pendingNum,
+            repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+            curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
-        int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+        int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
         neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);

+ 61 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import static org.junit.Assert.assertEquals;
@@ -38,6 +39,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
@@ -230,6 +232,65 @@ public class TestFileCorruption {
 
   }
 
+  @Test
+  public void testSetReplicationWhenBatchIBR() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
+        30000);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
+        1);
+    DistributedFileSystem dfs;
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build()) {
+      final int bufferSize = 1024; // payload size: 1024 bytes written to each file
+      byte[] outBuffer = new byte[bufferSize];
+      dfs = cluster.getFileSystem();
+      String fileName = "/testSetRep1";
+      Path filePath = new Path(fileName);
+      FSDataOutputStream out = dfs.create(filePath);
+      out.write(outBuffer, 0, bufferSize);
+      out.close();
+      // Send a full block report (FBR) to delay the next incremental block report (IBR)
+      cluster.triggerBlockReports();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            cluster.triggerBlockReports();
+            if (cluster.getNamesystem().getBlocksTotal() == 1) {
+              return true;
+            }
+          } catch (Exception e) {
+            // Ignore the exception; fall through and report "not ready yet"
+          }
+          return false;
+        }
+      }, 10, 3000);
+      fileName = "/testSetRep2";
+      filePath = new Path(fileName);
+      out = dfs.create(filePath);
+      out.write(outBuffer, 0, bufferSize);
+      out.close();
+      dfs.setReplication(filePath, (short) 10);
+      // Exactly one block should be counted as under-replicated after setReplication
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          try {
+            return cluster.getNamesystem().getBlockManager()
+                .getUnderReplicatedBlocksCount() == 1;
+          } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+          }
+        }
+      }, 10, 3000);
+      assertEquals(0,
+          cluster.getNamesystem().getBlockManager().getMissingBlocksCount());
+    }
+  }
+
   private void markAllBlocksAsCorrupt(BlockManager bm,
                                       ExtendedBlock blk) throws IOException {
     for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {