|
@@ -17,8 +17,17 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
-import static org.junit.Assert.assertSame;
|
|
|
-
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.DataOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.net.InetAddress;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.net.Proxy;
|
|
|
+import java.net.Proxy.Type;
|
|
|
+import java.net.ServerSocket;
|
|
|
+import java.net.Socket;
|
|
|
+import java.net.SocketException;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -29,11 +38,61 @@ import org.junit.Assert;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.net.SocksSocketFactory;
|
|
|
import org.apache.hadoop.net.StandardSocketFactory;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertSame;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
|
|
|
+/**
|
|
|
+ * test StandardSocketFactory and SocksSocketFactory NetUtils
|
|
|
+ *
|
|
|
+ */
|
|
|
public class TestSocketFactory {
|
|
|
|
|
|
+ private static final int START_STOP_TIMEOUT_SEC = 30;
|
|
|
+
|
|
|
+ private ServerRunnable serverRunnable;
|
|
|
+ private Thread serverThread;
|
|
|
+ private int port;
|
|
|
+
|
|
|
+ private void startTestServer() throws Exception {
|
|
|
+ // start simple tcp server.
|
|
|
+ serverRunnable = new ServerRunnable();
|
|
|
+ serverThread = new Thread(serverRunnable);
|
|
|
+ serverThread.start();
|
|
|
+ final long timeout = System.currentTimeMillis() + START_STOP_TIMEOUT_SEC * 1000;
|
|
|
+ while (!serverRunnable.isReady()) {
|
|
|
+ assertNull(serverRunnable.getThrowable());
|
|
|
+ Thread.sleep(10);
|
|
|
+ if (System.currentTimeMillis() > timeout) {
|
|
|
+ fail("Server thread did not start properly in allowed time of "
|
|
|
+ + START_STOP_TIMEOUT_SEC + " sec.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ port = serverRunnable.getPort();
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void stopTestServer() throws InterruptedException {
|
|
|
+ final Thread t = serverThread;
|
|
|
+ if (t != null) {
|
|
|
+ serverThread = null;
|
|
|
+ port = -1;
|
|
|
+ // stop server
|
|
|
+ serverRunnable.stop();
|
|
|
+ t.join(START_STOP_TIMEOUT_SEC * 1000);
|
|
|
+ assertFalse(t.isAlive());
|
|
|
+ assertNull(serverRunnable.getThrowable());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testSocketFactoryAsKeyInMap() {
|
|
|
Map<SocketFactory, Integer> dummyCache = new HashMap<SocketFactory, Integer>();
|
|
@@ -64,9 +123,145 @@ public class TestSocketFactory {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * A dummy socket factory class that extends the StandardSocketFactory.
|
|
|
+ * A dummy socket factory class that extends the StandardSocketFactory.
|
|
|
*/
|
|
|
static class DummySocketFactory extends StandardSocketFactory {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test SocksSocketFactory.
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testSocksSocketFactory() throws Exception {
|
|
|
+ startTestServer();
|
|
|
+ testSocketFactory(new SocksSocketFactory());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test StandardSocketFactory.
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testStandardSocketFactory() throws Exception {
|
|
|
+ startTestServer();
|
|
|
+ testSocketFactory(new StandardSocketFactory());
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Common test implementation.
|
|
|
+ */
|
|
|
+ private void testSocketFactory(SocketFactory socketFactory) throws Exception {
|
|
|
+ assertNull(serverRunnable.getThrowable());
|
|
|
+
|
|
|
+ InetAddress address = InetAddress.getLocalHost();
|
|
|
+ Socket socket = socketFactory.createSocket(address, port);
|
|
|
+ checkSocket(socket);
|
|
|
+ socket.close();
|
|
|
+
|
|
|
+ socket = socketFactory.createSocket(address, port,
|
|
|
+ InetAddress.getLocalHost(), 0);
|
|
|
+ checkSocket(socket);
|
|
|
+ socket.close();
|
|
|
+
|
|
|
+ socket = socketFactory.createSocket("localhost", port);
|
|
|
+ checkSocket(socket);
|
|
|
+ socket.close();
|
|
|
+
|
|
|
+ socket = socketFactory.createSocket("localhost", port,
|
|
|
+ InetAddress.getLocalHost(), 0);
|
|
|
+ checkSocket(socket);
|
|
|
+ socket.close();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * test proxy methods
|
|
|
+ */
|
|
|
+ @Test (timeout=5000)
|
|
|
+ public void testProxy() throws Exception {
|
|
|
+ SocksSocketFactory templateWithoutProxy = new SocksSocketFactory();
|
|
|
+ Proxy proxy = new Proxy(Type.SOCKS, InetSocketAddress.createUnresolved(
|
|
|
+ "localhost", 0));
|
|
|
+
|
|
|
+ SocksSocketFactory templateWithProxy = new SocksSocketFactory(proxy);
|
|
|
+ assertFalse(templateWithoutProxy.equals(templateWithProxy));
|
|
|
+
|
|
|
+ Configuration configuration = new Configuration();
|
|
|
+ configuration.set("hadoop.socks.server", "localhost:0");
|
|
|
+
|
|
|
+ templateWithoutProxy.setConf(configuration);
|
|
|
+ assertTrue(templateWithoutProxy.equals(templateWithProxy));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkSocket(Socket socket) throws Exception {
|
|
|
+ BufferedReader input = new BufferedReader(new InputStreamReader(
|
|
|
+ socket.getInputStream()));
|
|
|
+ DataOutputStream out = new DataOutputStream(socket.getOutputStream());
|
|
|
+ out.writeBytes("test\n");
|
|
|
+ String answer = input.readLine();
|
|
|
+ assertEquals("TEST", answer);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Simple tcp server. Server gets a string, transforms it to upper case and returns it.
|
|
|
+ */
|
|
|
+ private static class ServerRunnable implements Runnable {
|
|
|
+
|
|
|
+ private volatile boolean works = true;
|
|
|
+ private ServerSocket testSocket;
|
|
|
+ private volatile boolean ready = false;
|
|
|
+ private volatile Throwable throwable;
|
|
|
+ private int port0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ testSocket = new ServerSocket(0);
|
|
|
+ port0 = testSocket.getLocalPort();
|
|
|
+ ready = true;
|
|
|
+ while (works) {
|
|
|
+ try {
|
|
|
+ Socket connectionSocket = testSocket.accept();
|
|
|
+ BufferedReader input = new BufferedReader(new InputStreamReader(
|
|
|
+ connectionSocket.getInputStream()));
|
|
|
+ DataOutputStream out = new DataOutputStream(
|
|
|
+ connectionSocket.getOutputStream());
|
|
|
+ String inData = input.readLine();
|
|
|
+
|
|
|
+ String outData = inData.toUpperCase() + "\n";
|
|
|
+ out.writeBytes(outData);
|
|
|
+ } catch (SocketException ignored) {
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ ioe.printStackTrace();
|
|
|
+ throwable = ioe;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ works = false;
|
|
|
+ try {
|
|
|
+ testSocket.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isReady() {
|
|
|
+ return ready;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getPort() {
|
|
|
+ return port0;
|
|
|
+ }
|
|
|
|
|
|
+ public Throwable getThrowable() {
|
|
|
+ return throwable;
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
}
|