@@ -18,13 +18,6 @@

package org.apache.hadoop.hdfs.server.namenode;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -48,6 +41,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

+import com.google.common.collect.Sets;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -66,11 +60,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -86,8 +83,17 @@ import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.junit.Test;

-import com.google.common.collect.Sets;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

/**
 * A JUnit test for doing fsck
@@ -118,7 +124,7 @@ public class TestFsck {
      System.getProperty("line.separator");

  static String runFsck(Configuration conf, int expectedErrCode,
-      boolean checkErrorCode,String... path)
+      boolean checkErrorCode,String... path)
      throws Exception {
    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(bStream, true);
@@ -1096,4 +1102,227 @@
      cluster.shutdown();
    }
  }
+
+  /**
+   * Test for blockIdCK
+   */
+
+  @Test
+  public void testBlockIdCK() throws Exception {
+
+    final short REPL_FACTOR = 2;
+    short NUM_DN = 2;
+    final long blockSize = 512;
+
+    String [] racks = {"/rack1", "/rack2"};
+    String [] hosts = {"host1", "host2"};
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem dfs = null;
+    cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+        .racks(racks).build();
+
+    assertNotNull("Failed Cluster Creation", cluster);
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    assertNotNull("Failed to get FileSystem", dfs);
+
+    DFSTestUtil util = new DFSTestUtil.Builder().
+      setName(getClass().getSimpleName()).setNumFiles(1).build();
+    //create files
+    final String pathString = new String("/testfile");
+    final Path path = new Path(pathString);
+    util.createFile(dfs, path, 1024, REPL_FACTOR , 1000L);
+    util.waitReplication(dfs, path, REPL_FACTOR);
+    StringBuilder sb = new StringBuilder();
+    for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+      sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+    }
+ String[] bIds = sb.toString().split(" ");
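+    // exercise fsck -blockId below, first with bad input and then with the
+    // whole space-separated list of block names collected above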
+
+    //run fsck
+    try {
+      //illegal input test
+      String runFsckResult = runFsck(conf, 0, true, "/", "-blockId",
+          "not_a_block_id");
+      assertTrue(runFsckResult.contains("Incorrect blockId format:"));
+
+      //general test
+      runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString());
+      assertTrue(runFsckResult.contains(bIds[0]));
+      assertTrue(runFsckResult.contains(bIds[1]));
+      assertTrue(runFsckResult.contains(
+          "Block replica on datanode/rack: host1/rack1 is HEALTHY"));
+      assertTrue(runFsckResult.contains(
+          "Block replica on datanode/rack: host2/rack2 is HEALTHY"));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test for blockIdCK with datanode decommission
+   */
+  @Test
+  public void testBlockIdCKDecommission() throws Exception {
+
+    final short REPL_FACTOR = 1;
+    short NUM_DN = 2;
+    final long blockSize = 512;
+    boolean checkDecommissionInProgress = false;
+    String [] racks = {"/rack1", "/rack2"};
+    String [] hosts = {"host1", "host2"};
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+
+    MiniDFSCluster cluster;
+    DistributedFileSystem dfs ;
+    cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+        .racks(racks).build();
+
+    assertNotNull("Failed Cluster Creation", cluster);
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    assertNotNull("Failed to get FileSystem", dfs);
+
+    DFSTestUtil util = new DFSTestUtil.Builder().
+      setName(getClass().getSimpleName()).setNumFiles(1).build();
+    //create files
+    final String pathString = new String("/testfile");
+    final Path path = new Path(pathString);
+    util.createFile(dfs, path, 1024, REPL_FACTOR, 1000L);
+    util.waitReplication(dfs, path, REPL_FACTOR);
+    StringBuilder sb = new StringBuilder();
+    for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+      sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+    }
+    String[] bIds = sb.toString().split(" ");
+    try {
+      //make sure datanode that has replica is fine before decommission
+      String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+      //decommission datanode
+      ExtendedBlock eb = util.getFirstBlock(dfs, path);
+      DatanodeDescriptor dn = cluster.getNameNode().getNamesystem()
+          .getBlockManager().getBlockCollection(eb.getLocalBlock())
+          .getBlocks()[0].getDatanode(0);
+      cluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().startDecommission(dn);
+      String dnName = dn.getXferAddr();
+
+      //wait for decommission start
+      DatanodeInfo datanodeInfo = null;
+ int count = 0;
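+      // poll datanode stats until the replica's datanode is decommissioned,
+      // checking once mid-way that fsck reports DECOMMISSIONING status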
+      do {
+        Thread.sleep(2000);
+        for (DatanodeInfo info : dfs.getDataNodeStats()) {
+          if (dnName.equals(info.getXferAddr())) {
+            datanodeInfo = info;
+          }
+        }
+        //check decommissioning only once
+        if(!checkDecommissionInProgress && datanodeInfo != null
+            && datanodeInfo.isDecommissionInProgress()) {
+          String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]);
+          assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS));
+          checkDecommissionInProgress = true;
+        }
+      } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
+
+      //check decommissioned
+      String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]);
+      assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test for blockIdCK with block corruption
+   */
+  @Test
+  public void testBlockIdCKCorruption() throws Exception {
+    short NUM_DN = 1;
+    final long blockSize = 512;
+    Random random = new Random();
+    DFSClient dfsClient;
+    LocatedBlocks blocks;
+    ExtendedBlock block;
+    short repFactor = 1;
+    String [] racks = {"/rack1"};
+    String [] hosts = {"host1"};
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem dfs = null;
+    try {
+      cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+          .racks(racks).build();
+
+      assertNotNull("Failed Cluster Creation", cluster);
+      cluster.waitClusterUp();
+      dfs = cluster.getFileSystem();
+      assertNotNull("Failed to get FileSystem", dfs);
+
+      DFSTestUtil util = new DFSTestUtil.Builder().
+        setName(getClass().getSimpleName()).setNumFiles(1).build();
+      //create files
+      final String pathString = new String("/testfile");
+      final Path path = new Path(pathString);
+      util.createFile(dfs, path, 1024, repFactor, 1000L);
+      util.waitReplication(dfs, path, repFactor);
+      StringBuilder sb = new StringBuilder();
+      for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+        sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+      }
+      String[] bIds = sb.toString().split(" ");
+
+      //make sure block is healthy before we corrupt it
+      String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+      // corrupt replicas
+      block = DFSTestUtil.getFirstBlock(dfs, path);
+ File blockFile = MiniDFSCluster.getBlockFile(0, block);
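+      // overwrite a few bytes of the on-disk replica so the block is
+      // later reported as corrupt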
+      if (blockFile != null && blockFile.exists()) {
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        FileChannel channel = raFile.getChannel();
+        String badString = "BADBAD";
+        int rand = random.nextInt((int) channel.size()/2);
+        raFile.seek(rand);
+        raFile.write(badString.getBytes());
+        raFile.close();
+      }
+
+      util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
+
+      outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
}