|
@@ -1153,6 +1153,22 @@ public class TestFileAppend4 extends TestCase {
|
|
|
private final CountDownLatch fireLatch = new CountDownLatch(1);
|
|
|
private final CountDownLatch waitLatch = new CountDownLatch(1);
|
|
|
|
|
|
+    /**
+     * If true, the delay happens before the real method is called; if false,
+     * it happens after the real method has returned.
+     */
+    boolean delayBefore = true;
+
+    /**
+     * Number of invocations that should still be delayed. doDelay() decrements
+     * it on every call and stops delaying once it drops below zero.
+     */
+    int numTimes = 1;
+
+    /**
+     * Creates an answer that delays before the real method is called.
+     */
+    public DelayAnswer() {
+      this(true);
+    }
+
+    /**
+     * @param delayBefore
+     *          if true, the delay is before the method is called. If false, the
+     *          delay is after the method returns.
+     */
+    public DelayAnswer(boolean delayBefore) {
+      this.delayBefore = delayBefore;
+    }
|
|
|
+
|
|
|
/**
|
|
|
* Wait until the method is called.
|
|
|
*/
|
|
@@ -1169,6 +1185,19 @@ public class TestFileAppend4 extends TestCase {
|
|
|
}
|
|
|
|
|
|
+    /**
+     * Forwards the intercepted call to the real method, stalling through
+     * doDelay() either before the call or after it has returned, depending on
+     * the delayBefore flag.
+     *
+     * @param invocation the intercepted invocation to forward
+     * @return the value returned by the real method
+     * @throws Throwable anything thrown by doDelay() or by the real method
+     */
     public Object answer(InvocationOnMock invocation) throws Throwable {
+      if (delayBefore) {
+        doDelay();
+      }
+      final Object result = invocation.callRealMethod();
+      if (!delayBefore) {
+        doDelay();
+      }
+      return result;
+    }
|
|
|
+
|
|
|
+ private void doDelay() throws Throwable {
|
|
|
+ synchronized (this) {
|
|
|
+ if (--numTimes < 0)
|
|
|
+ return;
|
|
|
+ }
|
|
|
LOG.info("DelayAnswer firing fireLatch");
|
|
|
fireLatch.countDown();
|
|
|
try {
|
|
@@ -1178,7 +1207,6 @@ public class TestFileAppend4 extends TestCase {
|
|
|
} catch (InterruptedException ie) {
|
|
|
throw new IOException("Interrupted waiting on latch", ie);
|
|
|
}
|
|
|
- return invocation.callRealMethod();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1315,6 +1343,69 @@ public class TestFileAppend4 extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Test case where recovery starts on one node, but it's very slow
+   * (delayed right after nextGenerationStamp). A second recovery attempt
+   * completes while this one is being slow. Then we should reject the
+   * recovery from the first one, since it has a lower gen stamp.
+   *
+   * The namesystem is replaced by a spy whose nextGenerationStamp call is
+   * answered by a DelayAnswer constructed with delayBefore == false, so the
+   * stall happens after the real method has returned.
+   *
+   * @throws Exception if setup, recovery or verification fails, including any
+   *           failure captured on the background recovery thread
+   */
+  public void testSimultaneousRecoveries() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+    try {
+      createFile(fs1, "/testSimultaneousRecoveries", 3, BBW_SIZE);
+      stm.sync();
+      loseLeases(fs1);
+
+      // Make the first nextGenerationStamp call get delayed
+      DelayAnswer delayer = new DelayAnswer(false);
+
+      NameNode nn = cluster.getNameNode();
+      nn.namesystem = spy(nn.namesystem);
+      NameNodeAdapter.callNextGenerationStampForBlock(
+        doAnswer(delayer).when(nn.namesystem),
+        (Block)anyObject(), anyBoolean());
+
+      // Holds whatever the recovery thread throws so it can be rethrown on
+      // the test thread after join().
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread recoverThread = new Thread("Recovery thread") {
+        @Override
+        public void run() {
+          try {
+            recoverFile(fs2);
+          } catch (Throwable t) {
+            err.set(t);
+          }
+        }
+      };
+      recoverThread.start();
+
+      LOG.info("Waiting for first nextGenerationStamp to return");
+      delayer.waitForCall();
+
+      LOG.info("Allowing recovery time to try again");
+      Thread.sleep(10000);
+
+      LOG.info("Proceeding first recovery with old GS");
+      delayer.proceed();
+
+      LOG.info("Joining on recovery thread");
+      recoverThread.join();
+
+      // A failure on the recovery thread would otherwise be silently lost,
+      // because exceptions do not propagate across threads.
+      Throwable recoveryFailure = err.get();
+      if (recoveryFailure != null) {
+        throw new RuntimeException("Recovery thread failed", recoveryFailure);
+      }
+
+      LOG.info("Waiting a few seconds for blocks to get corrupted");
+      Thread.sleep(5000);
+
+      // Verify the file through fs2 against the BBW_SIZE passed to
+      // createFile above.
+      assertFileSize(fs2, BBW_SIZE);
+      checkFile(fs2, BBW_SIZE);
+    } finally {
+      // Nested so that a failing close() cannot skip the cluster shutdown.
+      try {
+        fs2.close();
+      } finally {
+        try {
+          fs1.close();
+        } finally {
+          cluster.shutdown();
+        }
+      }
+    }
+    LOG.info("STOP");
+  }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Mockito answer helper that will throw an exception a given number
|