|
@@ -47,6 +47,7 @@ import java.net.Socket;
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketException;
|
|
import java.net.SocketException;
|
|
import java.net.SocketTimeoutException;
|
|
import java.net.SocketTimeoutException;
|
|
|
|
+import java.net.UnknownHostException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
@@ -54,6 +55,7 @@ import java.util.List;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
|
+import java.util.concurrent.Callable;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -88,6 +90,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
|
+import org.apache.hadoop.test.LambdaTestUtils;
|
|
import org.apache.hadoop.test.Whitebox;
|
|
import org.apache.hadoop.test.Whitebox;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
@@ -789,6 +792,55 @@ public class TestIPC {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test(timeout=60000)
|
|
|
|
+ public void testIpcHostResolutionTimeout() throws Exception {
|
|
|
|
+ final InetSocketAddress addr = new InetSocketAddress("host.invalid", 80);
|
|
|
|
+
|
|
|
|
+ // start client
|
|
|
|
+ Client.setConnectTimeout(conf, 100);
|
|
|
|
+ final Client client = new Client(LongWritable.class, conf);
|
|
|
|
+ // set the rpc timeout to twice the MIN_SLEEP_TIME
|
|
|
|
+ try {
|
|
|
|
+ LambdaTestUtils.intercept(UnknownHostException.class,
|
|
|
|
+ new Callable<Void>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Void call() throws IOException {
|
|
|
|
+ TestIPC.this.call(client, new LongWritable(RANDOM.nextLong()),
|
|
|
|
+ addr, MIN_SLEEP_TIME * 2, conf);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ } finally {
|
|
|
|
+ client.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(timeout=60000)
|
|
|
|
+ public void testIpcFlakyHostResolution() throws IOException {
|
|
|
|
+ // start server
|
|
|
|
+ Server server = new TestServer(5, false);
|
|
|
|
+ server.start();
|
|
|
|
+
|
|
|
|
+ // Leave host unresolved to start. Use "localhost" as opposed
|
|
|
|
+ // to local IP from NetUtils.getConnectAddress(server) to force
|
|
|
|
+ // resolution later
|
|
|
|
+ InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
|
|
|
|
+ "localhost", NetUtils.getConnectAddress(server).getPort());
|
|
|
|
+
|
|
|
|
+ // start client
|
|
|
|
+ Client.setConnectTimeout(conf, 100);
|
|
|
|
+ Client client = new Client(LongWritable.class, conf);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ // Should re-resolve host and succeed
|
|
|
|
+ call(client, new LongWritable(RANDOM.nextLong()), unresolvedAddr,
|
|
|
|
+ MIN_SLEEP_TIME * 2, conf);
|
|
|
|
+ } finally {
|
|
|
|
+ client.stop();
|
|
|
|
+ server.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Check that reader queueing works
|
|
* Check that reader queueing works
|
|
* @throws BrokenBarrierException
|
|
* @throws BrokenBarrierException
|