|
@@ -26,13 +26,22 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
|
|
|
import com.google.common.base.Supplier;
|
|
|
import com.google.common.collect.Lists;
|
|
|
|
|
|
+import org.apache.commons.lang.text.StrBuilder;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.ReconfigurationUtil;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSClient;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
|
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
|
@@ -452,4 +461,135 @@ public class TestDFSAdmin {
|
|
|
assertThat(outs.get(offset + 2),
|
|
|
is(allOf(containsString("To:"), containsString("6"))));
|
|
|
}
|
|
|
+
|
|
|
+ private static String scanIntoString(final ByteArrayOutputStream baos) {
|
|
|
+ final StrBuilder sb = new StrBuilder();
|
|
|
+ final Scanner scanner = new Scanner(baos.toString());
|
|
|
+ while (scanner.hasNextLine()) {
|
|
|
+ sb.appendln(scanner.nextLine());
|
|
|
+ }
|
|
|
+ scanner.close();
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 30000)
|
|
|
+ public void testReportCommand() throws Exception {
|
|
|
+ redirectStream();
|
|
|
+
|
|
|
+ /* init conf */
|
|
|
+ final Configuration dfsConf = new HdfsConfiguration();
|
|
|
+ dfsConf.setInt(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
|
|
+ 500); // 0.5s
|
|
|
+ dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
|
+ final Path baseDir = new Path(
|
|
|
+ PathUtils.getTestDir(getClass()).getAbsolutePath(),
|
|
|
+ GenericTestUtils.getMethodName());
|
|
|
+ dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
|
|
|
+
|
|
|
+ final int numDn = 3;
|
|
|
+
|
|
|
+ /* init cluster */
|
|
|
+ try(MiniDFSCluster miniCluster = new MiniDFSCluster
|
|
|
+ .Builder(dfsConf)
|
|
|
+ .numDataNodes(numDn).build()) {
|
|
|
+
|
|
|
+ miniCluster.waitActive();
|
|
|
+ assertEquals(numDn, miniCluster.getDataNodes().size());
|
|
|
+
|
|
|
+ /* local vars */
|
|
|
+ final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
|
|
|
+ final DFSClient client = miniCluster.getFileSystem().getClient();
|
|
|
+
|
|
|
+ /* run and verify report command */
|
|
|
+ resetStream();
|
|
|
+ assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
|
|
|
+ verifyNodesAndCorruptBlocks(numDn, numDn, 0, client);
|
|
|
+
|
|
|
+ /* shut down one DN */
|
|
|
+ final List<DataNode> datanodes = miniCluster.getDataNodes();
|
|
|
+ final DataNode last = datanodes.get(datanodes.size() - 1);
|
|
|
+ last.shutdown();
|
|
|
+ miniCluster.setDataNodeDead(last.getDatanodeId());
|
|
|
+
|
|
|
+ /* run and verify report command */
|
|
|
+ assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
|
|
|
+ verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, client);
|
|
|
+
|
|
|
+ /* corrupt one block */
|
|
|
+ final short replFactor = 1;
|
|
|
+ final long fileLength = 512L;
|
|
|
+ final FileSystem fs = miniCluster.getFileSystem();
|
|
|
+ final Path file = new Path(baseDir, "/corrupted");
|
|
|
+ DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
|
|
|
+ DFSTestUtil.waitReplication(fs, file, replFactor);
|
|
|
+
|
|
|
+ final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
|
|
|
+ final int blockFilesCorrupted = miniCluster
|
|
|
+ .corruptBlockOnDataNodes(block);
|
|
|
+ assertEquals("Fail to corrupt all replicas for block " + block,
|
|
|
+ replFactor, blockFilesCorrupted);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Increase replication factor, this should invoke transfer request.
|
|
|
+ * Receiving datanode fails on checksum and reports it to namenode
|
|
|
+ */
|
|
|
+ fs.setReplication(file, (short) (replFactor + 1));
|
|
|
+
|
|
|
+ /* get block details and check if the block is corrupt */
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ LocatedBlocks blocks = null;
|
|
|
+ try {
|
|
|
+ blocks = client.getNamenode().getBlockLocations(file.toString(), 0,
|
|
|
+ Long.MAX_VALUE);
|
|
|
+ } catch (IOException e) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return blocks != null && blocks.get(0).isCorrupt();
|
|
|
+ }
|
|
|
+ }, 100, 60000);
|
|
|
+
|
|
|
+ BlockManagerTestUtil.updateState(
|
|
|
+ miniCluster.getNameNode().getNamesystem().getBlockManager());
|
|
|
+
|
|
|
+ /* run and verify report command */
|
|
|
+ resetStream();
|
|
|
+ assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
|
|
|
+ verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, client);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyNodesAndCorruptBlocks(
|
|
|
+ final int numDn,
|
|
|
+ final int numLiveDn,
|
|
|
+ final int numCorruptBlocks,
|
|
|
+ final DFSClient client) throws IOException {
|
|
|
+
|
|
|
+ /* init vars */
|
|
|
+ final String outStr = scanIntoString(out);
|
|
|
+ final String expectedLiveNodesStr = String.format(
|
|
|
+ "Live datanodes (%d)",
|
|
|
+ numLiveDn);
|
|
|
+ final String expectedCorruptedBlocksStr = String.format(
|
|
|
+ "Blocks with corrupt replicas: %d",
|
|
|
+ numCorruptBlocks);
|
|
|
+
|
|
|
+ /* verify nodes and corrupt blocks */
|
|
|
+ assertThat(outStr, is(allOf(
|
|
|
+ containsString(expectedLiveNodesStr),
|
|
|
+ containsString(expectedCorruptedBlocksStr))));
|
|
|
+
|
|
|
+ assertEquals(
|
|
|
+ numDn,
|
|
|
+ client.getDatanodeStorageReport(DatanodeReportType.ALL).length);
|
|
|
+ assertEquals(
|
|
|
+ numLiveDn,
|
|
|
+ client.getDatanodeStorageReport(DatanodeReportType.LIVE).length);
|
|
|
+ assertEquals(
|
|
|
+ numDn - numLiveDn,
|
|
|
+ client.getDatanodeStorageReport(DatanodeReportType.DEAD).length);
|
|
|
+ assertEquals(numCorruptBlocks, client.getCorruptBlocksCount());
|
|
|
+ }
|
|
|
}
|