|
@@ -134,38 +134,46 @@ class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
map_tasks -= 1;
|
|
|
updateCounters(map);
|
|
|
}
|
|
|
- if (numReduceTasks > 0) {
|
|
|
- // move map output to reduce input
|
|
|
- String reduceId = "reduce_" + newId();
|
|
|
+ String reduceId = "reduce_" + newId();
|
|
|
+ try {
|
|
|
+ if (numReduceTasks > 0) {
|
|
|
+ // move map output to reduce input
|
|
|
+ for (int i = 0; i < mapIds.size(); i++) {
|
|
|
+ String mapId = mapIds.get(i);
|
|
|
+ Path mapOut = this.mapoutputFile.getOutputFile(mapId);
|
|
|
+ Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
|
|
|
+ localFs.getLength(mapOut));
|
|
|
+ if (!localFs.mkdirs(reduceIn.getParent())) {
|
|
|
+ throw new IOException("Mkdirs failed to create "
|
|
|
+ + reduceIn.getParent().toString());
|
|
|
+ }
|
|
|
+ if (!localFs.rename(mapOut, reduceIn))
|
|
|
+ throw new IOException("Couldn't rename " + mapOut);
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
|
|
|
+ reduceId, 0, mapIds.size());
|
|
|
+ JobConf localConf = new JobConf(job);
|
|
|
+ reduce.localizeConfiguration(localConf);
|
|
|
+ reduce.setConf(localConf);
|
|
|
+ reduce_tasks += 1;
|
|
|
+ myMetrics.launchReduce();
|
|
|
+ reduce.run(localConf, this);
|
|
|
+ reduce.saveTaskOutput();
|
|
|
+ myMetrics.completeReduce();
|
|
|
+ reduce_tasks -= 1;
|
|
|
+ updateCounters(reduce);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
for (int i = 0; i < mapIds.size(); i++) {
|
|
|
String mapId = mapIds.get(i);
|
|
|
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
|
|
|
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
|
|
|
- localFs.getLength(mapOut));
|
|
|
- if (!localFs.mkdirs(reduceIn.getParent())) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + reduceIn.getParent().toString());
|
|
|
- }
|
|
|
- if (!localFs.rename(mapOut, reduceIn))
|
|
|
- throw new IOException("Couldn't rename " + mapOut);
|
|
|
this.mapoutputFile.removeAll(mapId);
|
|
|
}
|
|
|
-
|
|
|
- {
|
|
|
- ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
|
|
|
- reduceId, 0, mapIds.size());
|
|
|
- JobConf localConf = new JobConf(job);
|
|
|
- reduce.localizeConfiguration(localConf);
|
|
|
- reduce.setConf(localConf);
|
|
|
- reduce_tasks += 1;
|
|
|
- myMetrics.launchReduce();
|
|
|
- reduce.run(localConf, this);
|
|
|
- reduce.saveTaskOutput();
|
|
|
- myMetrics.completeReduce();
|
|
|
- reduce_tasks -= 1;
|
|
|
- updateCounters(reduce);
|
|
|
+ if (numReduceTasks == 1) {
|
|
|
+ this.mapoutputFile.removeAll(reduceId);
|
|
|
}
|
|
|
- this.mapoutputFile.removeAll(reduceId);
|
|
|
}
|
|
|
this.status.setRunState(JobStatus.SUCCEEDED);
|
|
|
|