@@ -62,7 +62,6 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -2807,11 +2806,9 @@ public class TaskTracker
       OutputStream outStream = null;
       FSDataInputStream mapOutputIn = null;
 
-      IFileInputStream checksumInputStream = null;
-
       long totalRead = 0;
-      ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
-        context.getAttribute("shuffleServerMetrics");
+      ShuffleServerMetrics shuffleMetrics =
+        (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker =
         (TaskTracker) context.getAttribute("task.tracker");
 
@@ -2821,8 +2818,8 @@ public class TaskTracker
         JobConf conf = (JobConf) context.getAttribute("conf");
         LocalDirAllocator lDirAlloc =
           (LocalDirAllocator)context.getAttribute("localDirAllocator");
-        FileSystem fileSys =
-          (FileSystem) context.getAttribute("local.file.system");
+        FileSystem rfs = ((LocalFileSystem)
+            context.getAttribute("local.file.system")).getRaw();
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
@@ -2843,18 +2840,15 @@ public class TaskTracker
         IndexRecord info =
           tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
 
-        final long startOffset = info.startOffset;
-        final long rawPartLength = info.rawLength;
-        final long partLength = info.partLength;
-
         //set the custom "Raw-Map-Output-Length" http header to
         //the raw (decompressed) length
-        response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
+        response.setHeader(RAW_MAP_OUTPUT_LENGTH,
+            Long.toString(info.rawLength));
 
         //set the custom "Map-Output-Length" http header to
         //the actual number of bytes being transferred
-        response.setHeader(MAP_OUTPUT_LENGTH,
-                           Long.toString(partLength));
+        response.setHeader(MAP_OUTPUT_LENGTH,
+            Long.toString(info.partLength));
 
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
@@ -2864,33 +2858,15 @@ public class TaskTracker
          * send it to the reducer.
          */
         //open the map-output file
-        FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
-
         mapOutputIn = rfs.open(mapOutputFileName);
-        // TODO: Remove this after a 'fix' for HADOOP-3647
-        // The clever trick here to reduce the impact of the extra seek for
-        // logging the first key/value lengths is to read the lengths before
-        // the second seek for the actual shuffle. The second seek is almost
-        // a no-op since it is very short (go back length of two VInts) and the
-        // data is almost guaranteed to be in the filesystem's buffers.
-        // WARN: This won't work for compressed map-outputs!
-        int firstKeyLength = 0;
-        int firstValueLength = 0;
-        if (partLength > 0) {
-          mapOutputIn.seek(startOffset);
-          firstKeyLength = WritableUtils.readVInt(mapOutputIn);
-          firstValueLength = WritableUtils.readVInt(mapOutputIn);
-        }
-
 
         //seek to the correct offset for the reduce
-        mapOutputIn.seek(startOffset);
-        checksumInputStream = new IFileInputStream(mapOutputIn,partLength);
-
-        int len = checksumInputStream.readWithChecksum(buffer, 0,
-            partLength < MAX_BYTES_TO_READ
-            ? (int)partLength : MAX_BYTES_TO_READ);
-        while (len > 0) {
+        mapOutputIn.seek(info.startOffset);
+        long rem = info.partLength;
+        int len =
+          mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
+        while (rem > 0 && len >= 0) {
+          rem -= len;
           try {
             shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
@@ -2900,16 +2876,13 @@ public class TaskTracker
             throw ie;
           }
           totalRead += len;
-          if (totalRead == partLength) break;
-          len = checksumInputStream.readWithChecksum(buffer, 0,
-              (partLength - totalRead) < MAX_BYTES_TO_READ
-              ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+          len =
+            mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
-
+ 
         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
-                 " from map: " + mapId + " given " + partLength + "/" +
-                 rawPartLength + " from " + startOffset + " with (" +
-                 firstKeyLength + ", " + firstValueLength + ")");
+                 " from map: " + mapId + " given " + info.partLength + "/" +
+                 info.rawLength);
       } catch (IOException ie) {
         Log log = (Log) context.getAttribute("log");
         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
@@ -2923,8 +2896,8 @@ public class TaskTracker
         shuffleMetrics.failedOutput();
         throw ie;
       } finally {
-        if (checksumInputStream != null) {
-          checksumInputStream.close();
+        if (null != mapOutputIn) {
+          mapOutputIn.close();
         }
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {