Browse Source

HDFS-10530. BlockManager reconstruction work scheduling should correctly adhere to EC block placement policy. Contributed by Manoj Govindassamy and Rui Gao.

Andrew Wang 8 years ago
parent
commit
4812518b23

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -4179,7 +4179,7 @@ public class BlockManager implements BlockStatsMXBean {
     BlockPlacementPolicy placementPolicy = placementPolicies
         .getPolicy(blockType);
     int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock)
-        .getRealDataBlockNum() : storedBlock.getReplication();
+        .getRealTotalBlockNum() : storedBlock.getReplication();
     return placementPolicy.verifyBlockPlacement(locs, numReplicas)
         .isPlacementPolicySatisfied();
   }

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -1918,7 +1918,7 @@ public class TestBalancer {
   }
 
   private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
-    int numOfDatanodes = dataBlocks + parityBlocks + 2;
+    int numOfDatanodes = dataBlocks + parityBlocks + 3;
     int numOfRacks = dataBlocks;
     long capacity = 20 * defaultBlockSize;
     long[] capacities = new long[numOfDatanodes];
@@ -1956,11 +1956,12 @@ public class TestBalancer {
       LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
       StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
 
-      // add one datanode
+      // add datanodes in new rack
       String newRack = "/rack" + (++numOfRacks);
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{newRack}, null, new long[]{capacity});
-      totalCapacity += capacity;
+      cluster.startDataNodes(conf, 2, true, null,
+          new String[]{newRack, newRack}, null,
+          new long[]{capacity, capacity});
+      totalCapacity += capacity*2;
       cluster.triggerHeartbeats();
 
       // run balancer and validate results

+ 112 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -55,6 +56,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -68,6 +71,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -125,13 +129,14 @@ public class TestBlockManager {
    * of times trying to trigger the incorrect behavior.
    */
   private static final int NUM_TEST_ITERS = 30;
-  
   private static final int BLOCK_SIZE = 64*1024;
+  private static final Log LOG = LogFactory.getLog(TestBlockManager.class);
 
   private FSNamesystem fsn;
   private BlockManager bm;
   private long mockINodeId;
 
+
   @Before
   public void setupMockCluster() throws IOException {
     Configuration conf = new HdfsConfiguration();
@@ -1287,4 +1292,110 @@ public class TestBlockManager {
         isReplicaCorrupt(Mockito.any(BlockInfo.class),
             Mockito.any(DatanodeDescriptor.class));
   }
+
+  @Test (timeout = 300000)
+  public void testPlacementPolicySatisfied() throws Exception {
+    LOG.info("Starting testPlacementPolicySatisfied.");
+    final String[] initialRacks = new String[]{
+        "/rack0", "/rack1", "/rack2", "/rack3", "/rack4", "/rack5"};
+    final String[] initialHosts = new String[]{
+        "host0", "host1", "host2", "host3", "host4", "host5"};
+    final int numDataBlocks = StripedFileTestUtil.getDefaultECPolicy()
+        .getNumDataUnits();
+    final int numParityBlocks = StripedFileTestUtil.getDefaultECPolicy()
+        .getNumParityUnits();
+    final long blockSize = 64 * 1024;
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .racks(initialRacks)
+          .hosts(initialHosts)
+          .numDataNodes(initialRacks.length)
+          .build();
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path ecDir = new Path("/ec");
+      final Path testFileUnsatisfied = new Path(ecDir, "test1");
+      final Path testFileSatisfied = new Path(ecDir, "test2");
+      cluster.getFileSystem().getClient().mkdirs(ecDir.toString(), null, true);
+      cluster.getFileSystem().getClient()
+          .setErasureCodingPolicy(ecDir.toString(),
+              StripedFileTestUtil.getDefaultECPolicy().getName());
+      long fileLen = blockSize * numDataBlocks;
+
+      // Create a file to be stored in 6 racks.
+      DFSTestUtil.createFile(dfs, testFileUnsatisfied, fileLen, (short) 1, 1);
+      // Block placement policy should be satisfied as rack count
+      // is less than numDataBlocks + numParityBlocks.
+      verifyPlacementPolicy(cluster, testFileUnsatisfied, true);
+
+      LOG.info("Adding 3 new hosts in the existing racks.");
+      cluster.startDataNodes(conf, 3, true, null,
+          new String[]{"/rack3", "/rack4", "/rack5"},
+          new String[]{"host3-2", "host4-2", "host5-2"}, null);
+      cluster.triggerHeartbeats();
+
+      LOG.info("Waiting for EC reconstruction to complete.");
+      DFSTestUtil.waitForReplication(dfs, testFileUnsatisfied,
+          (short)(numDataBlocks + numParityBlocks), 30 * 1000);
+      // Block placement policy should still be satisfied
+      // as there are only 6 racks.
+      verifyPlacementPolicy(cluster, testFileUnsatisfied, true);
+
+      LOG.info("Adding 3 new hosts in 3 new racks.");
+      cluster.startDataNodes(conf, 3, true, null,
+          new String[]{"/rack6", "/rack7", "/rack8"},
+          new String[]{"host6", "host7", "host8"},
+          null);
+      cluster.triggerHeartbeats();
+      // Addition of new racks can make the existing EC files block
+      // placements unsatisfied and there is NO automatic block
+      // reconstruction for this yet.
+      // TODO:
+      //  Verify for block placement satisfied once the automatic
+      //  block reconstruction is implemented.
+      verifyPlacementPolicy(cluster, testFileUnsatisfied, false);
+
+      // Create a new file
+      DFSTestUtil.createFile(dfs, testFileSatisfied, fileLen, (short) 1, 1);
+      // The new file should be rightly placed on all 9 racks
+      // and the block placement policy should be satisfied.
+      verifyPlacementPolicy(cluster, testFileUnsatisfied, false);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void verifyPlacementPolicy(final MiniDFSCluster cluster,
+      final Path file, boolean isBlockPlacementSatisfied) throws IOException {
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+    LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs, file).get(0);
+    BlockInfo blockInfo =
+        blockManager.getStoredBlock(lb.getBlock().getLocalBlock());
+    Iterator<DatanodeStorageInfo> itr = blockInfo.getStorageInfos();
+    LOG.info("Block " + blockInfo + " storages: ");
+    while (itr.hasNext()) {
+      DatanodeStorageInfo dn = itr.next();
+      LOG.info(" Rack: " + dn.getDatanodeDescriptor().getNetworkLocation()
+          + ", DataNode: " + dn.getDatanodeDescriptor().getXferAddr());
+    }
+    if (isBlockPlacementSatisfied) {
+      assertTrue("Block group of " + file + "should be placement" +
+              " policy satisfied, currently!",
+          blockManager.isPlacementPolicySatisfied(blockInfo));
+    } else {
+      assertFalse("Block group of " + file + " should be placement" +
+              " policy unsatisfied, currently!",
+          blockManager.isPlacementPolicySatisfied(blockInfo));
+    }
+  }
+
 }