|
@@ -35,6 +35,7 @@ import java.io.BufferedReader;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintStream;
|
|
@@ -46,6 +47,8 @@ import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.nio.channels.FileChannel;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -54,12 +57,15 @@ import java.util.Set;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import org.apache.commons.logging.impl.Log4JLogger;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.LocatedFileStatus;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.RemoteIterator;
|
|
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
@@ -101,6 +107,7 @@ import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.log4j.PatternLayout;
|
|
|
import org.apache.log4j.RollingFileAppender;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
import com.google.common.collect.Sets;
|
|
@@ -147,7 +154,7 @@ public class TestFsck {
|
|
|
assertEquals(expectedErrCode, errCode);
|
|
|
}
|
|
|
GenericTestUtils.setLogLevel(FSPermissionChecker.LOG, Level.INFO);
|
|
|
- FSImage.LOG.error("OUTPUT = " + bStream.toString());
|
|
|
+ FSImage.LOG.info("OUTPUT = " + bStream.toString());
|
|
|
return bStream.toString();
|
|
|
}
|
|
|
|
|
@@ -475,7 +482,25 @@ public class TestFsck {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ public void corruptBlocks(MiniDFSCluster cluster) throws IOException {
|
|
|
+ for (int corruptIdx : blocksToCorrupt) {
|
|
|
+ // Corrupt a block by deleting it
|
|
|
+ ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(name,
|
|
|
+ blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
|
|
|
+ for (int i = 0; i < numDataNodes; i++) {
|
|
|
+ File blockFile = cluster.getBlockFile(i, block);
|
|
|
+ if(blockFile != null && blockFile.exists()) {
|
|
|
+ FileOutputStream blockFileStream =
|
|
|
+ new FileOutputStream(blockFile, false);
|
|
|
+ blockFileStream.write("corrupt".getBytes());
|
|
|
+ blockFileStream.close();
|
|
|
+ FSImage.LOG.info("Corrupted block file " + blockFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void checkSalvagedRemains() throws IOException {
|
|
|
int chainIdx = 0;
|
|
|
HdfsFileStatus status = dfsClient.getFileInfo(name);
|
|
@@ -1890,4 +1915,97 @@ public class TestFsck {
|
|
|
if (cluster != null) {cluster.shutdown();}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test (timeout = 300000)
|
|
|
+ public void testFsckMoveAfterCorruption() throws Exception {
|
|
|
+ final int DFS_BLOCK_SIZE = 512 * 1024;
|
|
|
+ final int NUM_DATANODES = 1;
|
|
|
+ final int REPLICATION = 1;
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ try {
|
|
|
+ final Configuration conf = new HdfsConfiguration();
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE);
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).build();
|
|
|
+ DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
+ cluster.waitActive();
|
|
|
+
|
|
|
+ final String srcDir = "/srcdat";
|
|
|
+ final DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck")
|
|
|
+ .setMinSize(DFS_BLOCK_SIZE * 2).setMaxSize(DFS_BLOCK_SIZE * 3)
|
|
|
+ .setNumFiles(1).build();
|
|
|
+ util.createFiles(dfs, srcDir, (short) REPLICATION);
|
|
|
+ final String fileNames[] = util.getFileNames(srcDir);
|
|
|
+ FSImage.LOG.info("Created files: " + Arrays.toString(fileNames));
|
|
|
+
|
|
|
+ // Run fsck here. The output is automatically logged for easier debugging
|
|
|
+ String outStr = runFsck(conf, 0, true, "/", "-files", "-blocks");
|
|
|
+ assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
|
|
+
|
|
|
+ // Corrupt the first block
|
|
|
+ final DFSClient dfsClient = new DFSClient(
|
|
|
+ new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
|
|
|
+ final String blockFileToCorrupt = fileNames[0];
|
|
|
+ final CorruptedTestFile ctf = new CorruptedTestFile(blockFileToCorrupt,
|
|
|
+ Sets.newHashSet(0), dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE);
|
|
|
+ ctf.corruptBlocks(cluster);
|
|
|
+
|
|
|
+ // Wait for fsck to discover all the missing blocks
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ try {
|
|
|
+ final String str = runFsck(conf, 1, false, "/");
|
|
|
+ String numCorrupt = null;
|
|
|
+ for (String line : str.split(LINE_SEPARATOR)) {
|
|
|
+ Matcher m = numCorruptBlocksPattern.matcher(line);
|
|
|
+ if (m.matches()) {
|
|
|
+ numCorrupt = m.group(1);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (numCorrupt == null) {
|
|
|
+ Assert.fail("Cannot find corrupt blocks count in fsck output.");
|
|
|
+ }
|
|
|
+ if (Integer.parseInt(numCorrupt) == ctf.getTotalMissingBlocks()) {
|
|
|
+ assertTrue(str.contains(NamenodeFsck.CORRUPT_STATUS));
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ FSImage.LOG.error("Exception caught", e);
|
|
|
+ Assert.fail("Caught unexpected exception.");
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }, 1000, 60000);
|
|
|
+
|
|
|
+ runFsck(conf, 1, true, "/", "-files", "-blocks", "-racks");
|
|
|
+ FSImage.LOG.info("Moving blocks to lost+found");
|
|
|
+ // Fsck will return error since we corrupted a block
|
|
|
+ runFsck(conf, 1, false, "/", "-move");
|
|
|
+
|
|
|
+ final List<LocatedFileStatus> retVal = new ArrayList<>();
|
|
|
+ final RemoteIterator<LocatedFileStatus> iter =
|
|
|
+ dfs.listFiles(new Path("/lost+found"), true);
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ retVal.add(iter.next());
|
|
|
+ }
|
|
|
+ FSImage.LOG.info("Items in lost+found: " + retVal);
|
|
|
+
|
|
|
+ // Expect all good blocks moved, only corrupted block skipped.
|
|
|
+ long totalLength = 0;
|
|
|
+ for (LocatedFileStatus lfs: retVal) {
|
|
|
+ totalLength += lfs.getLen();
|
|
|
+ }
|
|
|
+ Assert.assertTrue("Nothing is moved to lost+found!", totalLength > 0);
|
|
|
+ util.cleanup(dfs, srcDir);
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|