@@ -19,6 +19,7 @@ package org.apache.hadoop.dfs;
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
@@ -167,4 +168,152 @@ public class TestReplication extends TestCase {
       cluster.shutdown();
     }
   }
+
+  // Waits for all of the blocks to have expected replication
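+  // (a maxWaitSec <= 0 means wait indefinitely)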
+  private void waitForBlockReplication(String filename,
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec)
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+
+    // wait for all the blocks to be replicated
+    System.out.println("Checking for block replication for " + filename);
+    int iters = 0;
+    while (true) {
+      boolean replOk = true;
+      LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
+                                                        Long.MAX_VALUE);
+
+      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+           iter.hasNext();) {
+        LocatedBlock block = iter.next();
+        int actual = block.getLocations().length;
+        if (actual < expected) {
+          // log on retries only, to avoid noise from the very first check
+          if (iters > 0) {
+            System.out.println("Not enough replicas for " + block.getBlock() +
+                               " yet. Expecting " + expected + ", got " +
+                               actual + ".");
+          }
+          replOk = false;
+          break;
+        }
+      }
+
+      if (replOk) {
+        return;
+      }
+
+      iters++;
+
+      if (maxWaitSec > 0 &&
+          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+ throw new IOException("Timedout while waiting for all blocks to " +
|
|
|
|
+ " be replicated for " + filename);
|
|
|
|
+      }
+
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ignored) {}
+    }
+  }
+
+  /* This test makes sure that the NameNode retries all the available
+   * replicas of an under-replicated block.
+   *
+   * It creates a file with one block and a replication factor of 4, then
+   * deletes one of the replicas and corrupts two others. The expected
+   * behaviour is that the missing replica is re-created from the one
+   * valid source.
+   */
+  public void testPendingReplicationRetry() throws IOException {
+
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 4;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+
+    byte buffer[] = new byte[1024];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      //first time format
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+                                   true, null, null);
+      cluster.waitActive();
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                            cluster.getNameNodePort()),
+                                          conf);
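+      // dfsClient.namenode is the ClientProtocol proxy to the NameNode;
+      // it is what waitForBlockReplication uses to query block locations.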
+
+      OutputStream out = cluster.getFileSystem().create(testPath);
+      out.write(buffer);
+      out.close();
+
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+      // get first block of the file.
+      String block = dfsClient.namenode.
+                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
+                       get(0).getBlock().toString();
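+      // Note: the block's string form is assumed to match its on-disk
+      // file name under each datanode's current/ directory.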
+
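+      // stop the cluster so the replica files can be tampered with on disk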
+      cluster.shutdown();
+      cluster = null;
+
+      // Now mess up some of the replicas.
+      // Delete the first and corrupt the next two.
+      File baseDir = new File(System.getProperty("test.build.data"),
+                              "dfs/data");
+ for (int i=0; i<25; i++) {
|
|
|
|
+ buffer[i] = '0';
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+      int fileCount = 0;
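+      // Each of the 4 datanodes has two storage dirs (data1..data8), so
+      // data1..data6 belong to the first three datanodes, which between
+      // them should hold 3 of the 4 replicas.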
+ for (int i=0; i<6; i++) {
|
|
|
|
+ File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
|
|
|
|
+ System.out.println("Checking for file " + blockFile);
|
|
|
|
+
|
|
|
|
+        if (blockFile.exists()) {
+          if (fileCount == 0) {
+            assertTrue(blockFile.delete());
+          } else {
+            // corrupt it.
+            long len = blockFile.length();
+            assertTrue(len > 50);
+            RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
+            blockOut.seek(len/3);
+            blockOut.write(buffer, 0, 25);
+            blockOut.close();
+          }
+          fileCount++;
+        }
+      }
+      assertEquals(3, fileCount);
+
+      /* Start the MiniDFSCluster with more datanodes, since once a
+       * writeBlock to a datanode fails, the same block cannot be written
+       * to it immediately. In our case some replication attempts will fail.
+       */
+      conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
+      conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
+      conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
+
+      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+                                   true, null, null);
+      cluster.waitActive();
+
+      dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                  cluster.getNameNodePort()),
+                                conf);
+
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }