|
@@ -65,7 +65,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
FileSystem fs = null;
|
|
|
static final String SUBDIR = "taskTracker";
|
|
|
|
|
|
- private Configuration fConf;
|
|
|
+ private JobConf fConf;
|
|
|
private MapOutputFile mapOutputFile;
|
|
|
|
|
|
private int maxCurrentTasks;
|
|
@@ -83,14 +83,14 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
/**
|
|
|
* Start with the local machine name, and the default JobTracker
|
|
|
*/
|
|
|
- public TaskTracker(Configuration conf) throws IOException {
|
|
|
+ public TaskTracker(JobConf conf) throws IOException {
|
|
|
this(JobTracker.getAddress(conf), conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Start with the local machine name, and the addr of the target JobTracker
|
|
|
*/
|
|
|
- public TaskTracker(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
|
|
|
+ public TaskTracker(InetSocketAddress jobTrackAddr, JobConf conf) throws IOException {
|
|
|
maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
|
|
|
|
|
|
this.fConf = conf;
|
|
@@ -112,7 +112,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
(Math.abs(r.nextInt()) % 100000);
|
|
|
LOG.info("Starting tracker " + taskTrackerName);
|
|
|
|
|
|
- new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
|
|
|
+ fConf.deleteLocalFiles(SUBDIR);
|
|
|
|
|
|
// Clear out state tables
|
|
|
this.tasks = new TreeMap();
|
|
@@ -409,17 +409,19 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
TaskRunner runner;
|
|
|
boolean done = false;
|
|
|
boolean wasKilled = false;
|
|
|
- private JobConf jobConf;
|
|
|
+ private JobConf defaultJobConf;
|
|
|
+ private JobConf localJobConf;
|
|
|
|
|
|
/**
|
|
|
*/
|
|
|
- public TaskInProgress(Task task, Configuration conf) {
|
|
|
+ public TaskInProgress(Task task, JobConf conf) {
|
|
|
this.task = task;
|
|
|
this.progress = 0.0f;
|
|
|
this.runstate = TaskStatus.UNASSIGNED;
|
|
|
stateString = "initializing";
|
|
|
this.lastProgressReport = System.currentTimeMillis();
|
|
|
- this.jobConf = new JobConf(conf);
|
|
|
+ this.defaultJobConf = conf;
|
|
|
+ localJobConf = null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -427,31 +429,35 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* So here, edit the Task's fields appropriately.
|
|
|
*/
|
|
|
private void localizeTask(Task t) throws IOException {
|
|
|
- this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" +
|
|
|
+ task.getTaskId());
|
|
|
Path localJobFile =
|
|
|
- this.jobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
|
|
|
+ this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
|
|
|
Path localJarFile =
|
|
|
- this.jobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
|
|
|
+ this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
|
|
|
|
|
|
String jobFile = t.getJobFile();
|
|
|
fs.copyToLocalFile(new Path(jobFile), localJobFile);
|
|
|
t.setJobFile(localJobFile.toString());
|
|
|
|
|
|
- JobConf jc = new JobConf(localJobFile);
|
|
|
- jc.set("mapred.task.id", task.getTaskId());
|
|
|
- String jarFile = jc.getJar();
|
|
|
+ localJobConf = new JobConf(localJobFile);
|
|
|
+ localJobConf.set("mapred.task.id", task.getTaskId());
|
|
|
+ String jarFile = localJobConf.getJar();
|
|
|
if (jarFile != null) {
|
|
|
fs.copyToLocalFile(new Path(jarFile), localJarFile);
|
|
|
- jc.setJar(localJarFile.toString());
|
|
|
+ localJobConf.setJar(localJarFile.toString());
|
|
|
|
|
|
FileSystem localFs = FileSystem.getNamed("local", fConf);
|
|
|
OutputStream out = localFs.create(localJobFile);
|
|
|
try {
|
|
|
- jc.write(out);
|
|
|
+ localJobConf.write(out);
|
|
|
} finally {
|
|
|
out.close();
|
|
|
}
|
|
|
}
|
|
|
+ // set the task's configuration to the local job conf
|
|
|
+ // rather than the default.
|
|
|
+ t.setConf(localJobConf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -618,7 +624,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
runner.close();
|
|
|
} catch (IOException ie) {
|
|
|
}
|
|
|
- this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -735,17 +741,18 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
LogFormatter.showTime(false);
|
|
|
LOG.info("Child starting");
|
|
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
+ JobConf defaultConf = new JobConf();
|
|
|
int port = Integer.parseInt(args[0]);
|
|
|
String taskid = args[1];
|
|
|
TaskUmbilicalProtocol umbilical =
|
|
|
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
|
|
|
- new InetSocketAddress(port), conf);
|
|
|
+ new InetSocketAddress(port),
|
|
|
+ defaultConf);
|
|
|
|
|
|
Task task = umbilical.getTask(taskid);
|
|
|
JobConf job = new JobConf(task.getJobFile());
|
|
|
|
|
|
- conf.addFinalResource(new Path(task.getJobFile()));
|
|
|
+ defaultConf.addFinalResource(new Path(task.getJobFile()));
|
|
|
|
|
|
startPinging(umbilical, taskid); // start pinging parent
|
|
|
|
|
@@ -809,7 +816,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
System.exit(-1);
|
|
|
}
|
|
|
|
|
|
- TaskTracker tt = new TaskTracker(new Configuration());
|
|
|
+ TaskTracker tt = new TaskTracker(new JobConf());
|
|
|
tt.run();
|
|
|
}
|
|
|
}
|