|
@@ -20,6 +20,7 @@ import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.LogFormatter;
|
|
|
|
|
|
import java.io.*;
|
|
|
+import java.text.NumberFormat;
|
|
|
import java.util.*;
|
|
|
import java.util.logging.*;
|
|
|
|
|
@@ -43,6 +44,11 @@ class TaskInProgress {
|
|
|
static final int MAX_TASK_FAILURES = 4;
|
|
|
static final double SPECULATIVE_GAP = 0.2;
|
|
|
static final long SPECULATIVE_LAG = 60 * 1000;
|
|
|
+ private static NumberFormat idFormat = NumberFormat.getInstance();
|
|
|
+ static {
|
|
|
+ idFormat.setMinimumIntegerDigits(6);
|
|
|
+ idFormat.setGroupingUsed(false);
|
|
|
+ }
|
|
|
|
|
|
public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskInProgress");
|
|
|
|
|
@@ -77,7 +83,7 @@ class TaskInProgress {
|
|
|
/**
|
|
|
* Constructor for MapTask
|
|
|
*/
|
|
|
- public TaskInProgress(String jobFile, FileSplit split,
|
|
|
+ public TaskInProgress(String uniqueString, String jobFile, FileSplit split,
|
|
|
JobTracker jobtracker, JobConf conf,
|
|
|
JobInProgress job, int partition) {
|
|
|
this.jobFile = jobFile;
|
|
@@ -86,13 +92,14 @@ class TaskInProgress {
|
|
|
this.job = job;
|
|
|
this.conf = conf;
|
|
|
this.partition = partition;
|
|
|
- init();
|
|
|
+ init(uniqueString);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Constructor for ReduceTask
|
|
|
*/
|
|
|
- public TaskInProgress(String jobFile, TaskInProgress predecessors[],
|
|
|
+ public TaskInProgress(String uniqueString, String jobFile,
|
|
|
+ TaskInProgress predecessors[],
|
|
|
int partition, JobTracker jobtracker, JobConf conf,
|
|
|
JobInProgress job) {
|
|
|
this.jobFile = jobFile;
|
|
@@ -101,23 +108,37 @@ class TaskInProgress {
|
|
|
this.jobtracker = jobtracker;
|
|
|
this.job = job;
|
|
|
this.conf = conf;
|
|
|
- init();
|
|
|
+ init(uniqueString);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Make a unique name for this TIP.
|
|
|
+ * @param uniqueBase The unique name of the job
|
|
|
+ * @return The unique string for this tip
|
|
|
+ */
|
|
|
+ private String makeUniqueString(String uniqueBase) {
|
|
|
+ StringBuffer result = new StringBuffer();
|
|
|
+ result.append(uniqueBase);
|
|
|
+ if (isMapTask()) {
|
|
|
+ result.append("_m_");
|
|
|
+ } else {
|
|
|
+ result.append("_r_");
|
|
|
+ }
|
|
|
+ result.append(idFormat.format(partition));
|
|
|
+ return result.toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Initialization common to Map and Reduce
|
|
|
*/
|
|
|
- void init() {
|
|
|
+ void init(String jobUniqueString) {
|
|
|
this.startTime = System.currentTimeMillis();
|
|
|
- this.id = "tip_" + jobtracker.createUniqueId();
|
|
|
+ String uniqueString = makeUniqueString(jobUniqueString);
|
|
|
+ this.id = "tip_" + uniqueString;
|
|
|
this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
|
|
|
for (int i = 0; i < totalTaskIds.length; i++) {
|
|
|
- if (isMapTask()) {
|
|
|
- totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId();
|
|
|
- } else {
|
|
|
- totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId();
|
|
|
- }
|
|
|
- usableTaskIds.add(totalTaskIds[i]);
|
|
|
+ totalTaskIds[i] = "task_" + uniqueString + "_" + i;
|
|
|
+ usableTaskIds.add(totalTaskIds[i]);
|
|
|
}
|
|
|
}
|
|
|
|