|
@@ -1259,7 +1259,8 @@ class ReduceTask extends Task {
|
|
|
Path tmpMapOutput = new Path(filename+"-"+id);
|
|
|
|
|
|
// Copy the map output
|
|
|
- MapOutput mapOutput = getMapOutput(loc, tmpMapOutput);
|
|
|
+ MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
|
|
|
+ reduceId.getTaskID().getId());
|
|
|
if (mapOutput == null) {
|
|
|
throw new IOException("Failed to fetch map-output for " +
|
|
|
loc.getTaskAttemptId() + " from " +
|
|
@@ -1341,24 +1342,60 @@ class ReduceTask extends Task {
|
|
|
* @throws IOException when something goes wrong
|
|
|
*/
|
|
|
private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
|
|
|
- Path filename)
|
|
|
+ Path filename, int reduce)
|
|
|
throws IOException, InterruptedException {
|
|
|
// Connect
|
|
|
URLConnection connection =
|
|
|
mapOutputLoc.getOutputLocation().openConnection();
|
|
|
InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
|
|
|
DEFAULT_READ_TIMEOUT);
|
|
|
-
|
|
|
- //We will put a file in memory if it meets certain criteria:
|
|
|
- //1. The size of the (decompressed) file should be less than 25% of
|
|
|
- // the total inmem fs
|
|
|
- //2. There is space available in the inmem fs
|
|
|
-
|
|
|
+
|
|
|
+ // Validate header from map output
|
|
|
+ TaskAttemptID mapId = null;
|
|
|
+ try {
|
|
|
+ mapId =
|
|
|
+ TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
|
|
|
+ } catch (IllegalArgumentException ia) {
|
|
|
+ LOG.warn("Invalid map id ", ia);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
|
|
|
+ if (!mapId.equals(expectedMapId)) {
|
|
|
+ LOG.warn("data from wrong map:" + mapId +
|
|
|
+ " arrived to reduce task " + reduce +
|
|
|
+ ", where as expected map output should be from " + expectedMapId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
long decompressedLength =
|
|
|
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
|
|
|
long compressedLength =
|
|
|
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
|
|
|
|
|
|
+ if (compressedLength < 0 || decompressedLength < 0) {
|
|
|
+ LOG.warn(getName() + " invalid lengths in map output header: id: " +
|
|
|
+ mapId + " compressed len: " + compressedLength +
|
|
|
+ ", decompressed len: " + decompressedLength);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ int forReduce =
|
|
|
+ (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
|
|
|
+
|
|
|
+ if (forReduce != reduce) {
|
|
|
+ LOG.warn("data for the wrong reduce: " + forReduce +
|
|
|
+ " with compressed len: " + compressedLength +
|
|
|
+ ", decompressed len: " + decompressedLength +
|
|
|
+ " arrived to reduce task " + reduce);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ LOG.info("header: " + mapId + ", compressed len: " + compressedLength +
|
|
|
+ ", decompressed len: " + decompressedLength);
|
|
|
+
|
|
|
+ //We will put a file in memory if it meets certain criteria:
|
|
|
+ //1. The size of the (decompressed) file should be less than 25% of
|
|
|
+ // the total inmem fs
|
|
|
+ //2. There is space available in the inmem fs
|
|
|
+
|
|
|
// Check if this map-output can be saved in-memory
|
|
|
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
|
|
|
|