@@ -108,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -2651,5 +2653,130 @@ public class TestDistributedFileSystem {
     }
   }
 
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
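+    // Disable pipeline DN replacement on failure; the write should then
+    // proceed as long as at least MIN_REPLICATION (here 2) DNs remain.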
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 datanodes, 1 per rack.
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Kill one DN, so only 2 racks stay with an active DN.
+      cluster.stopDataNode(0);
+      // Create a file with replication 3; the rack fault tolerant BPP
+      // allocates nodes in all 3 racks, and the write succeeds with the
+      // 2 live DNs since min replication is 2.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
+    conf.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 3);
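+    // With MIN_REPLICATION equal to the replication factor, all 3
+    // pipeline DNs must be reachable for the write to succeed.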
+    // 3 racks & 3 datanodes, 1 per rack.
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Kill one DN, so only 2 racks stay with an active DN.
+      cluster.stopDataNode(0);
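+      // Only 2 DNs are left, below the required minimum of 3, so the
+      // create is expected to fail.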
+      LambdaTestUtils.intercept(IOException.class,
+          () ->
+              DFSTestUtil.createFile(fs, new Path("/testFile"),
+                  1024L, (short) 3, 1024L));
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
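+    // A min replication of 1 means a single surviving pipeline DN is
+    // enough for the write to go through.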
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 3 datanodes, 1 per rack.
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Kill 2 DNs, so only 1 rack stays with an active DN.
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      // Create a file with replication 3; the rack fault tolerant BPP
+      // allocates nodes in all 3 racks, and the write succeeds with the
+      // single live DN since min replication is 1.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 datanodes, 1 per rack.
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Kill 2 DNs, so only 1 rack stays with an active DN.
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
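+      // Only 1 DN is left, below the required minimum of 2, so the
+      // create is expected to fail.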
+      LambdaTestUtils.intercept(IOException.class,
+          () ->
+              DFSTestUtil.createFile(fs, new Path("/testFile"),
+                  1024L, (short) 3, 1024L));
+    }
+  }
+
+  @Test
+  public void testAllRackFailureDuringPipelineSetup() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
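+    // No min replication override here: with every DN down the NameNode
+    // has no targets to allocate, so the create must fail regardless.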
+    // 3 racks & 3 datanodes, 1 per rack.
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Shut down all DNs.
+      cluster.shutdownDataNodes();
+      // Create a file with replication 3; the rack fault tolerant BPP
+      // would allocate nodes in all 3 racks, but the create fails because
+      // no DNs are present.
+      LambdaTestUtils.intercept(IOException.class,
+          () ->
+              DFSTestUtil.createFile(fs, new Path("/testFile"),
+                  1024L, (short) 3, 1024L));
+    }
+  }
-}
+}