|
@@ -25,11 +25,13 @@ import java.io.PrintWriter;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.security.Groups;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
@@ -53,6 +55,7 @@ import junit.framework.TestCase;
|
|
|
public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(ClusterWithLinuxTaskController.class);
|
|
|
+ static String TT_GROUP = "mapreduce.tasktracker.group";
|
|
|
|
|
|
/**
|
|
|
* The wrapper class around LinuxTaskController which allows modification of
|
|
@@ -60,7 +63,22 @@ public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
*
|
|
|
**/
|
|
|
public static class MyLinuxTaskController extends LinuxTaskController {
|
|
|
- String taskControllerExePath;
|
|
|
+ String taskControllerExePath = System.getProperty(TASKCONTROLLER_PATH)
|
|
|
+ + "/task-controller";
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setup() throws IOException {
|
|
|
+ // get the current ugi and set the task controller group owner
|
|
|
+ Groups groups = new Groups(new Configuration());
|
|
|
+ String ttGroup = groups.getGroups(
|
|
|
+ UserGroupInformation.getCurrentUser().getUserName()).get(0);
|
|
|
+ getConf().set(TT_GROUP, ttGroup);
|
|
|
+
|
|
|
+ // write configuration file
|
|
|
+ configurationFile = createTaskControllerConf(System
|
|
|
+ .getProperty(TASKCONTROLLER_PATH), getConf());
|
|
|
+ super.setup();
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
protected String getTaskControllerExecutablePath() {
|
|
@@ -79,12 +97,17 @@ public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
private JobConf clusterConf = null;
|
|
|
protected Path homeDirectory;
|
|
|
|
|
|
+ /** changing this to a larger number needs more work for creating
|
|
|
+ * taskcontroller.cfg.
|
|
|
+ * see {@link #startCluster()} and
|
|
|
+ * {@link #createTaskControllerConf(String, Configuration)}
|
|
|
+ */
|
|
|
private static final int NUMBER_OF_NODES = 1;
|
|
|
|
|
|
static final String TASKCONTROLLER_PATH = "taskcontroller-path";
|
|
|
static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
|
|
|
|
|
|
- private File configurationFile = null;
|
|
|
+ private static File configurationFile = null;
|
|
|
|
|
|
protected UserGroupInformation taskControllerUser;
|
|
|
|
|
@@ -102,17 +125,6 @@ public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
|
|
|
.toString(), 4, null, null, conf);
|
|
|
|
|
|
- // Get the configured taskcontroller-path
|
|
|
- String path = System.getProperty(TASKCONTROLLER_PATH);
|
|
|
- configurationFile =
|
|
|
- createTaskControllerConf(path, mrCluster.getTaskTrackerRunner(0)
|
|
|
- .getLocalDirs());
|
|
|
- String execPath = path + "/task-controller";
|
|
|
- TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
|
|
|
- // TypeCasting the parent to our TaskController instance as we
|
|
|
- // know that that would be instance which should be present in TT.
|
|
|
- ((MyLinuxTaskController) tracker.getTaskController())
|
|
|
- .setTaskControllerExe(execPath);
|
|
|
String ugi = System.getProperty(TASKCONTROLLER_UGI);
|
|
|
clusterConf = mrCluster.createJobConf();
|
|
|
String[] splits = ugi.split(",");
|
|
@@ -143,16 +155,21 @@ public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
taskControllerUser.getGroupNames()[0]);
|
|
|
}
|
|
|
|
|
|
+ static File getTaskControllerConfFile(String path) {
|
|
|
+ File confDirectory = new File(path, "../conf");
|
|
|
+ return new File(confDirectory, "taskcontroller.cfg");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Create taskcontroller.cfg.
|
|
|
*
|
|
|
* @param path Path to the taskcontroller binary.
|
|
|
- * @param localDirs
|
|
|
+ * @param conf TaskTracker's configuration
|
|
|
* @return the created conf file
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- static File createTaskControllerConf(String path, String[] localDirs)
|
|
|
- throws IOException {
|
|
|
+ static File createTaskControllerConf(String path,
|
|
|
+ Configuration conf) throws IOException {
|
|
|
File confDirectory = new File(path, "../conf");
|
|
|
if (!confDirectory.exists()) {
|
|
|
confDirectory.mkdirs();
|
|
@@ -161,17 +178,12 @@ public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
PrintWriter writer =
|
|
|
new PrintWriter(new FileOutputStream(configurationFile));
|
|
|
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
- for (int i = 0; i < localDirs.length; i++) {
|
|
|
- sb.append(localDirs[i]);
|
|
|
- if ((i + 1) != localDirs.length) {
|
|
|
- sb.append(",");
|
|
|
- }
|
|
|
- }
|
|
|
- writer.println(String.format("mapred.local.dir=%s", sb.toString()));
|
|
|
+ writer.println(String.format("mapred.local.dir=%s", conf.
|
|
|
+ get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
|
|
|
|
|
|
writer
|
|
|
.println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
|
|
|
+ writer.println(String.format(TT_GROUP + "=%s", conf.get(TT_GROUP)));
|
|
|
|
|
|
writer.flush();
|
|
|
writer.close();
|
|
@@ -191,7 +203,7 @@ public class ClusterWithLinuxTaskController extends TestCase {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- private static boolean isTaskExecPathPassed() {
|
|
|
+ static boolean isTaskExecPathPassed() {
|
|
|
String path = System.getProperty(TASKCONTROLLER_PATH);
|
|
|
if (path == null || path.isEmpty()
|
|
|
|| path.equals("${" + TASKCONTROLLER_PATH + "}")) {
|