|
@@ -44,7 +44,7 @@ class JobInProgress {
|
|
|
int numReduceTasks = 0;
|
|
|
|
|
|
JobTracker jobtracker = null;
|
|
|
- TreeMap cachedHints = new TreeMap();
|
|
|
+ HashMap hostToMaps = new HashMap();
|
|
|
|
|
|
long startTime;
|
|
|
long finishTime;
|
|
@@ -159,29 +159,28 @@ class JobInProgress {
|
|
|
// Obtain some tasktracker-cache information for the map task splits.
|
|
|
//
|
|
|
for (int i = 0; i < maps.length; i++) {
|
|
|
- String hints[][] = fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(), splits[i].getLength());
|
|
|
- cachedHints.put(maps[i].getTIPId(), hints);
|
|
|
+ String hints[][] =
|
|
|
+ fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(),
|
|
|
+ splits[i].getLength());
|
|
|
+
|
|
|
+ if (hints != null) {
|
|
|
+ for (int k = 0; k < hints.length; k++) {
|
|
|
+ for (int j = 0; j < hints[k].length; j++) {
|
|
|
+ ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]);
|
|
|
+ if (hostMaps == null) {
|
|
|
+ hostMaps = new ArrayList();
|
|
|
+ hostToMaps.put(hints[k][j], hostMaps);
|
|
|
+ }
|
|
|
+ hostMaps.add(maps[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
|
|
|
tasksInited = true;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This is called by TaskInProgress objects. The JobInProgress
|
|
|
- * prefetches and caches a lot of these hints. If the hint is
|
|
|
- * not available, then we pass it through to the filesystem.
|
|
|
- */
|
|
|
- String[][] getFileCacheHints(String tipID, Path f, long start, long len) throws IOException {
|
|
|
- String results[][] = (String[][]) cachedHints.get(tipID);
|
|
|
- if (tipID == null) {
|
|
|
- FileSystem fs = FileSystem.get(conf);
|
|
|
- results = fs.getFileCacheHints(f, start, len);
|
|
|
- cachedHints.put(tipID, results);
|
|
|
- }
|
|
|
- return results;
|
|
|
- }
|
|
|
-
|
|
|
/////////////////////////////////////////////////////
|
|
|
// Accessors for the JobInProgress
|
|
|
/////////////////////////////////////////////////////
|
|
@@ -301,20 +300,18 @@ class JobInProgress {
|
|
|
// the TaskTracker checking in. That means the block
|
|
|
// doesn't have to be transmitted from another node.
|
|
|
//
|
|
|
- for (int i = 0; i < maps.length; i++) {
|
|
|
- int realIdx = (i + firstMapToTry) % maps.length;
|
|
|
- if (maps[realIdx].hasTaskWithCacheHit(taskTracker, tts)) {
|
|
|
- if (cacheTarget < 0) {
|
|
|
- if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
|
|
|
- if (failedTarget < 0) {
|
|
|
- failedTarget = realIdx;
|
|
|
- }
|
|
|
- } else {
|
|
|
- cacheTarget = realIdx;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
+ ArrayList hostMaps = (ArrayList)hostToMaps.get(tts.getHost());
|
|
|
+ if (hostMaps != null) {
|
|
|
+ Iterator i = hostMaps.iterator();
|
|
|
+ while (i.hasNext()) {
|
|
|
+ TaskInProgress tip = (TaskInProgress)i.next();
|
|
|
+ if (tip.hasTask() && !tip.hasFailedOnMachine(taskTracker)) {
|
|
|
+ LOG.info("Found task with local split for "+tts.getHost());
|
|
|
+ cacheTarget = tip.getIdWithinJob();
|
|
|
+ i.remove();
|
|
|
+ break;
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//
|