Browse Source

MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. (Contributed by Todd Lipcon and Brandon Li)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1368724 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 năm trước cách đây
mục cha
commit
76210be039
2 tập tin đã thay đổi với 43 bổ sung6 xóa
  1. 3 0
      CHANGES.txt
  2. 40 6
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java

+ 3 - 0
CHANGES.txt

@@ -65,6 +65,9 @@ Release 1.2.0 - unreleased
 
     HDFS-3697. Enable fadvise readahead by default. (todd via eli)
 
+    MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
+    (Todd Lipcon and Brandon Li via sseth)
+
   OPTIMIZATIONS
 
     HDFS-2533. Backport: Remove needless synchronization on some FSDataSet

+ 40 - 6
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -73,7 +73,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -341,6 +344,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     "mapreduce.tasktracker.outofband.heartbeat.damper";
   static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
   private volatile int oobHeartbeatDamper;
+  private boolean manageOsCacheInShuffle = false;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
   
   // Track number of completed tasks to send an out-of-band heartbeat
   private AtomicInteger finishedCount = new AtomicInteger(0);
@@ -881,6 +887,12 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     oobHeartbeatDamper = 
       fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
           DEFAULT_OOB_HEARTBEAT_DAMPER);
+    manageOsCacheInShuffle = fConf.getBoolean(
+      "mapreduce.shuffle.manage.os.cache",
+      true);
+    readaheadLength = fConf.getInt(
+      "mapreduce.shuffle.readahead.bytes",
+      4 * 1024 * 1024);
   }
 
   private void startJettyBugMonitor() {
@@ -3978,16 +3990,30 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
          * send it to the reducer.
          */
         //open the map-output file
+        String filePath = mapOutputFileName.toUri().getPath();
         mapOutputIn = SecureIOUtils.openForRead(
-            new File(mapOutputFileName.toUri().getPath()), runAsUserName);
+            new File(filePath), runAsUserName);
+            //new File(mapOutputFileName.toUri().getPath()), runAsUserName);
 
+        ReadaheadRequest curReadahead = null;
+        
         //seek to the correct offset for the reduce
         mapOutputIn.skip(info.startOffset);
         long rem = info.partLength;
-        int len =
-          mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
-        while (rem > 0 && len >= 0) {
+        long offset = info.startOffset;
+        while (rem > 0) {
+          if (tracker.manageOsCacheInShuffle && tracker.readaheadPool != null) {
+            curReadahead = tracker.readaheadPool.readaheadStream(filePath,
+                mapOutputIn.getFD(), offset, tracker.readaheadLength,
+                info.startOffset + info.partLength, curReadahead);
+          }
+          int len = mapOutputIn.read(buffer, 0,
+              (int) Math.min(rem, MAX_BYTES_TO_READ));
+          if (len < 0) {
+            break;
+          }
           rem -= len;
+          offset += len;
           try {
             shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
@@ -3997,10 +4023,18 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
             throw ie;
           }
           totalRead += len;
-          len =
-            mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
         
+        if (curReadahead != null) {
+          curReadahead.cancel();
+        }
+
+        // drop cache if possible
+        if (tracker.manageOsCacheInShuffle && info.partLength > 0) {
+          NativeIO.posixFadviseIfPossible(mapOutputIn.getFD(),
+              info.startOffset, info.partLength, NativeIO.POSIX_FADV_DONTNEED);
+        }
+
         if (LOG.isDebugEnabled()) {
           LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
                  " from map: " + mapId + " given " + info.partLength + "/" +