|
@@ -17,11 +17,14 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs;
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
|
|
+import static org.junit.Assert.*;
|
|
|
|
+
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
@@ -29,6 +32,8 @@ import org.mockito.Mockito;
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.stubbing.Answer;
|
|
import org.mockito.stubbing.Answer;
|
|
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
|
+
|
|
public class TestLeaseRenewer {
|
|
public class TestLeaseRenewer {
|
|
private String FAKE_AUTHORITY="hdfs://nn1/";
|
|
private String FAKE_AUTHORITY="hdfs://nn1/";
|
|
private UserGroupInformation FAKE_UGI_A =
|
|
private UserGroupInformation FAKE_UGI_A =
|
|
@@ -46,19 +51,24 @@ public class TestLeaseRenewer {
|
|
|
|
|
|
@Before
|
|
@Before
|
|
public void setupMocksAndRenewer() throws IOException {
|
|
public void setupMocksAndRenewer() throws IOException {
|
|
- MOCK_DFSCLIENT = Mockito.mock(DFSClient.class);
|
|
|
|
- Mockito.doReturn(true)
|
|
|
|
- .when(MOCK_DFSCLIENT).isClientRunning();
|
|
|
|
- Mockito.doReturn((int)FAST_GRACE_PERIOD)
|
|
|
|
- .when(MOCK_DFSCLIENT).getHdfsTimeout();
|
|
|
|
- Mockito.doReturn("myclient")
|
|
|
|
- .when(MOCK_DFSCLIENT).getClientName();
|
|
|
|
|
|
+ MOCK_DFSCLIENT = createMockClient();
|
|
|
|
|
|
renewer = LeaseRenewer.getInstance(
|
|
renewer = LeaseRenewer.getInstance(
|
|
FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
|
FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
|
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
|
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private DFSClient createMockClient() {
|
|
|
|
+ DFSClient mock = Mockito.mock(DFSClient.class);
|
|
|
|
+ Mockito.doReturn(true)
|
|
|
|
+ .when(mock).isClientRunning();
|
|
|
|
+ Mockito.doReturn((int)FAST_GRACE_PERIOD)
|
|
|
|
+ .when(mock).getHdfsTimeout();
|
|
|
|
+ Mockito.doReturn("myclient")
|
|
|
|
+ .when(mock).getClientName();
|
|
|
|
+ return mock;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testInstanceSharing() throws IOException {
|
|
public void testInstanceSharing() throws IOException {
|
|
// Two lease renewers with the same UGI should return
|
|
// Two lease renewers with the same UGI should return
|
|
@@ -93,11 +103,11 @@ public class TestLeaseRenewer {
|
|
public void testRenewal() throws Exception {
|
|
public void testRenewal() throws Exception {
|
|
// Keep track of how many times the lease gets renewed
|
|
// Keep track of how many times the lease gets renewed
|
|
final AtomicInteger leaseRenewalCount = new AtomicInteger();
|
|
final AtomicInteger leaseRenewalCount = new AtomicInteger();
|
|
- Mockito.doAnswer(new Answer<Void>() {
|
|
|
|
|
|
+ Mockito.doAnswer(new Answer<Boolean>() {
|
|
@Override
|
|
@Override
|
|
- public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
|
|
|
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
|
leaseRenewalCount.incrementAndGet();
|
|
leaseRenewalCount.incrementAndGet();
|
|
- return null;
|
|
|
|
|
|
+ return true;
|
|
}
|
|
}
|
|
}).when(MOCK_DFSCLIENT).renewLease();
|
|
}).when(MOCK_DFSCLIENT).renewLease();
|
|
|
|
|
|
@@ -120,6 +130,57 @@ public class TestLeaseRenewer {
|
|
renewer.closeFile(filePath, MOCK_DFSCLIENT);
|
|
renewer.closeFile(filePath, MOCK_DFSCLIENT);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
|
|
|
|
+ * to several DFSClients with the same name, the first of which has no files
|
|
|
|
+ * open. Previously, this was causing the lease to not get renewed.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
|
|
|
|
+ // First DFSClient has no files open so doesn't renew leases.
|
|
|
|
+ final DFSClient mockClient1 = createMockClient();
|
|
|
|
+ Mockito.doReturn(false).when(mockClient1).renewLease();
|
|
|
|
+ assertSame(renewer, LeaseRenewer.getInstance(
|
|
|
|
+ FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
|
|
|
|
+
|
|
|
|
+ // Set up a file so that we start renewing our lease.
|
|
|
|
+ DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
|
|
|
|
+ String filePath = "/foo";
|
|
|
|
+ renewer.put(filePath, mockStream1, mockClient1);
|
|
|
|
+
|
|
|
|
+ // Second DFSClient does renew lease
|
|
|
|
+ final DFSClient mockClient2 = createMockClient();
|
|
|
|
+ Mockito.doReturn(true).when(mockClient2).renewLease();
|
|
|
|
+ assertSame(renewer, LeaseRenewer.getInstance(
|
|
|
|
+ FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
|
|
|
|
+
|
|
|
|
+ // Set up a file so that we start renewing our lease.
|
|
|
|
+ DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
|
|
|
|
+ renewer.put(filePath, mockStream2, mockClient2);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ // Wait for lease to get renewed
|
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean get() {
|
|
|
|
+ try {
|
|
|
|
+ Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
|
|
|
|
+ Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
|
|
|
|
+ return true;
|
|
|
|
+ } catch (AssertionError err) {
|
|
|
|
+ LeaseRenewer.LOG.warn("Not yet satisfied", err);
|
|
|
|
+ return false;
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ // should not throw!
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }, 100, 10000);
|
|
|
|
+
|
|
|
|
+ renewer.closeFile(filePath, mockClient1);
|
|
|
|
+ renewer.closeFile(filePath, mockClient2);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testThreadName() throws Exception {
|
|
public void testThreadName() throws Exception {
|
|
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|
|
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|