|
@@ -52,20 +52,20 @@ import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
|
|
|
* This Ant code was enhanced so that it can de-compress blocks of bzip2 data.
|
|
|
* Current position in the stream is an important statistic for Hadoop. For
|
|
|
* example in LineRecordReader, we solely depend on the current position in the
|
|
|
- * stream to know about the progess. The notion of position becomes complicated
|
|
|
+ * stream to know about the progress. The notion of position becomes complicated
|
|
|
* for compressed files. The Hadoop splitting is done in terms of compressed
|
|
|
* file. But a compressed file deflates to a large amount of data. So we have
|
|
|
* handled this problem in the following way.
|
|
|
*
|
|
|
* On object creation time, we find the next block start delimiter. Once such a
|
|
|
* marker is found, the stream stops there (we discard any read compressed data
|
|
|
- * in this process) and the position is updated (i.e. the caller of this class
|
|
|
- * will find out the stream location). At this point we are ready for actual
|
|
|
- * reading (i.e. decompression) of data.
|
|
|
+ * in this process) and the position is reported as the beginning of the block
|
|
|
+ * start delimiter. At this point we are ready for actual reading
|
|
|
+ * (i.e. decompression) of data.
|
|
|
*
|
|
|
* The subsequent read calls give out data. The position is updated when the
|
|
|
* caller of this class has read off the current block + 1 bytes. In between the
|
|
|
- * block reading, position is not updated. (We can only update the postion on
|
|
|
+ * block reading, position is not updated. (We can only update the position on
|
|
|
* block boundaries).
|
|
|
* </p>
|
|
|
*
|
|
@@ -204,11 +204,12 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
|
|
* in the stream. It can find bit patterns of length <= 63 bits. Specifically
|
|
|
* this method is used in CBZip2InputStream to find the end of block (EOB)
|
|
|
* delimiter in the stream, starting from the current position of the stream.
|
|
|
- * If marker is found, the stream position will be right after marker at the
|
|
|
- * end of this call.
|
|
|
+ * If marker is found, the stream position will be at the byte containing
|
|
|
+ * the starting bit of the marker.
|
|
|
*
|
|
|
* @param marker The bit pattern to be found in the stream
|
|
|
* @param markerBitLength Number of bits in the marker
|
|
|
+ * @return true if the marker was found, otherwise false
|
|
|
*
|
|
|
* @throws IOException
|
|
|
* @throws IllegalArgumentException if markerBitLength is greater than 63
|
|
@@ -224,23 +225,33 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
|
|
long bytes = 0;
|
|
|
bytes = this.bsR(markerBitLength);
|
|
|
if (bytes == -1) {
|
|
|
+ this.reportedBytesReadFromCompressedStream =
|
|
|
+ this.bytesReadFromCompressedStream;
|
|
|
return false;
|
|
|
}
|
|
|
while (true) {
|
|
|
if (bytes == marker) {
|
|
|
+ // Report the byte position where the marker starts
|
|
|
+ long markerBytesRead = (markerBitLength + this.bsLive + 7) / 8;
|
|
|
+ this.reportedBytesReadFromCompressedStream =
|
|
|
+ this.bytesReadFromCompressedStream - markerBytesRead;
|
|
|
return true;
|
|
|
-
|
|
|
} else {
|
|
|
bytes = bytes << 1;
|
|
|
bytes = bytes & ((1L << markerBitLength) - 1);
|
|
|
int oneBit = (int) this.bsR(1);
|
|
|
if (oneBit != -1) {
|
|
|
bytes = bytes | oneBit;
|
|
|
- } else
|
|
|
+ } else {
|
|
|
+ this.reportedBytesReadFromCompressedStream =
|
|
|
+ this.bytesReadFromCompressedStream;
|
|
|
return false;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException ex) {
|
|
|
+ this.reportedBytesReadFromCompressedStream =
|
|
|
+ this.bytesReadFromCompressedStream;
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -302,7 +313,6 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
|
|
} else if (readMode == READ_MODE.BYBLOCK) {
|
|
|
this.currentState = STATE.NO_PROCESS_STATE;
|
|
|
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
|
|
|
- this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
|
|
|
if(!skipDecompression){
|
|
|
changeStateToProcessABlock();
|
|
|
}
|
|
@@ -419,8 +429,6 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
|
|
result = b;
|
|
|
|
|
|
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
|
|
|
- //Exactly when we are about to start a new block, we advertise the stream position.
|
|
|
- this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
|
|
|
|
|
|
changeStateToProcessABlock();
|
|
|
}
|