|
@@ -111,8 +111,8 @@ class ReduceTask extends Task {
|
|
private Progress copyPhase;
|
|
private Progress copyPhase;
|
|
private Progress sortPhase;
|
|
private Progress sortPhase;
|
|
private Progress reducePhase;
|
|
private Progress reducePhase;
|
|
- private Counters.Counter reduceInputBytes =
|
|
|
|
- getCounters().findCounter(Counter.REDUCE_INPUT_BYTES);
|
|
|
|
|
|
+ private Counters.Counter reduceShuffleBytes =
|
|
|
|
+ getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
|
|
private Counters.Counter reduceInputKeyCounter =
|
|
private Counters.Counter reduceInputKeyCounter =
|
|
getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
|
|
getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
|
|
private Counters.Counter reduceInputValueCounter =
|
|
private Counters.Counter reduceInputValueCounter =
|
|
@@ -380,7 +380,6 @@ class ReduceTask extends Task {
|
|
throw new IOException("Task: " + getTaskID() +
|
|
throw new IOException("Task: " + getTaskID() +
|
|
" - The reduce copier failed", reduceCopier.mergeThrowable);
|
|
" - The reduce copier failed", reduceCopier.mergeThrowable);
|
|
}
|
|
}
|
|
- reduceInputBytes.increment(reduceCopier.reducerInputBytes);
|
|
|
|
}
|
|
}
|
|
copyPhase.complete(); // copy is already complete
|
|
copyPhase.complete(); // copy is already complete
|
|
setPhase(TaskStatus.Phase.SORT);
|
|
setPhase(TaskStatus.Phase.SORT);
|
|
@@ -919,7 +918,7 @@ class ReduceTask extends Task {
|
|
|
|
|
|
byte[] data;
|
|
byte[] data;
|
|
final boolean inMemory;
|
|
final boolean inMemory;
|
|
- long size;
|
|
|
|
|
|
+ long compressedSize;
|
|
|
|
|
|
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
|
|
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
|
|
Configuration conf, Path file, long size) {
|
|
Configuration conf, Path file, long size) {
|
|
@@ -928,14 +927,14 @@ class ReduceTask extends Task {
|
|
|
|
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
this.file = file;
|
|
this.file = file;
|
|
- this.size = size;
|
|
|
|
|
|
+ this.compressedSize = size;
|
|
|
|
|
|
this.data = null;
|
|
this.data = null;
|
|
|
|
|
|
this.inMemory = false;
|
|
this.inMemory = false;
|
|
}
|
|
}
|
|
|
|
|
|
- public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data) {
|
|
|
|
|
|
+ public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
|
|
this.mapId = mapId;
|
|
this.mapId = mapId;
|
|
this.mapAttemptId = mapAttemptId;
|
|
this.mapAttemptId = mapAttemptId;
|
|
|
|
|
|
@@ -943,7 +942,7 @@ class ReduceTask extends Task {
|
|
this.conf = null;
|
|
this.conf = null;
|
|
|
|
|
|
this.data = data;
|
|
this.data = data;
|
|
- this.size = data.length;
|
|
|
|
|
|
+ this.compressedSize = compressedLength;
|
|
|
|
|
|
this.inMemory = true;
|
|
this.inMemory = true;
|
|
}
|
|
}
|
|
@@ -1262,7 +1261,7 @@ class ReduceTask extends Task {
|
|
}
|
|
}
|
|
|
|
|
|
// The size of the map-output
|
|
// The size of the map-output
|
|
- long bytes = mapOutput.size;
|
|
|
|
|
|
+ long bytes = mapOutput.compressedSize;
|
|
|
|
|
|
// lock the ReduceTask while we do the rename
|
|
// lock the ReduceTask while we do the rename
|
|
synchronized (ReduceTask.this) {
|
|
synchronized (ReduceTask.this) {
|
|
@@ -1467,7 +1466,7 @@ class ReduceTask extends Task {
|
|
byte[] shuffleData = new byte[mapOutputLength];
|
|
byte[] shuffleData = new byte[mapOutputLength];
|
|
MapOutput mapOutput =
|
|
MapOutput mapOutput =
|
|
new MapOutput(mapOutputLoc.getTaskId(),
|
|
new MapOutput(mapOutputLoc.getTaskId(),
|
|
- mapOutputLoc.getTaskAttemptId(), shuffleData);
|
|
|
|
|
|
+ mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
|
|
|
|
|
|
int bytesRead = 0;
|
|
int bytesRead = 0;
|
|
try {
|
|
try {
|
|
@@ -1741,12 +1740,10 @@ class ReduceTask extends Task {
|
|
return numInFlight > maxInFlight;
|
|
return numInFlight > maxInFlight;
|
|
}
|
|
}
|
|
|
|
|
|
- long reducerInputBytes = 0;
|
|
|
|
|
|
|
|
public boolean fetchOutputs() throws IOException {
|
|
public boolean fetchOutputs() throws IOException {
|
|
int totalFailures = 0;
|
|
int totalFailures = 0;
|
|
int numInFlight = 0, numCopied = 0;
|
|
int numInFlight = 0, numCopied = 0;
|
|
- long bytesTransferred = 0;
|
|
|
|
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
|
|
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
|
|
final Progress copyPhase =
|
|
final Progress copyPhase =
|
|
reduceTask.getProgress().phase();
|
|
reduceTask.getProgress().phase();
|
|
@@ -1938,12 +1935,11 @@ class ReduceTask extends Task {
|
|
if (cr.getSuccess()) { // a successful copy
|
|
if (cr.getSuccess()) { // a successful copy
|
|
numCopied++;
|
|
numCopied++;
|
|
lastProgressTime = System.currentTimeMillis();
|
|
lastProgressTime = System.currentTimeMillis();
|
|
- bytesTransferred += cr.getSize();
|
|
|
|
- reducerInputBytes += cr.getSize();
|
|
|
|
|
|
+ reduceShuffleBytes.increment(cr.getSize());
|
|
|
|
|
|
long secsSinceStart =
|
|
long secsSinceStart =
|
|
(System.currentTimeMillis()-startTime)/1000+1;
|
|
(System.currentTimeMillis()-startTime)/1000+1;
|
|
- float mbs = ((float)bytesTransferred)/(1024*1024);
|
|
|
|
|
|
+ float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
|
|
float transferRate = mbs/secsSinceStart;
|
|
float transferRate = mbs/secsSinceStart;
|
|
|
|
|
|
copyPhase.startNextPhase();
|
|
copyPhase.startNextPhase();
|