|
@@ -19,11 +19,20 @@
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
import java.io.EOFException;
|
|
|
+import java.io.FileDescriptor;
|
|
|
+import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.fs.ChecksumException;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.HasFileDescriptor;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.ReadaheadPool;
|
|
|
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
/**
|
|
|
* A checksum input stream, used for IFiles.
|
|
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecksum;
|
|
|
|
|
|
class IFileInputStream extends InputStream {
|
|
|
|
|
|
- private final InputStream in; //The input stream to be verified for checksum.
|
|
|
+ private final InputStream in; //The input stream to be verified for checksum.
|
|
|
+ private final FileDescriptor inFd; // the file descriptor, if it is known
|
|
|
private final long length; //The total length of the input file
|
|
|
private final long dataLength;
|
|
|
private DataChecksum sum;
|
|
@@ -40,19 +50,66 @@ class IFileInputStream extends InputStream {
|
|
|
private final byte b[] = new byte[1];
|
|
|
private byte csum[] = null;
|
|
|
private int checksumSize;
|
|
|
-
|
|
|
+
|
|
|
+ private ReadaheadRequest curReadahead = null;
|
|
|
+ private ReadaheadPool raPool = ReadaheadPool.getInstance();
|
|
|
+ private boolean readahead;
|
|
|
+ private int readaheadLength;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Configuration key to enable/disable IFile readahead.
|
|
|
+ */
|
|
|
+ public static final String MAPRED_IFILE_READAHEAD =
|
|
|
+ "mapreduce.ifile.readahead";
|
|
|
+
|
|
|
+ public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Configuration key to set the IFile readahead length in bytes.
|
|
|
+ */
|
|
|
+ public static final String MAPRED_IFILE_READAHEAD_BYTES =
|
|
|
+ "mapreduce.ifile.readahead.bytes";
|
|
|
+
|
|
|
+ public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
|
|
|
+ 4 * 1024 * 1024;
|
|
|
+
|
|
|
+ public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
|
|
|
+
|
|
|
/**
|
|
|
* Create a checksum input stream that reads
|
|
|
* @param in The input stream to be verified for checksum.
|
|
|
* @param len The length of the input stream including checksum bytes.
|
|
|
*/
|
|
|
- public IFileInputStream(InputStream in, long len) {
|
|
|
+ public IFileInputStream(InputStream in, long len, Configuration conf) {
|
|
|
this.in = in;
|
|
|
+ this.inFd = getFileDescriptorIfAvail(in);
|
|
|
sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
|
|
|
Integer.MAX_VALUE);
|
|
|
checksumSize = sum.getChecksumSize();
|
|
|
length = len;
|
|
|
dataLength = length - checksumSize;
|
|
|
+
|
|
|
+ conf = (conf != null) ? conf : new Configuration();
|
|
|
+ readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD,
|
|
|
+ DEFAULT_MAPRED_IFILE_READAHEAD);
|
|
|
+ readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES,
|
|
|
+ DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
|
|
|
+
|
|
|
+ doReadahead();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
|
|
|
+ FileDescriptor fd = null;
|
|
|
+ try {
|
|
|
+ if (in instanceof HasFileDescriptor) {
|
|
|
+ fd = ((HasFileDescriptor)in).getFileDescriptor();
|
|
|
+ } else if (in instanceof FileInputStream) {
|
|
|
+ fd = ((FileInputStream)in).getFD();
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info("Unable to determine FileDescriptor", e);
|
|
|
+ }
|
|
|
+ return fd;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -61,6 +118,10 @@ class IFileInputStream extends InputStream {
|
|
|
*/
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
+
|
|
|
+ if (curReadahead != null) {
|
|
|
+ curReadahead.cancel();
|
|
|
+ }
|
|
|
if (currentOffset < dataLength) {
|
|
|
byte[] t = new byte[Math.min((int)
|
|
|
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
|
|
@@ -97,10 +158,21 @@ class IFileInputStream extends InputStream {
|
|
|
if (currentOffset >= dataLength) {
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ doReadahead();
|
|
|
+
|
|
|
return doRead(b,off,len);
|
|
|
}
|
|
|
|
|
|
+ private void doReadahead() {
|
|
|
+ if (raPool != null && inFd != null && readahead) {
|
|
|
+ curReadahead = raPool.readaheadStream(
|
|
|
+ "ifile", inFd,
|
|
|
+ currentOffset, readaheadLength, dataLength,
|
|
|
+ curReadahead);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Read bytes from the stream.
|
|
|
* At EOF, checksum is validated and sent back
|