|
@@ -47,7 +47,6 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
|
|
|
|
static final Log LOG = LogFactory.getLog(ClusterWithCapacityScheduler.class);
|
|
static final Log LOG = LogFactory.getLog(ClusterWithCapacityScheduler.class);
|
|
private MiniMRCluster mrCluster;
|
|
private MiniMRCluster mrCluster;
|
|
- private MiniDFSCluster dfsCluster;
|
|
|
|
|
|
|
|
private JobConf jobConf;
|
|
private JobConf jobConf;
|
|
|
|
|
|
@@ -71,7 +70,7 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
protected void startCluster(Properties clusterProperties,
|
|
protected void startCluster(Properties clusterProperties,
|
|
Properties schedulerProperties)
|
|
Properties schedulerProperties)
|
|
throws IOException {
|
|
throws IOException {
|
|
- startCluster(2, 2, clusterProperties, schedulerProperties);
|
|
|
|
|
|
+ startCluster(2, clusterProperties, schedulerProperties);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -81,12 +80,11 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
* user provided properties are missing (null/empty)
|
|
* user provided properties are missing (null/empty)
|
|
*
|
|
*
|
|
* @param numTaskTrackers
|
|
* @param numTaskTrackers
|
|
- * @param numDataNodes
|
|
|
|
* @param clusterProperties
|
|
* @param clusterProperties
|
|
* @param schedulerProperties
|
|
* @param schedulerProperties
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- protected void startCluster(int numTaskTrackers, int numDataNodes,
|
|
|
|
|
|
+ protected void startCluster(int numTaskTrackers,
|
|
Properties clusterProperties, Properties schedulerProperties)
|
|
Properties clusterProperties, Properties schedulerProperties)
|
|
throws IOException {
|
|
throws IOException {
|
|
Thread.currentThread().setContextClassLoader(
|
|
Thread.currentThread().setContextClassLoader(
|
|
@@ -99,7 +97,6 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
clusterConf.set(key, (String) clusterProperties.get(key));
|
|
clusterConf.set(key, (String) clusterProperties.get(key));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- dfsCluster = new MiniDFSCluster(clusterConf, numDataNodes, true, null);
|
|
|
|
|
|
|
|
if (schedulerProperties != null) {
|
|
if (schedulerProperties != null) {
|
|
setUpSchedulerConfigFile(schedulerProperties);
|
|
setUpSchedulerConfigFile(schedulerProperties);
|
|
@@ -108,23 +105,22 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
clusterConf.set("mapred.jobtracker.taskScheduler",
|
|
clusterConf.set("mapred.jobtracker.taskScheduler",
|
|
CapacityTaskScheduler.class.getName());
|
|
CapacityTaskScheduler.class.getName());
|
|
mrCluster =
|
|
mrCluster =
|
|
- new MiniMRCluster(numTaskTrackers, dfsCluster.getFileSystem().getUri()
|
|
|
|
- .toString(), 1, null, null, clusterConf);
|
|
|
|
|
|
+ new MiniMRCluster(numTaskTrackers, "file:///", 1, null, null,
|
|
|
|
+ clusterConf);
|
|
|
|
|
|
this.jobConf = mrCluster.createJobConf(clusterConf);
|
|
this.jobConf = mrCluster.createJobConf(clusterConf);
|
|
}
|
|
}
|
|
|
|
|
|
private void setUpSchedulerConfigFile(Properties schedulerConfProps)
|
|
private void setUpSchedulerConfigFile(Properties schedulerConfProps)
|
|
throws IOException {
|
|
throws IOException {
|
|
- Configuration config = new Configuration(false);
|
|
|
|
-
|
|
|
|
- LocalFileSystem fs = FileSystem.getLocal(config);
|
|
|
|
|
|
+ LocalFileSystem fs = FileSystem.getLocal(new Configuration());
|
|
|
|
|
|
String myResourcePath = System.getProperty("test.build.data");
|
|
String myResourcePath = System.getProperty("test.build.data");
|
|
Path schedulerConfigFilePath =
|
|
Path schedulerConfigFilePath =
|
|
new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
|
|
new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
|
|
OutputStream out = fs.create(schedulerConfigFilePath);
|
|
OutputStream out = fs.create(schedulerConfigFilePath);
|
|
|
|
|
|
|
|
+ Configuration config = new Configuration(false);
|
|
for (Enumeration<?> e = schedulerConfProps.propertyNames(); e
|
|
for (Enumeration<?> e = schedulerConfProps.propertyNames(); e
|
|
.hasMoreElements();) {
|
|
.hasMoreElements();) {
|
|
String key = (String) e.nextElement();
|
|
String key = (String) e.nextElement();
|
|
@@ -141,9 +137,7 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
}
|
|
}
|
|
|
|
|
|
private void cleanUpSchedulerConfigFile() throws IOException {
|
|
private void cleanUpSchedulerConfigFile() throws IOException {
|
|
- Configuration config = new Configuration(false);
|
|
|
|
-
|
|
|
|
- LocalFileSystem fs = FileSystem.getLocal(config);
|
|
|
|
|
|
+ LocalFileSystem fs = FileSystem.getLocal(new Configuration());
|
|
|
|
|
|
String myResourcePath = System.getProperty("test.build.data");
|
|
String myResourcePath = System.getProperty("test.build.data");
|
|
Path schedulerConfigFilePath =
|
|
Path schedulerConfigFilePath =
|
|
@@ -167,9 +161,6 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
if (mrCluster != null) {
|
|
if (mrCluster != null) {
|
|
mrCluster.shutdown();
|
|
mrCluster.shutdown();
|
|
}
|
|
}
|
|
- if (dfsCluster != null) {
|
|
|
|
- dfsCluster.shutdown();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -232,4 +223,4 @@ public class ClusterWithCapacityScheduler extends TestCase {
|
|
return super.findResource(name);
|
|
return super.findResource(name);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-}
|
|
|
|
|
|
+}
|