|
@@ -388,7 +388,66 @@ public class TestReplication extends TestCase {
|
|
|
if (cluster != null) {
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test if replication can detect mismatched length on-disk blocks
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testReplicateLenMismatchedBlock() throws Exception {
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ // test truncated block
|
|
|
+ changeBlockLen(cluster, -1);
|
|
|
+ // test extended block
|
|
|
+ changeBlockLen(cluster, 1);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
+ private void changeBlockLen(MiniDFSCluster cluster,
|
|
|
+ int lenDelta) throws IOException, InterruptedException {
|
|
|
+ final Path fileName = new Path("/file1");
|
|
|
+ final short REPLICATION_FACTOR = (short)1;
|
|
|
+ final FileSystem fs = cluster.getFileSystem();
|
|
|
+ final int fileLen = fs.getConf().getInt("io.bytes.per.checksum", 512);
|
|
|
+ DFSTestUtil.createFile(fs, fileName, fileLen, REPLICATION_FACTOR, 0);
|
|
|
+ DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
|
|
|
+
|
|
|
+ String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
|
|
|
+
|
|
|
+ // Change the length of a replica
|
|
|
+ for (int i=0; i<cluster.getDataNodes().size(); i++) {
|
|
|
+ if (TestDatanodeBlockScanner.changeReplicaLength(block, i, lenDelta)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // increase the file's replication factor
|
|
|
+ fs.setReplication(fileName, (short)(REPLICATION_FACTOR+1));
|
|
|
+
|
|
|
+ // block replication triggers corrupt block detection
|
|
|
+ DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
|
|
|
+ cluster.getNameNodePort()), fs.getConf());
|
|
|
+ LocatedBlocks blocks = dfsClient.namenode.getBlockLocations(
|
|
|
+ fileName.toString(), 0, fileLen);
|
|
|
+ if (lenDelta < 0) { // replica truncated
|
|
|
+ while (!blocks.get(0).isCorrupt() ||
|
|
|
+ REPLICATION_FACTOR != blocks.get(0).getLocations().length) {
|
|
|
+ Thread.sleep(100);
|
|
|
+ blocks = dfsClient.namenode.getBlockLocations(
|
|
|
+ fileName.toString(), 0, fileLen);
|
|
|
+ }
|
|
|
+ } else { // no corruption detected; block replicated
|
|
|
+ while (REPLICATION_FACTOR+1 != blocks.get(0).getLocations().length) {
|
|
|
+ Thread.sleep(100);
|
|
|
+ blocks = dfsClient.namenode.getBlockLocations(
|
|
|
+ fileName.toString(), 0, fileLen);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ fs.delete(fileName, true);
|
|
|
+ }
|
|
|
}
|