@@ -35,16 +35,16 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
@@ -55,7 +55,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
-import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -227,11 +226,9 @@ public class TestFileAppend4 extends TestCase {
 
         int actual = b.getNames().length;
         if ( actual < expected ) {
-          if (true || iters > 0) {
-            LOG.info("Not enough replicas for " + b +
-                     " yet. Expecting " + expected + ", got " +
-                     actual + ".");
-          }
+          LOG.info("Not enough replicas for " + b +
+                   " yet. Expecting " + expected + ", got " +
+                   actual + ".");
           replOk = false;
           break;
         }
@@ -260,8 +257,23 @@ public class TestFileAppend4 extends TestCase {
     AppendTestUtil.check(whichfs, file1, fileSize);
   }
 
-  private void corruptDatanode(int dnNumber) throws Exception {
-    // get the FS data of the 2nd datanode
+  enum CorruptionType {
+    CORRUPT_LAST_CHUNK,
+    TRUNCATE_BLOCK_TO_ZERO,
+    TRUNCATE_BLOCK_HALF;
+  }
+
+  /**
+   * Corrupt all of the blocks in the blocksBeingWritten dir
+   * for the specified datanode number. Depending on the given
+   * corruption type, this overwrites the last checksum chunk of
+   * each block with random data, or truncates the block file to
+   * zero or half of its current length.
+   */
+  private void corruptDataNode(int dnNumber, CorruptionType type) throws Exception {
+ // get the FS data of the specified datanode
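+    // each MiniDFSCluster datanode owns two storage dirs; dnNumber*2 + 1
+    // is the first dir belonging to datanode dnNumber (data1,data2 -> DN 0)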
     File data_dir = new File(System.getProperty("test.build.data"),
                              "dfs/data/data" +
                              Integer.toString(dnNumber*2 + 1) +
@@ -271,21 +283,40 @@
       // only touch the actual data, not the metadata (with CRC)
       if (block.getName().startsWith("blk_") &&
           !block.getName().endsWith("meta")) {
-        RandomAccessFile file = new RandomAccessFile(block, "rw");
-        FileChannel channel = file.getChannel();
-
-        Random r = new Random();
-        long lastBlockSize = channel.size() % 512;
-        long position = channel.size() - lastBlockSize;
-        int length = r.nextInt((int)(channel.size() - position + 1));
-        byte[] buffer = new byte[length];
-        r.nextBytes(buffer);
-
-        channel.write(ByteBuffer.wrap(buffer), position);
-        System.out.println("Deliberately corrupting file " + block.getName() +
-                           " at offset " + position +
-                           " length " + length);
-        file.close();
+ if (type == CorruptionType.CORRUPT_LAST_CHUNK) {
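+          // overwrite a random number of bytes at the start of the
+          // last (partial) 512-byte checksum chunk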
+          RandomAccessFile file = new RandomAccessFile(block, "rw");
+          FileChannel channel = file.getChannel();
+          Random r = new Random();
+          long lastBlockSize = channel.size() % 512;
+          long position = channel.size() - lastBlockSize;
+          int length = r.nextInt((int)(channel.size() - position + 1));
+          byte[] buffer = new byte[length];
+          r.nextBytes(buffer);
+
+          channel.write(ByteBuffer.wrap(buffer), position);
+          LOG.info("Deliberately corrupting file " + block.getName() +
+                   " at offset " + position +
+                   " length " + length);
+          file.close();
+        } else if (type == CorruptionType.TRUNCATE_BLOCK_TO_ZERO) {
+          LOG.info("Truncating block file at " + block);
+          RandomAccessFile blockFile = new RandomAccessFile(block, "rw");
+          blockFile.setLength(0);
+          blockFile.close();
+
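+          // truncate the meta (checksum) file as well, mimicking a
+          // power failure that zeroed out both files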
+          RandomAccessFile metaFile = new RandomAccessFile(
+            FSDataset.findMetaFile(block), "rw");
+          metaFile.setLength(0);
+          metaFile.close();
+        } else if (type == CorruptionType.TRUNCATE_BLOCK_HALF) {
+          FSDatasetTestUtil.truncateBlockFile(block, block.length() / 2);
+        } else {
+          fail("Unknown corruption type: " + type);
+        }
         ++corrupted;
       }
     }
@@ -554,7 +585,7 @@
     LOG.info("STOPPED first instance of the cluster");
 
     // give the second datanode a bad CRC
-    corruptDatanode(corruptDN);
+    corruptDataNode(corruptDN, CorruptionType.CORRUPT_LAST_CHUNK);
 
     // restart the cluster
     cluster = new MiniDFSCluster(conf, 3, false, null);
@@ -1051,6 +1082,128 @@
     }
   }
 
+  /**
+   * Test for what happens when the machine doing the write totally
+   * loses power, so that when it restarts, the local replica has been
+   * truncated to 0 bytes (very common with journaling filesystems).
+   */
+  public void testTruncatedPrimaryDN() throws Exception {
+    LOG.info("START");
+    runDNRestartCorruptType(CorruptionType.TRUNCATE_BLOCK_TO_ZERO);
+  }
+
+  /**
+   * Test for what happens when the machine doing the write loses
+   * power, but an earlier, shorter length of the block being written
+   * had already made it to the local filesystem journal.
+   */
+  public void testHalfLengthPrimaryDN() throws Exception {
+    LOG.info("START");
+    runDNRestartCorruptType(CorruptionType.TRUNCATE_BLOCK_HALF);
+  }
+
+  private void runDNRestartCorruptType(CorruptionType corrupt) throws Exception {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE % 4 == 0);
+
+      file1 = new Path("/dnDeath.dat");
+
+      // write 1/2 block & sync
+      stm = fs1.create(file1, true, 1024, rep, 4096);
+      AppendTestUtil.write(stm, 0, 1024);
+ stm.sync();
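+      // stop renewing the lease, as a dead client would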
+ loseLeases(fs1);
+
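+      // abort the write pipeline without closing the file,
+      // as if the writer process had died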
+      DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream();
+      dfso.abortForTests();
+
+      // stop the primary DN
+      DataNodeProperties badDN = cluster.stopDataNode(0);
+
+      // Truncate the block on the primary DN
+      corruptDataNode(0, corrupt);
+
+      // Start the DN back up
+      cluster.restartDataNode(badDN);
+
+      // Recover the lease
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+      recoverFile(fs2);
+
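+      // with the other two replicas intact, recovery should preserve
+      // the full 1024 bytes that were written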
+      assertFileSize(fs2, 1024);
+      checkFile(fs2, 1024);
+    } finally {
+      // explicitly do not shut down fs1, since it's been frozen up by
+      // killing the DataStreamer and not allowing recovery
+      cluster.shutdown();
+    }
+  }
+
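+  /**
+   * Test for what happens when every datanode holding a replica
+   * loses power at once, and one comes back with its block file
+   * truncated to half its length (as if the tail of the file had
+   * not survived the local filesystem journal).
+   */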
+  public void testFullClusterPowerLoss() throws Exception {
+    cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      short rep = 2; // replication
+      assertTrue(BLOCK_SIZE % 4 == 0);
+
+      file1 = new Path("/dnDeath.dat");
+
+      // write 1/2 block & sync
+      stm = fs1.create(file1, true, 1024, rep, 4096);
+      AppendTestUtil.write(stm, 0, 1024);
+      stm.sync();
+      loseLeases(fs1);
+
+      DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream();
+      dfso.abortForTests();
+
+      // stop both DNs
+      DataNodeProperties badDN = cluster.stopDataNode(0);
+      DataNodeProperties badDN2 = cluster.stopDataNode(0); // what was 1 is now 0
+      assertNotNull(badDN);
+      assertNotNull(badDN2);
+
+      // Truncate one of them as if its journal got corrupted
+      corruptDataNode(0, CorruptionType.TRUNCATE_BLOCK_HALF);
+
+      // Start the DNs back up
+      cluster.restartDataNode(badDN);
+      cluster.restartDataNode(badDN2);
+
+      // Wait for a heartbeat to make sure we get the initial block
+      // report of the replicasBeingWritten
+      cluster.waitForDNHeartbeat(0, 10000);
+      cluster.waitForDNHeartbeat(1, 10000);
+
+      // Recover the lease
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+      recoverFile(fs2);
+
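+      // lease recovery should sync both replicas to the shorter
+      // replica's length, leaving the first 512 of the 1024 bytes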
+      assertFileSize(fs2, 512);
+      checkFile(fs2, 512);
+    } finally {
+      // explicitly do not shut down fs1, since it's been frozen up by
+      // killing the DataStreamer and not allowing recovery
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Mockito answer helper that triggers one latch as soon as the
    * method is called, then waits on another before continuing.