|
@@ -31,6 +31,7 @@ import static org.mockito.Matchers.anyString;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.timeout;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.FileNotFoundException;
|
|
@@ -62,6 +63,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsUtils;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
@@ -354,7 +356,59 @@ public class TestDFSClientRetries {
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test DFSClient can continue to function after renewLease RPC
|
|
|
+ * receives SocketTimeoutException.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testLeaseRenewSocketTimeout() throws Exception
|
|
|
+ {
|
|
|
+ String file1 = "/testFile1";
|
|
|
+ String file2 = "/testFile2";
|
|
|
+ // Set short retry timeouts so this test runs faster
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
|
|
|
+ conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000);
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
|
|
|
+ Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
|
|
|
+ Mockito.anyString());
|
|
|
+ DFSClient client = new DFSClient(null, spyNN, conf, null);
|
|
|
+ // Get hold of the lease renewer instance used by the client
|
|
|
+ LeaseRenewer leaseRenewer = client.getLeaseRenewer();
|
|
|
+ leaseRenewer.setRenewalTime(100);
|
|
|
+ OutputStream out1 = client.create(file1, false);
|
|
|
+
|
|
|
+ Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
|
|
|
+ Mockito.anyString());
|
|
|
+ verifyEmptyLease(leaseRenewer);
|
|
|
+ try {
|
|
|
+ out1.write(new byte[256]);
|
|
|
+ fail("existing output stream should be aborted");
|
|
|
+ } catch (IOException e) {
|
|
|
+ }
|
|
|
+
|
|
|
+ // Verify DFSClient can do read operation after renewLease aborted.
|
|
|
+ client.exists(file2);
|
|
|
+ // Verify DFSClient can do write operation after renewLease no longer
|
|
|
+ // throws SocketTimeoutException.
|
|
|
+ Mockito.doNothing().when(spyNN).renewLease(
|
|
|
+ Mockito.anyString());
|
|
|
+ leaseRenewer = client.getLeaseRenewer();
|
|
|
+ leaseRenewer.setRenewalTime(100);
|
|
|
+ OutputStream out2 = client.create(file2, false);
|
|
|
+ Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
|
|
|
+ Mockito.anyString());
|
|
|
+ out2.write(new byte[256]);
|
|
|
+ out2.close();
|
|
|
+ verifyEmptyLease(leaseRenewer);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test that getAdditionalBlock() and close() are idempotent. This allows
|
|
|
* a client to safely retry a call and still produce a correct
|
|
@@ -668,7 +722,15 @@ public class TestDFSClientRetries {
|
|
|
}
|
|
|
return ret;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception {
|
|
|
+ int sleepCount = 0;
|
|
|
+ while (!leaseRenewer.isEmpty() && sleepCount++ < 20) {
|
|
|
+ Thread.sleep(500);
|
|
|
+ }
|
|
|
+ assertTrue("Lease should be empty.", leaseRenewer.isEmpty());
|
|
|
+ }
|
|
|
+
|
|
|
class DFSClientReader implements Runnable {
|
|
|
|
|
|
DFSClient client;
|