|
@@ -22,124 +22,149 @@ import java.net.InetSocketAddress;
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
import java.net.SocketAddress;
|
|
import java.net.SocketAddress;
|
|
|
|
|
|
-import junit.framework.TestCase;
|
|
|
|
-
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
|
-import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.mapred.JobClient;
|
|
import org.apache.hadoop.mapred.JobClient;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.JobStatus;
|
|
import org.apache.hadoop.mapred.JobStatus;
|
|
-import org.apache.hadoop.mapred.MiniMRCluster;
|
|
|
|
import org.apache.hadoop.mapreduce.MRConfig;
|
|
import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
|
import org.apache.hadoop.net.StandardSocketFactory;
|
|
import org.apache.hadoop.net.StandardSocketFactory;
|
|
-import org.junit.Ignore;
|
|
|
|
|
|
+import org.junit.Assert;
|
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
/**
|
|
/**
|
|
* This class checks that RPCs can use specialized socket factories.
|
|
* This class checks that RPCs can use specialized socket factories.
|
|
*/
|
|
*/
|
|
-@Ignore
|
|
|
|
-public class TestSocketFactory extends TestCase {
|
|
|
|
|
|
+public class TestSocketFactory {
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Check that we can reach a NameNode or a JobTracker using a specific
|
|
|
|
|
|
+ * Check that we can reach a NameNode or Resource Manager using a specific
|
|
* socket factory
|
|
* socket factory
|
|
*/
|
|
*/
|
|
|
|
+ @Test
|
|
public void testSocketFactory() throws IOException {
|
|
public void testSocketFactory() throws IOException {
|
|
// Create a standard mini-cluster
|
|
// Create a standard mini-cluster
|
|
Configuration sconf = new Configuration();
|
|
Configuration sconf = new Configuration();
|
|
- MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null);
|
|
|
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(sconf).numDataNodes(1)
|
|
|
|
+ .build();
|
|
final int nameNodePort = cluster.getNameNodePort();
|
|
final int nameNodePort = cluster.getNameNodePort();
|
|
|
|
|
|
// Get a reference to its DFS directly
|
|
// Get a reference to its DFS directly
|
|
FileSystem fs = cluster.getFileSystem();
|
|
FileSystem fs = cluster.getFileSystem();
|
|
- assertTrue(fs instanceof DistributedFileSystem);
|
|
|
|
|
|
+ Assert.assertTrue(fs instanceof DistributedFileSystem);
|
|
DistributedFileSystem directDfs = (DistributedFileSystem) fs;
|
|
DistributedFileSystem directDfs = (DistributedFileSystem) fs;
|
|
|
|
|
|
- // Get another reference via network using a specific socket factory
|
|
|
|
- Configuration cconf = new Configuration();
|
|
|
|
- FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
|
|
|
|
- nameNodePort + 10));
|
|
|
|
- cconf.set("hadoop.rpc.socket.factory.class.default",
|
|
|
|
- "org.apache.hadoop.ipc.DummySocketFactory");
|
|
|
|
- cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
|
|
|
|
- "org.apache.hadoop.ipc.DummySocketFactory");
|
|
|
|
- cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
|
|
|
|
- "org.apache.hadoop.ipc.DummySocketFactory");
|
|
|
|
|
|
+ Configuration cconf = getCustomSocketConfigs(nameNodePort);
|
|
|
|
|
|
fs = FileSystem.get(cconf);
|
|
fs = FileSystem.get(cconf);
|
|
- assertTrue(fs instanceof DistributedFileSystem);
|
|
|
|
|
|
+ Assert.assertTrue(fs instanceof DistributedFileSystem);
|
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
|
|
|
|
|
JobClient client = null;
|
|
JobClient client = null;
|
|
- MiniMRCluster mr = null;
|
|
|
|
|
|
+ MiniMRYarnCluster miniMRYarnCluster = null;
|
|
try {
|
|
try {
|
|
// This will test RPC to the NameNode only.
|
|
// This will test RPC to the NameNode only.
|
|
// could we test Client-DataNode connections?
|
|
// could we test Client-DataNode connections?
|
|
Path filePath = new Path("/dir");
|
|
Path filePath = new Path("/dir");
|
|
|
|
|
|
- assertFalse(directDfs.exists(filePath));
|
|
|
|
- assertFalse(dfs.exists(filePath));
|
|
|
|
|
|
+ Assert.assertFalse(directDfs.exists(filePath));
|
|
|
|
+ Assert.assertFalse(dfs.exists(filePath));
|
|
|
|
|
|
directDfs.mkdirs(filePath);
|
|
directDfs.mkdirs(filePath);
|
|
- assertTrue(directDfs.exists(filePath));
|
|
|
|
- assertTrue(dfs.exists(filePath));
|
|
|
|
|
|
+ Assert.assertTrue(directDfs.exists(filePath));
|
|
|
|
+ Assert.assertTrue(dfs.exists(filePath));
|
|
|
|
|
|
- // This will test TPC to a JobTracker
|
|
|
|
|
|
+ // This will test RPC to a Resource Manager
|
|
fs = FileSystem.get(sconf);
|
|
fs = FileSystem.get(sconf);
|
|
- mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
|
|
|
|
- final int jobTrackerPort = mr.getJobTrackerPort();
|
|
|
|
-
|
|
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
|
|
|
|
+ miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
|
|
JobConf jconf = new JobConf(cconf);
|
|
JobConf jconf = new JobConf(cconf);
|
|
- jconf.set("mapred.job.tracker", String.format("localhost:%d",
|
|
|
|
- jobTrackerPort + 10));
|
|
|
|
- jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
|
|
|
|
|
|
+ jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
|
|
|
+ String rmAddress = jconf.get("yarn.resourcemanager.address");
|
|
|
|
+ String[] split = rmAddress.split(":");
|
|
|
|
+ jconf.set("yarn.resourcemanager.address", split[0] + ':'
|
|
|
|
+ + (Integer.parseInt(split[1]) + 10));
|
|
client = new JobClient(jconf);
|
|
client = new JobClient(jconf);
|
|
|
|
|
|
JobStatus[] jobs = client.jobsToComplete();
|
|
JobStatus[] jobs = client.jobsToComplete();
|
|
- assertTrue(jobs.length == 0);
|
|
|
|
|
|
+ Assert.assertTrue(jobs.length == 0);
|
|
|
|
|
|
} finally {
|
|
} finally {
|
|
- try {
|
|
|
|
- if (client != null)
|
|
|
|
- client.close();
|
|
|
|
- } catch (Exception ignored) {
|
|
|
|
- // nothing we can do
|
|
|
|
- ignored.printStackTrace();
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (dfs != null)
|
|
|
|
- dfs.close();
|
|
|
|
|
|
+ closeClient(client);
|
|
|
|
+ closeDfs(dfs);
|
|
|
|
+ closeDfs(directDfs);
|
|
|
|
+ stopMiniMRYarnCluster(miniMRYarnCluster);
|
|
|
|
+ shutdownDFSCluster(cluster);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- } catch (Exception ignored) {
|
|
|
|
- // nothing we can do
|
|
|
|
- ignored.printStackTrace();
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (directDfs != null)
|
|
|
|
- directDfs.close();
|
|
|
|
|
|
+ private MiniMRYarnCluster initAndStartMiniMRYarnCluster(JobConf jobConf) {
|
|
|
|
+ MiniMRYarnCluster miniMRYarnCluster;
|
|
|
|
+ miniMRYarnCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
|
|
|
|
+ miniMRYarnCluster.init(jobConf);
|
|
|
|
+ miniMRYarnCluster.start();
|
|
|
|
+ return miniMRYarnCluster;
|
|
|
|
+ }
|
|
|
|
|
|
- } catch (Exception ignored) {
|
|
|
|
- // nothing we can do
|
|
|
|
- ignored.printStackTrace();
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- if (cluster != null)
|
|
|
|
- cluster.shutdown();
|
|
|
|
|
|
+ private Configuration getCustomSocketConfigs(final int nameNodePort) {
|
|
|
|
+ // Get another reference via network using a specific socket factory
|
|
|
|
+ Configuration cconf = new Configuration();
|
|
|
|
+ FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
|
|
|
|
+ nameNodePort + 10));
|
|
|
|
+ cconf.set("hadoop.rpc.socket.factory.class.default",
|
|
|
|
+ "org.apache.hadoop.ipc.DummySocketFactory");
|
|
|
|
+ cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
|
|
|
|
+ "org.apache.hadoop.ipc.DummySocketFactory");
|
|
|
|
+ cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
|
|
|
|
+ "org.apache.hadoop.ipc.DummySocketFactory");
|
|
|
|
+ return cconf;
|
|
|
|
+ }
|
|
|
|
|
|
- } catch (Exception ignored) {
|
|
|
|
- // nothing we can do
|
|
|
|
- ignored.printStackTrace();
|
|
|
|
- }
|
|
|
|
- if (mr != null) {
|
|
|
|
- try {
|
|
|
|
- mr.shutdown();
|
|
|
|
- } catch (Exception ignored) {
|
|
|
|
- ignored.printStackTrace();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ private void shutdownDFSCluster(MiniDFSCluster cluster) {
|
|
|
|
+ try {
|
|
|
|
+ if (cluster != null)
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+
|
|
|
|
+ } catch (Exception ignored) {
|
|
|
|
+ // nothing we can do
|
|
|
|
+ ignored.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) {
|
|
|
|
+ try {
|
|
|
|
+ if (miniMRYarnCluster != null)
|
|
|
|
+ miniMRYarnCluster.stop();
|
|
|
|
+
|
|
|
|
+ } catch (Exception ignored) {
|
|
|
|
+ // nothing we can do
|
|
|
|
+ ignored.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void closeDfs(DistributedFileSystem dfs) {
|
|
|
|
+ try {
|
|
|
|
+ if (dfs != null)
|
|
|
|
+ dfs.close();
|
|
|
|
+
|
|
|
|
+ } catch (Exception ignored) {
|
|
|
|
+ // nothing we can do
|
|
|
|
+ ignored.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void closeClient(JobClient client) {
|
|
|
|
+ try {
|
|
|
|
+ if (client != null)
|
|
|
|
+ client.close();
|
|
|
|
+ } catch (Exception ignored) {
|
|
|
|
+ // nothing we can do
|
|
|
|
+ ignored.printStackTrace();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -155,32 +180,27 @@ class DummySocketFactory extends StandardSocketFactory {
|
|
public DummySocketFactory() {
|
|
public DummySocketFactory() {
|
|
}
|
|
}
|
|
|
|
|
|
- /* @inheritDoc */
|
|
|
|
@Override
|
|
@Override
|
|
public Socket createSocket() throws IOException {
|
|
public Socket createSocket() throws IOException {
|
|
return new Socket() {
|
|
return new Socket() {
|
|
@Override
|
|
@Override
|
|
- public void connect(SocketAddress addr, int timeout)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ public void connect(SocketAddress addr, int timeout) throws IOException {
|
|
|
|
|
|
assert (addr instanceof InetSocketAddress);
|
|
assert (addr instanceof InetSocketAddress);
|
|
InetSocketAddress iaddr = (InetSocketAddress) addr;
|
|
InetSocketAddress iaddr = (InetSocketAddress) addr;
|
|
SocketAddress newAddr = null;
|
|
SocketAddress newAddr = null;
|
|
if (iaddr.isUnresolved())
|
|
if (iaddr.isUnresolved())
|
|
- newAddr =
|
|
|
|
- new InetSocketAddress(iaddr.getHostName(),
|
|
|
|
- iaddr.getPort() - 10);
|
|
|
|
|
|
+ newAddr = new InetSocketAddress(iaddr.getHostName(),
|
|
|
|
+ iaddr.getPort() - 10);
|
|
else
|
|
else
|
|
- newAddr =
|
|
|
|
- new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10);
|
|
|
|
- System.out.printf("Test socket: rerouting %s to %s\n", iaddr,
|
|
|
|
- newAddr);
|
|
|
|
|
|
+ newAddr = new InetSocketAddress(iaddr.getAddress(),
|
|
|
|
+ iaddr.getPort() - 10);
|
|
|
|
+ System.out.printf("Test socket: rerouting %s to %s\n", iaddr, newAddr);
|
|
super.connect(newAddr, timeout);
|
|
super.connect(newAddr, timeout);
|
|
}
|
|
}
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
- /* @inheritDoc */
|
|
|
|
@Override
|
|
@Override
|
|
public boolean equals(Object obj) {
|
|
public boolean equals(Object obj) {
|
|
if (this == obj)
|
|
if (this == obj)
|
|
@@ -191,11 +211,4 @@ class DummySocketFactory extends StandardSocketFactory {
|
|
return false;
|
|
return false;
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
-
|
|
|
|
- /* @inheritDoc */
|
|
|
|
- @Override
|
|
|
|
- public int hashCode() {
|
|
|
|
- // Dummy hash code (to make find bugs happy)
|
|
|
|
- return 53;
|
|
|
|
- }
|
|
|
|
}
|
|
}
|