|
@@ -18,6 +18,8 @@
|
|
package org.apache.hadoop.mapred;
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
import java.io.EOFException;
|
|
import java.io.EOFException;
|
|
|
|
+import java.io.File;
|
|
|
|
+import java.io.FileOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
|
|
|
|
@@ -45,7 +47,7 @@ import org.apache.hadoop.io.serializer.Serializer;
|
|
*/
|
|
*/
|
|
class IFile {
|
|
class IFile {
|
|
|
|
|
|
- private static int EOF_MARKER = -1;
|
|
|
|
|
|
+ private static final int EOF_MARKER = -1;
|
|
|
|
|
|
/**
|
|
/**
|
|
* <code>IFile.Writer</code> to write out intermediate map-outputs.
|
|
* <code>IFile.Writer</code> to write out intermediate map-outputs.
|
|
@@ -54,6 +56,7 @@ class IFile {
|
|
FSDataOutputStream out;
|
|
FSDataOutputStream out;
|
|
boolean ownOutputStream = false;
|
|
boolean ownOutputStream = false;
|
|
long start = 0;
|
|
long start = 0;
|
|
|
|
+ FSDataOutputStream rawOut;
|
|
|
|
|
|
CompressionOutputStream compressedOut;
|
|
CompressionOutputStream compressedOut;
|
|
Compressor compressor;
|
|
Compressor compressor;
|
|
@@ -79,6 +82,9 @@ class IFile {
|
|
public Writer(Configuration conf, FSDataOutputStream out,
|
|
public Writer(Configuration conf, FSDataOutputStream out,
|
|
Class<K> keyClass, Class<V> valueClass,
|
|
Class<K> keyClass, Class<V> valueClass,
|
|
CompressionCodec codec) throws IOException {
|
|
CompressionCodec codec) throws IOException {
|
|
|
|
+ this.rawOut = out;
|
|
|
|
+ this.start = this.rawOut.getPos();
|
|
|
|
+
|
|
if (codec != null) {
|
|
if (codec != null) {
|
|
this.compressor = CodecPool.getCompressor(codec);
|
|
this.compressor = CodecPool.getCompressor(codec);
|
|
this.compressor.reset();
|
|
this.compressor.reset();
|
|
@@ -88,7 +94,6 @@ class IFile {
|
|
} else {
|
|
} else {
|
|
this.out = out;
|
|
this.out = out;
|
|
}
|
|
}
|
|
- this.start = this.out.getPos();
|
|
|
|
|
|
|
|
this.keyClass = keyClass;
|
|
this.keyClass = keyClass;
|
|
this.valueClass = valueClass;
|
|
this.valueClass = valueClass;
|
|
@@ -100,34 +105,34 @@ class IFile {
|
|
}
|
|
}
|
|
|
|
|
|
public void close() throws IOException {
|
|
public void close() throws IOException {
|
|
|
|
+ // Close the serializers
|
|
|
|
+ keySerializer.close();
|
|
|
|
+ valueSerializer.close();
|
|
|
|
+
|
|
// Write EOF_MARKER for key/value length
|
|
// Write EOF_MARKER for key/value length
|
|
WritableUtils.writeVInt(out, EOF_MARKER);
|
|
WritableUtils.writeVInt(out, EOF_MARKER);
|
|
WritableUtils.writeVInt(out, EOF_MARKER);
|
|
WritableUtils.writeVInt(out, EOF_MARKER);
|
|
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
|
|
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
|
|
|
|
|
|
if (compressOutput) {
|
|
if (compressOutput) {
|
|
- // Return the compressor
|
|
|
|
|
|
+ // Flush data from buffers into the compressor
|
|
|
|
+ out.flush();
|
|
|
|
+
|
|
|
|
+ // Flush & return the compressor
|
|
compressedOut.finish();
|
|
compressedOut.finish();
|
|
compressedOut.resetState();
|
|
compressedOut.resetState();
|
|
CodecPool.returnCompressor(compressor);
|
|
CodecPool.returnCompressor(compressor);
|
|
}
|
|
}
|
|
-
|
|
|
|
- // Close the serializers
|
|
|
|
- keySerializer.close();
|
|
|
|
- valueSerializer.close();
|
|
|
|
|
|
|
|
// Close the stream
|
|
// Close the stream
|
|
- if (out != null) {
|
|
|
|
- out.flush();
|
|
|
|
- compressedBytesWritten = out.getPos() - start;
|
|
|
|
-
|
|
|
|
- // Close the underlying stream iff we own it...
|
|
|
|
- if (ownOutputStream) {
|
|
|
|
- out.close();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- out = null;
|
|
|
|
|
|
+ rawOut.flush();
|
|
|
|
+ compressedBytesWritten = rawOut.getPos() - start;
|
|
|
|
+
|
|
|
|
+ // Close the underlying stream iff we own it...
|
|
|
|
+ if (ownOutputStream) {
|
|
|
|
+ out.close();
|
|
}
|
|
}
|
|
|
|
+ out = null;
|
|
}
|
|
}
|
|
|
|
|
|
public void append(K key, V value) throws IOException {
|
|
public void append(K key, V value) throws IOException {
|
|
@@ -141,12 +146,18 @@ class IFile {
|
|
// Append the 'key'
|
|
// Append the 'key'
|
|
keySerializer.serialize(key);
|
|
keySerializer.serialize(key);
|
|
int keyLength = buffer.getLength();
|
|
int keyLength = buffer.getLength();
|
|
- if (keyLength == 0)
|
|
|
|
- throw new IOException("zero length keys not allowed: " + key);
|
|
|
|
|
|
+ if (keyLength < 0) {
|
|
|
|
+ throw new IOException("Negative key-length not allowed: " + keyLength +
|
|
|
|
+ " for " + key);
|
|
|
|
+ }
|
|
|
|
|
|
// Append the 'value'
|
|
// Append the 'value'
|
|
valueSerializer.serialize(value);
|
|
valueSerializer.serialize(value);
|
|
int valueLength = buffer.getLength() - keyLength;
|
|
int valueLength = buffer.getLength() - keyLength;
|
|
|
|
+ if (valueLength < 0) {
|
|
|
|
+ throw new IOException("Negative value-length not allowed: " +
|
|
|
|
+ valueLength + " for " + value);
|
|
|
|
+ }
|
|
|
|
|
|
// Write the record out
|
|
// Write the record out
|
|
WritableUtils.writeVInt(out, keyLength); // key length
|
|
WritableUtils.writeVInt(out, keyLength); // key length
|
|
@@ -165,8 +176,17 @@ class IFile {
|
|
public void append(DataInputBuffer key, DataInputBuffer value)
|
|
public void append(DataInputBuffer key, DataInputBuffer value)
|
|
throws IOException {
|
|
throws IOException {
|
|
int keyLength = key.getLength() - key.getPosition();
|
|
int keyLength = key.getLength() - key.getPosition();
|
|
- int valueLength = value.getLength() - value.getPosition();
|
|
|
|
|
|
+ if (keyLength < 0) {
|
|
|
|
+ throw new IOException("Negative key-length not allowed: " + keyLength +
|
|
|
|
+ " for " + key);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ int valueLength = value.getLength() - value.getPosition();
|
|
|
|
+ if (valueLength < 0) {
|
|
|
|
+ throw new IOException("Negative value-length not allowed: " +
|
|
|
|
+ valueLength + " for " + value);
|
|
|
|
+ }
|
|
|
|
+
|
|
WritableUtils.writeVInt(out, keyLength);
|
|
WritableUtils.writeVInt(out, keyLength);
|
|
WritableUtils.writeVInt(out, valueLength);
|
|
WritableUtils.writeVInt(out, valueLength);
|
|
out.write(key.getData(), key.getPosition(), keyLength);
|
|
out.write(key.getData(), key.getPosition(), keyLength);
|
|
@@ -192,7 +212,7 @@ class IFile {
|
|
*/
|
|
*/
|
|
public static class Reader<K extends Object, V extends Object> {
|
|
public static class Reader<K extends Object, V extends Object> {
|
|
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
|
|
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
|
|
- private static final int MAX_VINT_SIZE = 5;
|
|
|
|
|
|
+ private static final int MAX_VINT_SIZE = 9;
|
|
|
|
|
|
InputStream in;
|
|
InputStream in;
|
|
Decompressor decompressor;
|
|
Decompressor decompressor;
|
|
@@ -204,6 +224,8 @@ class IFile {
|
|
int bufferSize = DEFAULT_BUFFER_SIZE;
|
|
int bufferSize = DEFAULT_BUFFER_SIZE;
|
|
DataInputBuffer dataIn = new DataInputBuffer();
|
|
DataInputBuffer dataIn = new DataInputBuffer();
|
|
|
|
|
|
|
|
+ int recNo = 1;
|
|
|
|
+
|
|
public Reader(Configuration conf, FileSystem fs, Path file,
|
|
public Reader(Configuration conf, FileSystem fs, Path file,
|
|
CompressionCodec codec) throws IOException {
|
|
CompressionCodec codec) throws IOException {
|
|
this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
|
|
this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
|
|
@@ -226,6 +248,15 @@ class IFile {
|
|
|
|
|
|
public long getLength() { return fileLength; }
|
|
public long getLength() { return fileLength; }
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Read upto len bytes into buf starting at offset off.
|
|
|
|
+ *
|
|
|
|
+ * @param buf buffer
|
|
|
|
+ * @param off offset
|
|
|
|
+ * @param len length of buffer
|
|
|
|
+ * @return the no. of bytes read
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
private int readData(byte[] buf, int off, int len) throws IOException {
|
|
private int readData(byte[] buf, int off, int len) throws IOException {
|
|
int bytesRead = 0;
|
|
int bytesRead = 0;
|
|
while (bytesRead < len) {
|
|
while (bytesRead < len) {
|
|
@@ -291,6 +322,16 @@ class IFile {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Sanity check
|
|
|
|
+ if (keyLength < 0) {
|
|
|
|
+ throw new IOException("Rec# " + recNo + ": Negative key-length: " +
|
|
|
|
+ keyLength);
|
|
|
|
+ }
|
|
|
|
+ if (valueLength < 0) {
|
|
|
|
+ throw new IOException("Rec# " + recNo + ": Negative value-length: " +
|
|
|
|
+ valueLength);
|
|
|
|
+ }
|
|
|
|
+
|
|
final int recordLength = keyLength + valueLength;
|
|
final int recordLength = keyLength + valueLength;
|
|
|
|
|
|
// Check if we have the raw key/value in the buffer
|
|
// Check if we have the raw key/value in the buffer
|
|
@@ -299,7 +340,8 @@ class IFile {
|
|
|
|
|
|
// Sanity check
|
|
// Sanity check
|
|
if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
|
|
if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
|
|
- throw new EOFException("Could read the next record");
|
|
|
|
|
|
+ throw new EOFException("Rec# " + recNo + ": Could read the next " +
|
|
|
|
+ " record");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -310,9 +352,17 @@ class IFile {
|
|
value.reset(data, (pos + keyLength), valueLength);
|
|
value.reset(data, (pos + keyLength), valueLength);
|
|
|
|
|
|
// Position for the next record
|
|
// Position for the next record
|
|
- dataIn.skip(recordLength);
|
|
|
|
|
|
+ long skipped = dataIn.skip(recordLength);
|
|
|
|
+ if (skipped != recordLength) {
|
|
|
|
+ throw new IOException("Rec# " + recNo + ": Failed to skip past record " +
|
|
|
|
+ "of length: " + recordLength);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Record the bytes read
|
|
bytesRead += recordLength;
|
|
bytesRead += recordLength;
|
|
|
|
|
|
|
|
+ ++recNo;
|
|
|
|
+
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -324,9 +374,7 @@ class IFile {
|
|
}
|
|
}
|
|
|
|
|
|
// Close the underlying stream
|
|
// Close the underlying stream
|
|
- if (in != null) {
|
|
|
|
- in.close();
|
|
|
|
- }
|
|
|
|
|
|
+ in.close();
|
|
|
|
|
|
// Release the buffer
|
|
// Release the buffer
|
|
dataIn = null;
|
|
dataIn = null;
|
|
@@ -339,18 +387,34 @@ class IFile {
|
|
*/
|
|
*/
|
|
public static class InMemoryReader<K, V> extends Reader<K, V> {
|
|
public static class InMemoryReader<K, V> extends Reader<K, V> {
|
|
RamManager ramManager;
|
|
RamManager ramManager;
|
|
|
|
+ TaskAttemptID taskAttemptId;
|
|
|
|
|
|
- public InMemoryReader(RamManager ramManager,
|
|
|
|
|
|
+ public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
|
|
byte[] data, int start, int length) {
|
|
byte[] data, int start, int length) {
|
|
this.ramManager = ramManager;
|
|
this.ramManager = ramManager;
|
|
|
|
+ this.taskAttemptId = taskAttemptId;
|
|
|
|
|
|
buffer = data;
|
|
buffer = data;
|
|
fileLength = bufferSize = (length - start);
|
|
fileLength = bufferSize = (length - start);
|
|
dataIn.reset(buffer, start, length);
|
|
dataIn.reset(buffer, start, length);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void dumpOnError() {
|
|
|
|
+ File dumpFile = new File("../output/" + taskAttemptId + ".dump");
|
|
|
|
+ System.err.println("Dumping corrupt map-output of " + taskAttemptId +
|
|
|
|
+ " to " + dumpFile.getAbsolutePath());
|
|
|
|
+ try {
|
|
|
|
+ FileOutputStream fos = new FileOutputStream(dumpFile);
|
|
|
|
+ fos.write(buffer, 0, bufferSize);
|
|
|
|
+ fos.close();
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ System.err.println("Failed to dump map-output of " + taskAttemptId);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public boolean next(DataInputBuffer key, DataInputBuffer value)
|
|
public boolean next(DataInputBuffer key, DataInputBuffer value)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ try {
|
|
// Sanity check
|
|
// Sanity check
|
|
if (eof) {
|
|
if (eof) {
|
|
throw new EOFException("Completed reading " + bytesRead);
|
|
throw new EOFException("Completed reading " + bytesRead);
|
|
@@ -369,6 +433,16 @@ class IFile {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Sanity check
|
|
|
|
+ if (keyLength < 0) {
|
|
|
|
+ throw new IOException("Rec# " + recNo + ": Negative key-length: " +
|
|
|
|
+ keyLength);
|
|
|
|
+ }
|
|
|
|
+ if (valueLength < 0) {
|
|
|
|
+ throw new IOException("Rec# " + recNo + ": Negative value-length: " +
|
|
|
|
+ valueLength);
|
|
|
|
+ }
|
|
|
|
+
|
|
final int recordLength = keyLength + valueLength;
|
|
final int recordLength = keyLength + valueLength;
|
|
|
|
|
|
// Setup the key and value
|
|
// Setup the key and value
|
|
@@ -380,14 +454,20 @@ class IFile {
|
|
// Position for the next record
|
|
// Position for the next record
|
|
long skipped = dataIn.skip(recordLength);
|
|
long skipped = dataIn.skip(recordLength);
|
|
if (skipped != recordLength) {
|
|
if (skipped != recordLength) {
|
|
- throw new IOException("Failed to skip past record of length: " +
|
|
|
|
|
|
+ throw new IOException("Rec# " + recNo + ": Failed to skip past record of length: " +
|
|
recordLength);
|
|
recordLength);
|
|
}
|
|
}
|
|
|
|
|
|
// Record the byte
|
|
// Record the byte
|
|
bytesRead += recordLength;
|
|
bytesRead += recordLength;
|
|
|
|
|
|
|
|
+ ++recNo;
|
|
|
|
+
|
|
return true;
|
|
return true;
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ dumpOnError();
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public void close() {
|
|
public void close() {
|