|
@@ -18,7 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.AppendTestUtil;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
|
|
+
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.fail;
|
|
import static org.junit.Assert.fail;
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Matchers.any;
|
|
@@ -43,6 +46,7 @@ import java.util.Collection;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.Random;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -94,6 +98,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
|
+import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
@@ -1035,4 +1040,106 @@ public class TestBlockRecovery {
|
|
Assert.fail("Thread failure: " + failureReason);
|
|
Assert.fail("Thread failure: " + failureReason);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Test for block recovery taking longer than the heartbeat interval.
|
|
|
|
+ */
|
|
|
|
+ @Test(timeout = 300000L)
|
|
|
|
+ public void testRecoverySlowerThanHeartbeat() throws Exception {
|
|
|
|
+ tearDown(); // Stop the Mocked DN started in startup()
|
|
|
|
+
|
|
|
|
+ SleepAnswer delayer = new SleepAnswer(3000, 6000);
|
|
|
|
+ testRecoveryWithDatanodeDelayed(delayer);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Test for block recovery timeout. All recovery attempts will be delayed
|
|
|
|
+ * and the first attempt will be lost to trigger recovery timeout and retry.
|
|
|
|
+ */
|
|
|
|
+ @Test(timeout = 300000L)
|
|
|
|
+ public void testRecoveryTimeout() throws Exception {
|
|
|
|
+ tearDown(); // Stop the Mocked DN started in startup()
|
|
|
|
+ final Random r = new Random();
|
|
|
|
+
|
|
|
|
+ // Make sure first commitBlockSynchronization call from the DN gets lost
|
|
|
|
+ // for the recovery timeout to expire and new recovery attempt
|
|
|
|
+ // to be started.
|
|
|
|
+ SleepAnswer delayer = new SleepAnswer(3000) {
|
|
|
|
+ private final AtomicBoolean callRealMethod = new AtomicBoolean();
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
+ boolean interrupted = false;
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(r.nextInt(3000) + 6000);
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ interrupted = true;
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ if (callRealMethod.get()) {
|
|
|
|
+ return invocation.callRealMethod();
|
|
|
|
+ }
|
|
|
|
+ callRealMethod.set(true);
|
|
|
|
+ return null;
|
|
|
|
+ } finally {
|
|
|
|
+ if (interrupted) {
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ testRecoveryWithDatanodeDelayed(delayer);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void testRecoveryWithDatanodeDelayed(
|
|
|
|
+ GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
|
|
|
|
+ Configuration configuration = new HdfsConfiguration();
|
|
|
|
+ configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ cluster = new MiniDFSCluster.Builder(configuration)
|
|
|
|
+ .numDataNodes(2).build();
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ final FSNamesystem ns = cluster.getNamesystem();
|
|
|
|
+ final NameNode nn = cluster.getNameNode();
|
|
|
|
+ final DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
|
+ cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(15));
|
|
|
|
+
|
|
|
|
+ // Create a file and never close the output stream to trigger recovery
|
|
|
|
+ FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
|
|
|
|
+ (short) 2);
|
|
|
|
+ out.write(AppendTestUtil.randomBytes(0, 4096));
|
|
|
|
+ out.hsync();
|
|
|
|
+
|
|
|
|
+ List<DataNode> dataNodes = cluster.getDataNodes();
|
|
|
|
+ for (DataNode datanode : dataNodes) {
|
|
|
|
+ DatanodeProtocolClientSideTranslatorPB nnSpy =
|
|
|
|
+ InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
|
|
|
|
+
|
|
|
|
+ Mockito.doAnswer(recoveryDelayer).when(nnSpy).
|
|
|
|
+ commitBlockSynchronization(
|
|
|
|
+ Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
|
|
|
|
+ Mockito.anyLong(), Mockito.anyBoolean(),
|
|
|
|
+ Mockito.anyBoolean(), Mockito.any(DatanodeID[].class),
|
|
|
|
+ Mockito.any(String[].class));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Make sure hard lease expires to trigger replica recovery
|
|
|
|
+ cluster.setLeasePeriod(100L, 100L);
|
|
|
|
+
|
|
|
|
+ // Wait for recovery to succeed
|
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean get() {
|
|
|
|
+ return ns.getCompleteBlocksTotal() > 0;
|
|
|
|
+ }
|
|
|
|
+ }, 300, 300000);
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|