|
@@ -18,10 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
-import org.apache.hadoop.hdfs.AppendTestUtil;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
|
-
|
|
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.fail;
|
|
import static org.junit.Assert.fail;
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Matchers.any;
|
|
@@ -46,7 +43,6 @@ import java.util.Collection;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
-import java.util.Random;
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -98,7 +94,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
-import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
|
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
@@ -1040,107 +1035,4 @@ public class TestBlockRecovery {
|
|
Assert.fail("Thread failure: " + failureReason);
|
|
Assert.fail("Thread failure: " + failureReason);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Test for block recovery taking longer than the heartbeat interval.
|
|
|
|
- */
|
|
|
|
- @Test(timeout = 300000L)
|
|
|
|
- public void testRecoverySlowerThanHeartbeat() throws Exception {
|
|
|
|
- tearDown(); // Stop the Mocked DN started in startup()
|
|
|
|
-
|
|
|
|
- SleepAnswer delayer = new SleepAnswer(3000, 6000);
|
|
|
|
- testRecoveryWithDatanodeDelayed(delayer);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Test for block recovery timeout. All recovery attempts will be delayed
|
|
|
|
- * and the first attempt will be lost to trigger recovery timeout and retry.
|
|
|
|
- */
|
|
|
|
- @Test(timeout = 300000L)
|
|
|
|
- public void testRecoveryTimeout() throws Exception {
|
|
|
|
- tearDown(); // Stop the Mocked DN started in startup()
|
|
|
|
- final Random r = new Random();
|
|
|
|
-
|
|
|
|
- // Make sure first commitBlockSynchronization call from the DN gets lost
|
|
|
|
- // for the recovery timeout to expire and new recovery attempt
|
|
|
|
- // to be started.
|
|
|
|
- SleepAnswer delayer = new SleepAnswer(3000) {
|
|
|
|
- private final AtomicBoolean callRealMethod = new AtomicBoolean();
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
- boolean interrupted = false;
|
|
|
|
- try {
|
|
|
|
- Thread.sleep(r.nextInt(3000) + 6000);
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- interrupted = true;
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (callRealMethod.get()) {
|
|
|
|
- return invocation.callRealMethod();
|
|
|
|
- }
|
|
|
|
- callRealMethod.set(true);
|
|
|
|
- return null;
|
|
|
|
- } finally {
|
|
|
|
- if (interrupted) {
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
- testRecoveryWithDatanodeDelayed(delayer);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void testRecoveryWithDatanodeDelayed(
|
|
|
|
- GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
|
|
|
|
- Configuration configuration = new HdfsConfiguration();
|
|
|
|
- configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- cluster = new MiniDFSCluster.Builder(configuration)
|
|
|
|
- .numDataNodes(2).build();
|
|
|
|
- cluster.waitActive();
|
|
|
|
- final FSNamesystem ns = cluster.getNamesystem();
|
|
|
|
- final NameNode nn = cluster.getNameNode();
|
|
|
|
- final DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
|
- ns.getBlockManager().setBlockRecoveryTimeout(
|
|
|
|
- TimeUnit.SECONDS.toMillis(10));
|
|
|
|
-
|
|
|
|
- // Create a file and never close the output stream to trigger recovery
|
|
|
|
- FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
|
|
|
|
- (short) 2);
|
|
|
|
- out.write(AppendTestUtil.randomBytes(0, 4096));
|
|
|
|
- out.hsync();
|
|
|
|
-
|
|
|
|
- List<DataNode> dataNodes = cluster.getDataNodes();
|
|
|
|
- for (DataNode datanode : dataNodes) {
|
|
|
|
- DatanodeProtocolClientSideTranslatorPB nnSpy =
|
|
|
|
- InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
|
|
|
|
-
|
|
|
|
- Mockito.doAnswer(recoveryDelayer).when(nnSpy).
|
|
|
|
- commitBlockSynchronization(
|
|
|
|
- Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
|
|
|
|
- Mockito.anyLong(), Mockito.anyBoolean(),
|
|
|
|
- Mockito.anyBoolean(), Mockito.anyObject(),
|
|
|
|
- Mockito.anyObject());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Make sure hard lease expires to trigger replica recovery
|
|
|
|
- cluster.setLeasePeriod(100L, 100L);
|
|
|
|
-
|
|
|
|
- // Wait for recovery to succeed
|
|
|
|
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
- @Override
|
|
|
|
- public Boolean get() {
|
|
|
|
- return ns.getCompleteBlocksTotal() > 0;
|
|
|
|
- }
|
|
|
|
- }, 300, 300000);
|
|
|
|
-
|
|
|
|
- } finally {
|
|
|
|
- if (cluster != null) {
|
|
|
|
- cluster.shutdown();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
}
|
|
}
|