|
@@ -15,9 +15,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import org.apache.hadoop.io.*;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
-import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
|
|
|
import java.io.*;
|
|
@@ -27,8 +25,6 @@ import java.util.logging.*;
|
|
|
|
|
|
/** Runs a reduce task. */
|
|
|
class ReduceTaskRunner extends TaskRunner {
|
|
|
- private static final Logger LOG =
|
|
|
- LogFormatter.getLogger("org.apache.hadoop.mapred.ReduceTaskRunner");
|
|
|
private MapOutputFile mapOutputFile;
|
|
|
|
|
|
public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
|
|
@@ -41,13 +37,13 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
public boolean prepare() throws IOException {
|
|
|
ReduceTask task = ((ReduceTask)getTask());
|
|
|
this.mapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
|
|
|
- String[][] mapTaskIds = task.getMapTaskIds();
|
|
|
+ int numMaps = task.getNumMaps();
|
|
|
final Progress copyPhase = getTask().getProgress().phase();
|
|
|
|
|
|
// we need input from every map task
|
|
|
- Vector needed = new Vector();
|
|
|
- for (int i = 0; i < mapTaskIds.length; i++) {
|
|
|
- needed.add(mapTaskIds[i]);
|
|
|
+ List needed = new ArrayList(numMaps);
|
|
|
+ for (int i = 0; i < numMaps; i++) {
|
|
|
+ needed.add(new Integer(i));
|
|
|
copyPhase.addPhase(); // add sub-phase per file
|
|
|
}
|
|
|
|
|
@@ -59,15 +55,17 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
// query for just a random subset of needed segments so that we don't
|
|
|
// overwhelm jobtracker. ideally perhaps we could send a more compact
|
|
|
// representation of all needed, i.e., a bit-vector
|
|
|
- Collections.shuffle(needed);
|
|
|
int checkSize = Math.min(10, needed.size());
|
|
|
- String[][] neededStrings = new String[checkSize][];
|
|
|
+ int[] neededIds = new int[checkSize];
|
|
|
+ Collections.shuffle(needed);
|
|
|
+ ListIterator itr = needed.listIterator();
|
|
|
for (int i = 0; i < checkSize; i++) {
|
|
|
- neededStrings[i] = (String[]) needed.elementAt(i);
|
|
|
+ neededIds[i] = ((Integer) itr.next()).intValue();
|
|
|
}
|
|
|
MapOutputLocation[] locs = null;
|
|
|
try {
|
|
|
- locs = jobClient.locateMapOutputs(task.getTaskId(), neededStrings);
|
|
|
+ locs = jobClient.locateMapOutputs(task.getJobId().toString(),
|
|
|
+ neededIds, task.getPartition());
|
|
|
} catch (IOException ie) {
|
|
|
LOG.info("Problem locating map outputs: " +
|
|
|
StringUtils.stringifyException(ie));
|
|
@@ -112,18 +110,15 @@ class ReduceTaskRunner extends TaskRunner {
|
|
|
LOG.info(task.getTaskId()+" Copying "+loc.getMapTaskId()
|
|
|
+" output from "+loc.getHost()+".");
|
|
|
client.getFile(loc.getMapTaskId(), task.getTaskId(),
|
|
|
- new IntWritable(task.getPartition()));
|
|
|
+ loc.getMapId(),
|
|
|
+ task.getPartition());
|
|
|
|
|
|
// Success: remove from 'needed'
|
|
|
- boolean foundit = false;
|
|
|
- for (Iterator it = needed.iterator(); it.hasNext() && !foundit; ) {
|
|
|
- String idsForSingleMap[] = (String[]) it.next();
|
|
|
- for (int j = 0; j < idsForSingleMap.length; j++) {
|
|
|
- if (idsForSingleMap[j].equals(loc.getMapTaskId())) {
|
|
|
- it.remove();
|
|
|
- foundit = true;
|
|
|
- break;
|
|
|
- }
|
|
|
+ for (Iterator it = needed.iterator(); it.hasNext(); ) {
|
|
|
+ int mapId = ((Integer) it.next()).intValue();
|
|
|
+ if (mapId == loc.getMapId()) {
|
|
|
+ it.remove();
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
copyPhase.startNextPhase();
|