|
@@ -18,19 +18,28 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
|
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
-import static org.junit.Assert.assertNotNull;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
-import static org.junit.Assert.fail;
|
|
|
-import static org.mockito.ArgumentMatchers.any;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertSame;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
+import static org.junit.jupiter.api.Assertions.fail;
|
|
|
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
|
|
+import static org.mockito.Mockito.any;
|
|
|
+import static org.mockito.Mockito.anyInt;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.never;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.timeout;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
+import static org.mockito.Mockito.reset;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.DataInput;
|
|
@@ -94,12 +103,9 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
-import org.assertj.core.api.Condition;
|
|
|
-import org.junit.Assert;
|
|
|
-import org.junit.Assume;
|
|
|
-import org.junit.Before;
|
|
|
-import org.junit.Test;
|
|
|
-import org.mockito.Mockito;
|
|
|
+import org.junit.jupiter.api.BeforeEach;
|
|
|
+import org.junit.jupiter.api.Test;
|
|
|
+import org.junit.jupiter.api.Timeout;
|
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
|
import org.mockito.stubbing.Answer;
|
|
|
|
|
@@ -110,8 +116,6 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.slf4j.event.Level;
|
|
|
|
|
|
-import static org.assertj.core.api.Assertions.assertThat;
|
|
|
-
|
|
|
/** Unit tests for IPC. */
|
|
|
public class TestIPC {
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(TestIPC.class);
|
|
@@ -126,7 +130,7 @@ public class TestIPC {
|
|
|
static boolean WRITABLE_FAULTS_ENABLED = true;
|
|
|
static int WRITABLE_FAULTS_SLEEP = 0;
|
|
|
|
|
|
- @Before
|
|
|
+ @BeforeEach
|
|
|
public void setupConf() {
|
|
|
conf = new Configuration();
|
|
|
Client.setPingInterval(conf, PING_INTERVAL);
|
|
@@ -339,7 +343,8 @@ public class TestIPC {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testSerial() throws IOException, InterruptedException {
|
|
|
internalTestSerial(3, false, 2, 5, 100);
|
|
|
internalTestSerial(3, true, 2, 5, 10);
|
|
@@ -403,7 +408,8 @@ public class TestIPC {
|
|
|
server.stop();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testStandAloneClient() throws IOException {
|
|
|
Client client = new Client(LongWritable.class, conf);
|
|
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
|
@@ -413,13 +419,13 @@ public class TestIPC {
|
|
|
} catch (IOException e) {
|
|
|
String message = e.getMessage();
|
|
|
String addressText = address.getHostName() + ":" + address.getPort();
|
|
|
- assertTrue("Did not find "+addressText+" in "+message,
|
|
|
- message.contains(addressText));
|
|
|
+ assertTrue(message.contains(addressText),
|
|
|
+ "Did not find "+addressText+" in "+message);
|
|
|
Throwable cause=e.getCause();
|
|
|
- assertNotNull("No nested exception in "+e,cause);
|
|
|
+ assertNotNull(cause, "No nested exception in "+e);
|
|
|
String causeText=cause.getMessage();
|
|
|
- assertTrue("Did not find " + causeText + " in " + message,
|
|
|
- message.contains(causeText));
|
|
|
+ assertTrue(message.contains(causeText),
|
|
|
+ "Did not find " + causeText + " in " + message);
|
|
|
} finally {
|
|
|
client.stop();
|
|
|
}
|
|
@@ -539,7 +545,8 @@ public class TestIPC {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIOEOnClientWriteParam() throws Exception {
|
|
|
doErrorTest(IOEOnWriteWritable.class,
|
|
|
LongWritable.class,
|
|
@@ -547,7 +554,8 @@ public class TestIPC {
|
|
|
LongWritable.class);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testRTEOnClientWriteParam() throws Exception {
|
|
|
doErrorTest(RTEOnWriteWritable.class,
|
|
|
LongWritable.class,
|
|
@@ -555,7 +563,8 @@ public class TestIPC {
|
|
|
LongWritable.class);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIOEOnServerReadParam() throws Exception {
|
|
|
doErrorTest(LongWritable.class,
|
|
|
IOEOnReadWritable.class,
|
|
@@ -563,7 +572,8 @@ public class TestIPC {
|
|
|
LongWritable.class);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testRTEOnServerReadParam() throws Exception {
|
|
|
doErrorTest(LongWritable.class,
|
|
|
RTEOnReadWritable.class,
|
|
@@ -572,7 +582,8 @@ public class TestIPC {
|
|
|
}
|
|
|
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIOEOnServerWriteResponse() throws Exception {
|
|
|
doErrorTest(LongWritable.class,
|
|
|
LongWritable.class,
|
|
@@ -580,7 +591,8 @@ public class TestIPC {
|
|
|
LongWritable.class);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testRTEOnServerWriteResponse() throws Exception {
|
|
|
doErrorTest(LongWritable.class,
|
|
|
LongWritable.class,
|
|
@@ -588,7 +600,8 @@ public class TestIPC {
|
|
|
LongWritable.class);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIOEOnClientReadResponse() throws Exception {
|
|
|
doErrorTest(LongWritable.class,
|
|
|
LongWritable.class,
|
|
@@ -596,7 +609,8 @@ public class TestIPC {
|
|
|
IOEOnReadWritable.class);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testRTEOnClientReadResponse() throws Exception {
|
|
|
doErrorTest(LongWritable.class,
|
|
|
LongWritable.class,
|
|
@@ -609,7 +623,8 @@ public class TestIPC {
|
|
|
* that a ping should have been sent. This is a reproducer for a
|
|
|
* deadlock seen in one iteration of HADOOP-6762.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIOEOnWriteAfterPingClient() throws Exception {
|
|
|
// start server
|
|
|
Client.setPingInterval(conf, 100);
|
|
@@ -628,8 +643,8 @@ public class TestIPC {
|
|
|
private static void assertExceptionContains(
|
|
|
Throwable t, String substring) {
|
|
|
String msg = StringUtils.stringifyException(t);
|
|
|
- assertTrue("Exception should contain substring '" + substring + "':\n" +
|
|
|
- msg, msg.contains(substring));
|
|
|
+ assertTrue(msg.contains(substring),
|
|
|
+ "Exception should contain substring '" + substring + "':\n" + msg);
|
|
|
LOG.info("Got expected exception", t);
|
|
|
}
|
|
|
|
|
@@ -637,7 +652,8 @@ public class TestIPC {
|
|
|
* Test that, if the socket factory throws an IOE, it properly propagates
|
|
|
* to the client.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testSocketFactoryException() throws IOException {
|
|
|
SocketFactory mockFactory = mock(SocketFactory.class);
|
|
|
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
|
|
@@ -670,12 +686,13 @@ public class TestIPC {
|
|
|
* failure is handled properly. This is a regression test for
|
|
|
* HADOOP-7428.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testRTEDuringConnectionSetup() throws IOException {
|
|
|
// Set up a socket factory which returns sockets which
|
|
|
// throw an RTE when setSoTimeout is called.
|
|
|
SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
|
|
|
- Mockito.doAnswer(new Answer<Socket>() {
|
|
|
+ doAnswer(new Answer<Socket>() {
|
|
|
@Override
|
|
|
public Socket answer(InvocationOnMock invocation) {
|
|
|
return new MockSocket();
|
|
@@ -699,7 +716,7 @@ public class TestIPC {
|
|
|
// Resetting to the normal socket behavior should succeed
|
|
|
// (i.e. it should not have cached a half-constructed connection)
|
|
|
|
|
|
- Mockito.reset(spyFactory);
|
|
|
+ reset(spyFactory);
|
|
|
call(client, RANDOM.nextLong(), address, conf);
|
|
|
} finally {
|
|
|
client.stop();
|
|
@@ -707,7 +724,8 @@ public class TestIPC {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcTimeout() throws IOException {
|
|
|
// start server
|
|
|
Server server = new TestServer(1, true);
|
|
@@ -730,7 +748,8 @@ public class TestIPC {
|
|
|
client.stop();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcConnectTimeout() throws IOException {
|
|
|
// start server
|
|
|
Server server = new TestServer(1, true);
|
|
@@ -754,7 +773,8 @@ public class TestIPC {
|
|
|
/**
|
|
|
* Check service class byte in IPC header is correct on wire.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcWithServiceClass() throws IOException {
|
|
|
// start server
|
|
|
Server server = new TestServer(5, false);
|
|
@@ -800,7 +820,8 @@ public class TestIPC {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcHostResolutionTimeout() throws Exception {
|
|
|
final InetSocketAddress addr = new InetSocketAddress("host.invalid", 80);
|
|
|
|
|
@@ -898,7 +919,8 @@ public class TestIPC {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcFlakyHostResolution() throws IOException {
|
|
|
// start server
|
|
|
Server server = new TestServer(5, false);
|
|
@@ -929,7 +951,8 @@ public class TestIPC {
|
|
|
* @throws BrokenBarrierException
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcWithReaderQueuing() throws Exception {
|
|
|
// 1 reader, 1 connectionQ slot, 1 callq
|
|
|
for (int i=0; i < 10; i++) {
|
|
@@ -1058,7 +1081,8 @@ public class TestIPC {
|
|
|
server.stop();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=30000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 30)
|
|
|
public void testConnectionIdleTimeouts() throws Exception {
|
|
|
GenericTestUtils.setLogLevel(Server.LOG, Level.DEBUG);
|
|
|
final int maxIdle = 1000;
|
|
@@ -1176,37 +1200,40 @@ public class TestIPC {
|
|
|
call(client, addr, serviceClass, conf);
|
|
|
Connection connection = server.getConnections()[0];
|
|
|
LOG.info("Connection is from: {}", connection);
|
|
|
- assertEquals(
|
|
|
- "Connection string representation should include only IP address for healthy connection", 1,
|
|
|
- connection.toString().split(" / ").length);
|
|
|
+ assertEquals(1, connection.toString().split(" / ").length,
|
|
|
+ "Connection string representation should include only IP address for healthy connection");
|
|
|
int serviceClass2 = connection.getServiceClass();
|
|
|
assertFalse(noChanged ^ serviceClass == serviceClass2);
|
|
|
client.stop();
|
|
|
}
|
|
|
-
|
|
|
- @Test(timeout=30000, expected=IOException.class)
|
|
|
+
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 30)
|
|
|
public void testIpcAfterStopping() throws IOException {
|
|
|
- // start server
|
|
|
- Server server = new TestServer(5, false);
|
|
|
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
- server.start();
|
|
|
+ assertThrows(IOException.class, () -> {
|
|
|
+ // start server
|
|
|
+ Server server = new TestServer(5, false);
|
|
|
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
+ server.start();
|
|
|
|
|
|
- // start client
|
|
|
- Client client = new Client(LongWritable.class, conf);
|
|
|
- call(client, addr, 0, conf);
|
|
|
- client.stop();
|
|
|
-
|
|
|
- // This call should throw IOException.
|
|
|
- call(client, addr, 0, conf);
|
|
|
+ // start client
|
|
|
+ Client client = new Client(LongWritable.class, conf);
|
|
|
+ call(client, addr, 0, conf);
|
|
|
+ client.stop();
|
|
|
+
|
|
|
+ // This call should throw IOException.
|
|
|
+ call(client, addr, 0, conf);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Check that file descriptors aren't leaked by starting
|
|
|
* and stopping IPC servers.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testSocketLeak() throws IOException {
|
|
|
- Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
|
|
|
+ assumeTrue(FD_DIR.exists()); // only run on Linux
|
|
|
|
|
|
long startFds = countOpenFileDescriptors();
|
|
|
for (int i = 0; i < 50; i++) {
|
|
@@ -1216,15 +1243,16 @@ public class TestIPC {
|
|
|
}
|
|
|
long endFds = countOpenFileDescriptors();
|
|
|
|
|
|
- assertTrue("Leaked " + (endFds - startFds) + " file descriptors",
|
|
|
- endFds - startFds < 20);
|
|
|
+ assertTrue(endFds - startFds < 20,
|
|
|
+ "Leaked " + (endFds - startFds) + " file descriptors");
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Check if Client is interrupted after handling
|
|
|
* InterruptedException during cleanup
|
|
|
*/
|
|
|
- @Test(timeout=30000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 30)
|
|
|
public void testInterrupted() {
|
|
|
Client client = new Client(LongWritable.class, conf);
|
|
|
Thread.currentThread().interrupt();
|
|
@@ -1234,7 +1262,7 @@ public class TestIPC {
|
|
|
LOG.info("Expected thread interrupt during client cleanup");
|
|
|
} catch (AssertionError e) {
|
|
|
LOG.error("The Client did not interrupt after handling an Interrupted Exception");
|
|
|
- Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
|
|
|
+ fail("The Client did not interrupt after handling an Interrupted Exception");
|
|
|
}
|
|
|
// Clear Thread interrupt
|
|
|
Thread.interrupted();
|
|
@@ -1244,31 +1272,36 @@ public class TestIPC {
|
|
|
return FD_DIR.list().length;
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcFromHadoop_0_18_13() throws IOException {
|
|
|
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
|
|
|
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcFromHadoop0_20_3() throws IOException {
|
|
|
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
|
|
|
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testIpcFromHadoop0_21_0() throws IOException {
|
|
|
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
|
|
|
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testHttpGetResponse() throws IOException {
|
|
|
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
|
|
|
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
|
|
|
Configuration conf = new Configuration();
|
|
|
// set max retries to 0
|
|
@@ -1294,7 +1327,8 @@ public class TestIPC {
|
|
|
* (1) the rpc server uses the call id/retry provided by the rpc client, and
|
|
|
* (2) the rpc client receives the same call id/retry from the rpc server.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testCallIdAndRetry() throws IOException {
|
|
|
final CallInfo info = new CallInfo();
|
|
|
|
|
@@ -1311,8 +1345,8 @@ public class TestIPC {
|
|
|
@Override
|
|
|
void checkResponse(RpcResponseHeaderProto header) throws IOException {
|
|
|
super.checkResponse(header);
|
|
|
- Assert.assertEquals(info.id, header.getCallId());
|
|
|
- Assert.assertEquals(info.retry, header.getRetryCount());
|
|
|
+ assertEquals(info.id, header.getCallId());
|
|
|
+ assertEquals(info.retry, header.getRetryCount());
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -1321,8 +1355,8 @@ public class TestIPC {
|
|
|
server.callListener = new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- Assert.assertEquals(info.id, Server.getCallId());
|
|
|
- Assert.assertEquals(info.retry, Server.getCallRetryCount());
|
|
|
+ assertEquals(info.id, Server.getCallId());
|
|
|
+ assertEquals(info.retry, Server.getCallRetryCount());
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -1343,11 +1377,12 @@ public class TestIPC {
|
|
|
* caller is notified.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testReceiveStateBeforeCallerNotification() throws IOException {
|
|
|
AtomicBoolean stateReceived = new AtomicBoolean(false);
|
|
|
- AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class);
|
|
|
- Mockito.doAnswer((Answer<Void>) invocation -> {
|
|
|
+ AlignmentContext alignmentContext = mock(AlignmentContext.class);
|
|
|
+ doAnswer((Answer<Void>) invocation -> {
|
|
|
Thread.sleep(1000);
|
|
|
stateReceived.set(true);
|
|
|
return null;
|
|
@@ -1362,7 +1397,7 @@ public class TestIPC {
|
|
|
server.start();
|
|
|
call(client, new LongWritable(RANDOM.nextLong()), addr,
|
|
|
0, conf, alignmentContext);
|
|
|
- Assert.assertTrue(stateReceived.get());
|
|
|
+ assertTrue(stateReceived.get());
|
|
|
} finally {
|
|
|
client.stop();
|
|
|
server.stop();
|
|
@@ -1378,7 +1413,8 @@ public class TestIPC {
|
|
|
/**
|
|
|
* Test the retry count while used in a retry proxy.
|
|
|
*/
|
|
|
- @Test(timeout=100000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 100)
|
|
|
public void testRetryProxy() throws IOException {
|
|
|
final Client client = new Client(LongWritable.class, conf);
|
|
|
|
|
@@ -1387,7 +1423,7 @@ public class TestIPC {
|
|
|
private int retryCount = 0;
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- Assert.assertEquals(retryCount++, Server.getCallRetryCount());
|
|
|
+ assertEquals(retryCount++, Server.getCallRetryCount());
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -1404,7 +1440,7 @@ public class TestIPC {
|
|
|
try {
|
|
|
server.start();
|
|
|
retryProxy.dummyRun();
|
|
|
- Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
|
|
|
+ assertEquals(TestInvocationHandler.retry, totalRetry + 1);
|
|
|
} finally {
|
|
|
Client.setCallIdAndRetryCount(0, 0, null);
|
|
|
client.stop();
|
|
@@ -1416,39 +1452,41 @@ public class TestIPC {
|
|
|
* Test that there is no retry when invalid token exception is thrown.
|
|
|
* Verfies fix for HADOOP-12054
|
|
|
*/
|
|
|
- @Test(expected = InvalidToken.class)
|
|
|
+ @Test
|
|
|
public void testNoRetryOnInvalidToken() throws IOException {
|
|
|
- final Client client = new Client(LongWritable.class, conf);
|
|
|
- final TestServer server = new TestServer(1, false);
|
|
|
- TestInvalidTokenHandler handler =
|
|
|
- new TestInvalidTokenHandler(client, server);
|
|
|
- DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
|
|
|
- DummyProtocol.class.getClassLoader(),
|
|
|
- new Class[] { DummyProtocol.class }, handler);
|
|
|
- FailoverProxyProvider<DummyProtocol> provider =
|
|
|
- new DefaultFailoverProxyProvider<DummyProtocol>(
|
|
|
- DummyProtocol.class, proxy);
|
|
|
- DummyProtocol retryProxy =
|
|
|
- (DummyProtocol) RetryProxy.create(DummyProtocol.class, provider,
|
|
|
- RetryPolicies.failoverOnNetworkException(
|
|
|
- RetryPolicies.TRY_ONCE_THEN_FAIL, 100, 100, 10000, 0));
|
|
|
+ assertThrows(InvalidToken.class, () -> {
|
|
|
+ final Client client = new Client(LongWritable.class, conf);
|
|
|
+ final TestServer server = new TestServer(1, false);
|
|
|
+ TestInvalidTokenHandler handler =
|
|
|
+ new TestInvalidTokenHandler(client, server);
|
|
|
+ DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
|
|
|
+ DummyProtocol.class.getClassLoader(),
|
|
|
+ new Class[]{DummyProtocol.class}, handler);
|
|
|
+ FailoverProxyProvider<DummyProtocol> provider =
|
|
|
+ new DefaultFailoverProxyProvider<>(DummyProtocol.class, proxy);
|
|
|
+ DummyProtocol retryProxy =
|
|
|
+ (DummyProtocol) RetryProxy.create(DummyProtocol.class, provider,
|
|
|
+ RetryPolicies.failoverOnNetworkException(
|
|
|
+ RetryPolicies.TRY_ONCE_THEN_FAIL, 100, 100, 10000, 0));
|
|
|
|
|
|
- try {
|
|
|
- server.start();
|
|
|
- retryProxy.dummyRun();
|
|
|
- } finally {
|
|
|
- // Check if dummyRun called only once
|
|
|
- assertThat(handler.invocations).isOne();
|
|
|
- Client.setCallIdAndRetryCount(0, 0, null);
|
|
|
- client.stop();
|
|
|
- server.stop();
|
|
|
- }
|
|
|
+ try {
|
|
|
+ server.start();
|
|
|
+ retryProxy.dummyRun();
|
|
|
+ } finally {
|
|
|
+ // Check if dummyRun called only once
|
|
|
+ assertThat(handler.invocations).isOne();
|
|
|
+ Client.setCallIdAndRetryCount(0, 0, null);
|
|
|
+ client.stop();
|
|
|
+ server.stop();
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Test if the rpc server gets the default retry count (0) from client.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testInitialCallRetryCount() throws IOException {
|
|
|
// Override client to store the call id
|
|
|
final Client client = new Client(LongWritable.class, conf);
|
|
@@ -1460,7 +1498,7 @@ public class TestIPC {
|
|
|
public void run() {
|
|
|
// we have not set the retry count for the client, thus on the server
|
|
|
// side we should see retry count as 0
|
|
|
- Assert.assertEquals(0, Server.getCallRetryCount());
|
|
|
+ assertEquals(0, Server.getCallRetryCount());
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -1479,7 +1517,8 @@ public class TestIPC {
|
|
|
/**
|
|
|
* Test if the rpc server gets the retry count from client.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testCallRetryCount() throws IOException {
|
|
|
final int retryCount = 255;
|
|
|
// Override client to store the call id
|
|
@@ -1493,7 +1532,7 @@ public class TestIPC {
|
|
|
public void run() {
|
|
|
// we have not set the retry count for the client, thus on the server
|
|
|
// side we should see retry count as 0
|
|
|
- Assert.assertEquals(retryCount, Server.getCallRetryCount());
|
|
|
+ assertEquals(retryCount, Server.getCallRetryCount());
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -1514,7 +1553,8 @@ public class TestIPC {
|
|
|
* even if multiple threads are using the same client.
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testUniqueSequentialCallIds()
|
|
|
throws IOException, InterruptedException {
|
|
|
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
|
|
@@ -1623,10 +1663,11 @@ public class TestIPC {
|
|
|
assertThat(Client.getTimeout(config)).isEqualTo(-1);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testSetupConnectionShouldNotBlockShutdown() throws Exception {
|
|
|
// Start server
|
|
|
- SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
|
|
+ SocketFactory mockFactory = mock(SocketFactory.class);
|
|
|
Server server = new TestServer(1, true);
|
|
|
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
|
|
@@ -1669,7 +1710,7 @@ public class TestIPC {
|
|
|
|
|
|
private void assertRetriesOnSocketTimeouts(Configuration conf,
|
|
|
int maxTimeoutRetries) throws IOException {
|
|
|
- SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
|
|
+ SocketFactory mockFactory = mock(SocketFactory.class);
|
|
|
doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
|
|
|
Client client = new Client(LongWritable.class, conf, mockFactory);
|
|
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
|
|
@@ -1677,18 +1718,20 @@ public class TestIPC {
|
|
|
call(client, RANDOM.nextLong(), address, conf);
|
|
|
fail("Not throwing the SocketTimeoutException");
|
|
|
} catch (SocketTimeoutException e) {
|
|
|
- Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
|
|
+ verify(mockFactory, times(maxTimeoutRetries))
|
|
|
.createSocket();
|
|
|
}
|
|
|
client.stop();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=4000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 4)
|
|
|
public void testInsecureVersionMismatch() throws IOException {
|
|
|
checkVersionMismatch();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=4000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 4)
|
|
|
public void testSecureVersionMismatch() throws IOException {
|
|
|
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
@@ -1722,13 +1765,13 @@ public class TestIPC {
|
|
|
Client client = new Client(LongWritable.class, conf);
|
|
|
call(client, 0, addr, conf);
|
|
|
} catch (RemoteException re) {
|
|
|
- Assert.assertEquals(RPC.VersionMismatch.class.getName(),
|
|
|
+ assertEquals(RPC.VersionMismatch.class.getName(),
|
|
|
re.getClassName());
|
|
|
- Assert.assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG,
|
|
|
+ assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG,
|
|
|
re.getMessage());
|
|
|
return;
|
|
|
}
|
|
|
- Assert.fail("didn't get version mismatch");
|
|
|
+ fail("didn't get version mismatch");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1747,13 +1790,13 @@ public class TestIPC {
|
|
|
try {
|
|
|
call(client, 0, addr, conf);
|
|
|
} catch (IOException ioe) {
|
|
|
- Assert.assertNotNull(ioe);
|
|
|
- Assert.assertEquals(RpcException.class, ioe.getClass());
|
|
|
- Assert.assertTrue(ioe.getMessage().contains(
|
|
|
+ assertNotNull(ioe);
|
|
|
+ assertEquals(RpcException.class, ioe.getClass());
|
|
|
+ assertTrue(ioe.getMessage().contains(
|
|
|
"exceeds maximum data length"));
|
|
|
return;
|
|
|
}
|
|
|
- Assert.fail("didn't get limit exceeded");
|
|
|
+ fail("didn't get limit exceeded");
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1766,13 +1809,14 @@ public class TestIPC {
|
|
|
checkUserBinding(true);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testUpdateAddressEnsureResolved() throws Exception {
|
|
|
// start server
|
|
|
Server server = new TestServer(1, false);
|
|
|
server.start();
|
|
|
|
|
|
- SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
|
|
+ SocketFactory mockFactory = mock(SocketFactory.class);
|
|
|
doThrow(new ConnectTimeoutException("fake")).when(mockFactory)
|
|
|
.createSocket();
|
|
|
Client client = new Client(LongWritable.class, conf, mockFactory);
|
|
@@ -1811,16 +1855,16 @@ public class TestIPC {
|
|
|
Socket s;
|
|
|
// don't attempt bind with no service host.
|
|
|
s = checkConnect(null, asProxy);
|
|
|
- Mockito.verify(s, Mockito.never()).bind(any(SocketAddress.class));
|
|
|
+ verify(s, never()).bind(any(SocketAddress.class));
|
|
|
|
|
|
// don't attempt bind with service host not belonging to this host.
|
|
|
s = checkConnect("1.2.3.4", asProxy);
|
|
|
- Mockito.verify(s, Mockito.never()).bind(any(SocketAddress.class));
|
|
|
+ verify(s, never()).bind(any(SocketAddress.class));
|
|
|
|
|
|
// do attempt bind when service host is this host.
|
|
|
InetAddress addr = InetAddress.getLocalHost();
|
|
|
s = checkConnect(addr.getHostAddress(), asProxy);
|
|
|
- Mockito.verify(s).bind(new InetSocketAddress(addr, 0));
|
|
|
+ verify(s).bind(new InetSocketAddress(addr, 0));
|
|
|
}
|
|
|
|
|
|
// dummy protocol that claims to support kerberos.
|
|
@@ -1838,7 +1882,7 @@ public class TestIPC {
|
|
|
principal.append("@REALM");
|
|
|
UserGroupInformation ugi =
|
|
|
spy(UserGroupInformation.createRemoteUser(principal.toString()));
|
|
|
- Mockito.doReturn(true).when(ugi).hasKerberosCredentials();
|
|
|
+ doReturn(true).when(ugi).hasKerberosCredentials();
|
|
|
if (asProxy) {
|
|
|
ugi = UserGroupInformation.createProxyUser("proxy", ugi);
|
|
|
}
|
|
@@ -1846,11 +1890,11 @@ public class TestIPC {
|
|
|
// create a mock socket that throws on connect.
|
|
|
SocketException expectedConnectEx =
|
|
|
new SocketException("Expected connect failure");
|
|
|
- Socket s = Mockito.mock(Socket.class);
|
|
|
- SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
|
|
- Mockito.doReturn(s).when(mockFactory).createSocket();
|
|
|
+ Socket s = mock(Socket.class);
|
|
|
+ SocketFactory mockFactory = mock(SocketFactory.class);
|
|
|
+ doReturn(s).when(mockFactory).createSocket();
|
|
|
doThrow(expectedConnectEx).when(s).connect(
|
|
|
- any(SocketAddress.class), Mockito.anyInt());
|
|
|
+ any(SocketAddress.class), anyInt());
|
|
|
|
|
|
// do a dummy call and expect it to throw an exception on connect.
|
|
|
// tests should verify if/how a bind occurred.
|
|
@@ -1864,7 +1908,7 @@ public class TestIPC {
|
|
|
fail("call didn't throw connect exception");
|
|
|
} catch (SocketException se) {
|
|
|
// ipc layer re-wraps exceptions, so check the cause.
|
|
|
- Assert.assertSame(expectedConnectEx, se.getCause());
|
|
|
+ assertSame(expectedConnectEx, se.getCause());
|
|
|
}
|
|
|
return s;
|
|
|
}
|