|
@@ -17,6 +17,8 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
+import java.io.DataInput;
|
|
|
+import java.io.DataInputStream;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
@@ -250,12 +252,16 @@ class IFile {
|
|
|
final long fileLength;
|
|
|
boolean eof = false;
|
|
|
final IFileInputStream checksumIn;
|
|
|
+ DataInputStream dataIn;
|
|
|
|
|
|
byte[] buffer = null;
|
|
|
int bufferSize = DEFAULT_BUFFER_SIZE;
|
|
|
- DataInputBuffer dataIn = new DataInputBuffer();
|
|
|
|
|
|
int recNo = 1;
|
|
|
+ int currentKeyLength;
|
|
|
+ int currentValueLength;
|
|
|
+ byte keyBytes[] = new byte[0];
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Construct an IFile Reader.
|
|
@@ -298,6 +304,7 @@ class IFile {
|
|
|
} else {
|
|
|
this.in = checksumIn;
|
|
|
}
|
|
|
+ this.dataIn = new DataInputStream(this.in);
|
|
|
this.fileLength = length;
|
|
|
|
|
|
if (conf != null) {
|
|
@@ -334,104 +341,70 @@ class IFile {
|
|
|
return len;
|
|
|
}
|
|
|
|
|
|
- void readNextBlock(int minSize) throws IOException {
|
|
|
- if (buffer == null) {
|
|
|
- buffer = new byte[bufferSize];
|
|
|
- dataIn.reset(buffer, 0, 0);
|
|
|
- }
|
|
|
- buffer =
|
|
|
- rejigData(buffer,
|
|
|
- (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
|
|
|
- bufferSize = buffer.length;
|
|
|
- }
|
|
|
-
|
|
|
- private byte[] rejigData(byte[] source, byte[] destination)
|
|
|
- throws IOException{
|
|
|
- // Copy remaining data into the destination array
|
|
|
- int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
|
|
|
- if (bytesRemaining > 0) {
|
|
|
- System.arraycopy(source, dataIn.getPosition(),
|
|
|
- destination, 0, bytesRemaining);
|
|
|
- }
|
|
|
-
|
|
|
- // Read as much data as will fit from the underlying stream
|
|
|
- int n = readData(destination, bytesRemaining,
|
|
|
- (destination.length - bytesRemaining));
|
|
|
- dataIn.reset(destination, 0, (bytesRemaining + n));
|
|
|
-
|
|
|
- return destination;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean next(DataInputBuffer key, DataInputBuffer value)
|
|
|
- throws IOException {
|
|
|
+ protected boolean positionToNextRecord(DataInput dIn) throws IOException {
|
|
|
// Sanity check
|
|
|
if (eof) {
|
|
|
throw new EOFException("Completed reading " + bytesRead);
|
|
|
}
|
|
|
|
|
|
- // Check if we have enough data to read lengths
|
|
|
- if ((dataIn.getLength() - dataIn.getPosition()) < 2*MAX_VINT_SIZE) {
|
|
|
- readNextBlock(2*MAX_VINT_SIZE);
|
|
|
- }
|
|
|
-
|
|
|
// Read key and value lengths
|
|
|
- int oldPos = dataIn.getPosition();
|
|
|
- int keyLength = WritableUtils.readVInt(dataIn);
|
|
|
- int valueLength = WritableUtils.readVInt(dataIn);
|
|
|
- int pos = dataIn.getPosition();
|
|
|
- bytesRead += pos - oldPos;
|
|
|
+ currentKeyLength = WritableUtils.readVInt(dIn);
|
|
|
+ currentValueLength = WritableUtils.readVInt(dIn);
|
|
|
+ bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
|
|
|
+ WritableUtils.getVIntSize(currentValueLength);
|
|
|
|
|
|
// Check for EOF
|
|
|
- if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
|
|
|
+ if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
|
|
|
eof = true;
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
// Sanity check
|
|
|
- if (keyLength < 0) {
|
|
|
+ if (currentKeyLength < 0) {
|
|
|
throw new IOException("Rec# " + recNo + ": Negative key-length: " +
|
|
|
- keyLength);
|
|
|
+ currentKeyLength);
|
|
|
}
|
|
|
- if (valueLength < 0) {
|
|
|
+ if (currentValueLength < 0) {
|
|
|
throw new IOException("Rec# " + recNo + ": Negative value-length: " +
|
|
|
- valueLength);
|
|
|
+ currentValueLength);
|
|
|
}
|
|
|
-
|
|
|
- final int recordLength = keyLength + valueLength;
|
|
|
-
|
|
|
- // Check if we have the raw key/value in the buffer
|
|
|
- if ((dataIn.getLength()-pos) < recordLength) {
|
|
|
- readNextBlock(recordLength);
|
|
|
-
|
|
|
- // Sanity check
|
|
|
- if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
|
|
|
- throw new EOFException("Rec# " + recNo + ": Could read the next " +
|
|
|
- " record");
|
|
|
- }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
|
|
|
+ if (!positionToNextRecord(dataIn)) {
|
|
|
+ return false;
|
|
|
}
|
|
|
-
|
|
|
- // Setup the key and value
|
|
|
- pos = dataIn.getPosition();
|
|
|
- byte[] data = dataIn.getData();
|
|
|
- key.reset(data, pos, keyLength);
|
|
|
- value.reset(data, (pos + keyLength), valueLength);
|
|
|
-
|
|
|
- // Position for the next record
|
|
|
- long skipped = dataIn.skip(recordLength);
|
|
|
- if (skipped != recordLength) {
|
|
|
- throw new IOException("Rec# " + recNo + ": Failed to skip past record " +
|
|
|
- "of length: " + recordLength);
|
|
|
+ if (keyBytes.length < currentKeyLength) {
|
|
|
+ keyBytes = new byte[currentKeyLength << 1];
|
|
|
}
|
|
|
+ int i = readData(keyBytes, 0, currentKeyLength);
|
|
|
+ if (i != currentKeyLength) {
|
|
|
+ throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
|
|
|
+ }
|
|
|
+ key.reset(keyBytes, currentKeyLength);
|
|
|
+ bytesRead += currentKeyLength;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void nextRawValue(DataInputBuffer value) throws IOException {
|
|
|
+ final byte[] valBytes = (value.getData().length < currentValueLength)
|
|
|
+ ? new byte[currentValueLength << 1]
|
|
|
+ : value.getData();
|
|
|
+ int i = readData(valBytes, 0, currentValueLength);
|
|
|
+ if (i != currentValueLength) {
|
|
|
+ throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
|
|
|
+ }
|
|
|
+ value.reset(valBytes, currentValueLength);
|
|
|
|
|
|
// Record the bytes read
|
|
|
- bytesRead += recordLength;
|
|
|
+ bytesRead += currentValueLength;
|
|
|
|
|
|
++recNo;
|
|
|
++numRecordsRead;
|
|
|
-
|
|
|
- return true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void close() throws IOException {
|
|
|
// Return the decompressor
|
|
|
if (decompressor != null) {
|
|
@@ -458,7 +431,7 @@ class IFile {
|
|
|
public static class InMemoryReader<K, V> extends Reader<K, V> {
|
|
|
RamManager ramManager;
|
|
|
TaskAttemptID taskAttemptId;
|
|
|
-
|
|
|
+ DataInputBuffer memDataIn = new DataInputBuffer();
|
|
|
public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
|
|
|
byte[] data, int start, int length)
|
|
|
throws IOException {
|
|
@@ -468,7 +441,7 @@ class IFile {
|
|
|
|
|
|
buffer = data;
|
|
|
bufferSize = (int)fileLength;
|
|
|
- dataIn.reset(buffer, start, length);
|
|
|
+ memDataIn.reset(buffer, start, length);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -497,58 +470,49 @@ class IFile {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public boolean next(DataInputBuffer key, DataInputBuffer value)
|
|
|
- throws IOException {
|
|
|
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
|
|
|
try {
|
|
|
- // Sanity check
|
|
|
- if (eof) {
|
|
|
- throw new EOFException("Completed reading " + bytesRead);
|
|
|
- }
|
|
|
-
|
|
|
- // Read key and value lengths
|
|
|
- int oldPos = dataIn.getPosition();
|
|
|
- int keyLength = WritableUtils.readVInt(dataIn);
|
|
|
- int valueLength = WritableUtils.readVInt(dataIn);
|
|
|
- int pos = dataIn.getPosition();
|
|
|
- bytesRead += pos - oldPos;
|
|
|
-
|
|
|
- // Check for EOF
|
|
|
- if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
|
|
|
- eof = true;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // Sanity check
|
|
|
- if (keyLength < 0) {
|
|
|
- throw new IOException("Rec# " + recNo + ": Negative key-length: " +
|
|
|
- keyLength);
|
|
|
- }
|
|
|
- if (valueLength < 0) {
|
|
|
- throw new IOException("Rec# " + recNo + ": Negative value-length: " +
|
|
|
- valueLength);
|
|
|
- }
|
|
|
+ if (!positionToNextRecord(memDataIn)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ // Set up the key
|
|
|
+ int pos = memDataIn.getPosition();
|
|
|
+ byte[] data = memDataIn.getData();
|
|
|
+ key.reset(data, pos, currentKeyLength);
|
|
|
+ // Position for the next value
|
|
|
+ long skipped = memDataIn.skip(currentKeyLength);
|
|
|
+ if (skipped != currentKeyLength) {
|
|
|
+ throw new IOException("Rec# " + recNo +
|
|
|
+ ": Failed to skip past key of length: " +
|
|
|
+ currentKeyLength);
|
|
|
+ }
|
|
|
|
|
|
- final int recordLength = keyLength + valueLength;
|
|
|
-
|
|
|
- // Setup the key and value
|
|
|
- pos = dataIn.getPosition();
|
|
|
- byte[] data = dataIn.getData();
|
|
|
- key.reset(data, pos, keyLength);
|
|
|
- value.reset(data, (pos + keyLength), valueLength);
|
|
|
-
|
|
|
- // Position for the next record
|
|
|
- long skipped = dataIn.skip(recordLength);
|
|
|
- if (skipped != recordLength) {
|
|
|
- throw new IOException("Rec# " + recNo + ": Failed to skip past record of length: " +
|
|
|
- recordLength);
|
|
|
+ // Record the bytes read
|
|
|
+ bytesRead += currentKeyLength;
|
|
|
+ return true;
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ dumpOnError();
|
|
|
+ throw ioe;
|
|
|
}
|
|
|
-
|
|
|
- // Record the byte
|
|
|
- bytesRead += recordLength;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void nextRawValue(DataInputBuffer value) throws IOException {
|
|
|
+ try {
|
|
|
+ int pos = memDataIn.getPosition();
|
|
|
+ byte[] data = memDataIn.getData();
|
|
|
+ value.reset(data, pos, currentValueLength);
|
|
|
|
|
|
- ++recNo;
|
|
|
-
|
|
|
- return true;
|
|
|
+ // Position for the next record
|
|
|
+ long skipped = memDataIn.skip(currentValueLength);
|
|
|
+ if (skipped != currentValueLength) {
|
|
|
+ throw new IOException("Rec# " + recNo +
|
|
|
+ ": Failed to skip past value of length: " +
|
|
|
+ currentValueLength);
|
|
|
+ }
|
|
|
+ // Record the bytes read
|
|
|
+ bytesRead += currentValueLength;
|
|
|
+
|
|
|
+ ++recNo;
|
|
|
} catch (IOException ioe) {
|
|
|
dumpOnError();
|
|
|
throw ioe;
|
|
@@ -557,7 +521,7 @@ class IFile {
|
|
|
|
|
|
public void close() {
|
|
|
// Release
|
|
|
- dataIn = null;
|
|
|
+ memDataIn = null;
|
|
|
buffer = null;
|
|
|
|
|
|
// Inform the RamManager
|