|
@@ -107,6 +107,8 @@ class ReduceTask extends Task {
|
|
|
private Progress copyPhase;
|
|
|
private Progress sortPhase;
|
|
|
private Progress reducePhase;
|
|
|
+ private Counters.Counter reduceInputBytes =
|
|
|
+ getCounters().findCounter(Counter.REDUCE_INPUT_BYTES);
|
|
|
private Counters.Counter reduceInputKeyCounter =
|
|
|
getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
|
|
|
private Counters.Counter reduceInputValueCounter =
|
|
@@ -372,6 +374,7 @@ class ReduceTask extends Task {
|
|
|
throw new IOException("Task: " + getTaskID() +
|
|
|
" - The reduce copier failed", reduceCopier.mergeThrowable);
|
|
|
}
|
|
|
+ reduceInputBytes.increment(reduceCopier.reducerInputBytes);
|
|
|
}
|
|
|
copyPhase.complete(); // copy is already complete
|
|
|
setPhase(TaskStatus.Phase.SORT);
|
|
@@ -1624,6 +1627,8 @@ class ReduceTask extends Task {
|
|
|
return numInFlight > maxInFlight;
|
|
|
}
|
|
|
|
|
|
+ long reducerInputBytes = 0;
|
|
|
+
|
|
|
public boolean fetchOutputs() throws IOException {
|
|
|
//The map for (Hosts, List of MapIds from this Host)
|
|
|
HashMap<String, List<MapOutputLocation>> mapLocations =
|
|
@@ -1852,6 +1857,7 @@ class ReduceTask extends Task {
|
|
|
numCopied++;
|
|
|
lastProgressTime = System.currentTimeMillis();
|
|
|
bytesTransferred += cr.getSize();
|
|
|
+ reducerInputBytes += cr.getSize();
|
|
|
|
|
|
long secsSinceStart =
|
|
|
(System.currentTimeMillis()-startTime)/1000+1;
|