|
@@ -32,6 +32,8 @@ import static org.mockito.ArgumentMatchers.anyList;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.ArgumentMatchers.anyBoolean;
|
|
|
import static org.mockito.ArgumentMatchers.anyLong;
|
|
|
+import static org.mockito.ArgumentMatchers.anyString;
|
|
|
+import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
@@ -42,6 +44,7 @@ import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.util.ArrayList;
|
|
@@ -96,6 +99,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
@@ -703,7 +707,67 @@ public class TestBlockRecovery {
|
|
|
streams.close();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test(timeout = 60000)
|
|
|
+ public void testEcRecoverBlocks() throws Throwable {
|
|
|
+ // Stop the Mocked DN started in startup()
|
|
|
+ tearDown();
|
|
|
+ ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
|
|
|
+ NamenodeProtocols spyNN = spy(preSpyNN);
|
|
|
+
|
|
|
+ // Delay completeFile
|
|
|
+ GenericTestUtils.DelayAnswer delayer =
|
|
|
+ new GenericTestUtils.DelayAnswer(LOG);
|
|
|
+ doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(),
|
|
|
+ anyLong());
|
|
|
+ String topDir = "/myDir";
|
|
|
+ DFSClient client = new DFSClient(null, spyNN, conf, null);
|
|
|
+ Path file = new Path(topDir + "/testECLeaseRecover");
|
|
|
+ client.mkdirs(topDir, null, false);
|
|
|
+ client.enableErasureCodingPolicy(ecPolicy.getName());
|
|
|
+ client.setErasureCodingPolicy(topDir, ecPolicy.getName());
|
|
|
+ OutputStream stm = client.create(file.toString(), true);
|
|
|
+
|
|
|
+ // write 5MB File
|
|
|
+ AppendTestUtil.write(stm, 0, 1024 * 1024 * 5);
|
|
|
+ final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
|
|
|
+ Thread t = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ stm.close();
|
|
|
+ } catch (Throwable t) {
|
|
|
+ err.set(t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ t.start();
|
|
|
+
|
|
|
+ // Waiting for close to get to latch
|
|
|
+ delayer.waitForCall();
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ try {
|
|
|
+ return client.getNamenode().recoverLease(file.toString(),
|
|
|
+ client.getClientName());
|
|
|
+ } catch (IOException e) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, 5000, 24000);
|
|
|
+ delayer.proceed();
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test to verify the race between finalizeBlock and Lease recovery
|
|
|
*
|