MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1373669 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 12 years ago
parent
commit
8fcad7e8e9

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -142,6 +142,8 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
     (Jason Lowe via bobby)
 
+    MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
   BUG FIXES
 
     MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java

@@ -340,7 +340,7 @@ public class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
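
Note: IFile.Reader now threads its Configuration into the checksummed stream, so the readahead settings reach the code path that actually reads map output. A minimal sketch of the new three-argument constructor from a hypothetical caller (the path and variable names are illustrative):

    import java.io.FileInputStream;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    FileInputStream raw = new FileInputStream("/tmp/spill0.out"); // hypothetical local IFile
    long lenWithChecksum = raw.getChannel().size();               // length including trailing CRC bytes
    IFileInputStream checked = new IFileInputStream(raw, lenWithChecksum, conf);

Passing a FileInputStream (or any stream implementing HasFileDescriptor) matters: readahead is only issued when a FileDescriptor can be recovered, as getFileDescriptorIfAvail in the next file shows.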

+ 59 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java

@@ -19,13 +19,22 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.DataChecksum;
 /**
  * A checksum input stream, used for IFiles.
@@ -35,7 +44,8 @@ import org.apache.hadoop.util.DataChecksum;
 @InterfaceStability.Unstable
 public class IFileInputStream extends InputStream {
   
-  private final InputStream in; //The input stream to be verified for checksum. 
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -43,7 +53,14 @@ public class IFileInputStream extends InputStream {
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-  
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   private boolean disableChecksumValidation = false;
   
   /**
@@ -51,13 +68,36 @@ public class IFileInputStream extends InputStream {
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -66,6 +106,10 @@ public class IFileInputStream extends InputStream {
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -102,10 +146,21 @@ public class IFileInputStream extends InputStream {
     if (currentOffset >= dataLength) {
       return -1;
     }
-    
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
    * At EOF, checksum is validated and sent back
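
Note: doReadahead() follows ReadaheadPool's intended call pattern: pass the previous ReadaheadRequest back in on each call so the pool can coalesce or replace in-flight work, and cancel the last request on close(). A standalone sketch of the same pattern against the same API, with an illustrative identifier and buffer sizes:

    import java.io.FileInputStream;
    import org.apache.hadoop.io.ReadaheadPool;
    import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;

    public class ReadaheadSketch {
      public static void main(String[] args) throws Exception {
        FileInputStream fis = new FileInputStream(args[0]);
        long len = fis.getChannel().size();
        ReadaheadPool pool = ReadaheadPool.getInstance();
        ReadaheadRequest pending = null;
        byte[] buf = new byte[64 * 1024];
        long pos = 0;
        for (int n; (n = fis.read(buf)) > 0; pos += n) {
          // Same call shape as doReadahead() above:
          // (identifier, fd, curPos, readaheadLength, maxOffsetToRead, lastReadahead)
          pending = pool.readaheadStream("sketch", fis.getFD(),
              pos, 4 * 1024 * 1024, len, pending);
        }
        if (pending != null) {
          pending.cancel(); // mirrors IFileInputStream.close()
        }
        fis.close();
      }
    }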

+ 17 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java

@@ -84,4 +84,20 @@ public interface MRConfig {
     "mapreduce.shuffle.ssl.enabled";
 
   public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
-}
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+}
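
Note: with the keys and defaults centralized here, the feature can be tuned per job or cluster-wide. A small illustrative override (the 8 MB value is an example, not a recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRConfig;

    Configuration conf = new Configuration();
    conf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, true);
    conf.setInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, 8 * 1024 * 1024); // default is 4 MB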

+ 4 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -98,6 +98,8 @@ class Fetcher<K,V> extends Thread {
 
   private volatile boolean stopped = false;
 
+  private JobConf job;
+
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
 
@@ -105,6 +107,7 @@ class Fetcher<K,V> extends Thread {
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+    this.job = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -539,7 +542,7 @@ class Fetcher<K,V> extends Thread {
                                int decompressedLength, 
                                int compressedLength) throws IOException {    
     IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength);
+      new IFileInputStream(input, compressedLength, job);
 
     input = checksumIn;       
   

+ 14 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -959,6 +959,20 @@
     acceptable.
     </description>
   </property>
+
+  <property>
+    <name>mapreduce.ifile.readahead</name>
+    <value>true</value>
+    <description>Configuration key to enable/disable IFile readahead.
+    </description>
+  </property>
+
+  <property>
+    <name>mapreduce.ifile.readahead.bytes</name>
+    <value>4194304</value>
+    <description>Configuration key to set the IFile readahead length in bytes.
+    </description>
+  </property>
   
 <!-- Job Notification Configuration -->
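
Note: these entries document the defaults; deployments override them under the same property names in mapred-site.xml. To inspect the effective values from client code, a JobConf can be used, since it loads mapred-default.xml and any mapred-site.xml automatically (a sketch, assuming an MR2 client classpath):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRConfig;

    JobConf job = new JobConf();
    boolean readaheadOn = job.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
    int readaheadBytes = job.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);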
 

+ 4 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends TestCase {
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends TestCase {
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends TestCase {
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {