|
@@ -163,11 +163,13 @@ public class TestDatanodeBlockScanner extends TestCase {
|
|
|
|
|
|
public void testBlockCorruptionPolicy() throws IOException {
|
|
|
Configuration conf = new Configuration();
|
|
|
+ conf.setLong("dfs.blockreport.intervalMsec", 1000L);
|
|
|
Random random = new Random();
|
|
|
FileSystem fs = null;
|
|
|
DFSClient dfsClient = null;
|
|
|
LocatedBlocks blocks = null;
|
|
|
int blockCount = 0;
|
|
|
+ int rand = random.nextInt(3);
|
|
|
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
|
|
|
cluster.waitActive();
|
|
@@ -178,27 +180,35 @@ public class TestDatanodeBlockScanner extends TestCase {
|
|
|
|
|
|
dfsClient = new DFSClient(new InetSocketAddress("localhost",
|
|
|
cluster.getNameNodePort()), conf);
|
|
|
- blocks = dfsClient.namenode.
|
|
|
+ do {
|
|
|
+ blocks = dfsClient.namenode.
|
|
|
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
|
|
|
- blockCount = blocks.get(0).getLocations().length;
|
|
|
- assertTrue(blockCount == 3);
|
|
|
+ blockCount = blocks.get(0).getLocations().length;
|
|
|
+ try {
|
|
|
+ LOG.info("Looping until expected blockCount of 3 is received");
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ignore) {
|
|
|
+ }
|
|
|
+ } while (blockCount != 3);
|
|
|
assertTrue(blocks.get(0).isCorrupt() == false);
|
|
|
|
|
|
// Corrupt random replica of block
|
|
|
- corruptReplica(block, random.nextInt(3));
|
|
|
- cluster.shutdown();
|
|
|
+ corruptReplica(block, rand);
|
|
|
+
|
|
|
+ // Restart the datanode hoping the corrupt block to be reported
|
|
|
+ cluster.restartDataNode(rand);
|
|
|
|
|
|
- // Restart the cluster hoping the corrupt block to be reported
|
|
|
// We have 2 good replicas and block is not corrupt
|
|
|
- cluster = new MiniDFSCluster(conf, 3, false, null);
|
|
|
- cluster.waitActive();
|
|
|
- fs = cluster.getFileSystem();
|
|
|
- dfsClient = new DFSClient(new InetSocketAddress("localhost",
|
|
|
- cluster.getNameNodePort()), conf);
|
|
|
- blocks = dfsClient.namenode.
|
|
|
+ do {
|
|
|
+ blocks = dfsClient.namenode.
|
|
|
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
|
|
|
- blockCount = blocks.get(0).getLocations().length;
|
|
|
- assertTrue (blockCount == 2);
|
|
|
+ blockCount = blocks.get(0).getLocations().length;
|
|
|
+ try {
|
|
|
+ LOG.info("Looping until expected blockCount of 2 is received");
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ignore) {
|
|
|
+ }
|
|
|
+ } while (blockCount != 2);
|
|
|
assertTrue(blocks.get(0).isCorrupt() == false);
|
|
|
|
|
|
// Corrupt all replicas. Now, block should be marked as corrupt
|
|
@@ -215,12 +225,18 @@ public class TestDatanodeBlockScanner extends TestCase {
|
|
|
// Ignore exception
|
|
|
}
|
|
|
|
|
|
- // We now have he blocks to be marked as corrup and we get back all
|
|
|
+ // We now have the blocks to be marked as corrupt and we get back all
|
|
|
// its replicas
|
|
|
- blocks = dfsClient.namenode.
|
|
|
+ do {
|
|
|
+ blocks = dfsClient.namenode.
|
|
|
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
|
|
|
- blockCount = blocks.get(0).getLocations().length;
|
|
|
- assertTrue (blockCount == 3);
|
|
|
+ blockCount = blocks.get(0).getLocations().length;
|
|
|
+ try {
|
|
|
+ LOG.info("Looping until expected blockCount of 3 is received");
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ignore) {
|
|
|
+ }
|
|
|
+ } while (blockCount != 3);
|
|
|
assertTrue(blocks.get(0).isCorrupt() == true);
|
|
|
|
|
|
cluster.shutdown();
|