|
@@ -402,21 +402,24 @@ class TaskLog {
|
|
|
// Copy log data into buffer
|
|
|
byte[] b = new byte[totalLogSize];
|
|
|
SequenceInputStream in = new SequenceInputStream(streams.elements());
|
|
|
- int bytesRead = 0, totalBytesRead = 0;
|
|
|
- int off = 0, len = totalLogSize;
|
|
|
- LOG.debug("Attempting to read " + len + " bytes from logs");
|
|
|
- while ((bytesRead = in.read(b, off, len)) > 0) {
|
|
|
- LOG.debug("Got " + bytesRead + " bytes");
|
|
|
- off += bytesRead;
|
|
|
- len -= bytesRead;
|
|
|
+ try {
|
|
|
+ int bytesRead = 0, totalBytesRead = 0;
|
|
|
+ int off = 0, len = totalLogSize;
|
|
|
+ LOG.debug("Attempting to read " + len + " bytes from logs");
|
|
|
+ while ((bytesRead = in.read(b, off, len)) > 0) {
|
|
|
+ LOG.debug("Got " + bytesRead + " bytes");
|
|
|
+ off += bytesRead;
|
|
|
+ len -= bytesRead;
|
|
|
|
|
|
- totalBytesRead += bytesRead;
|
|
|
- }
|
|
|
+ totalBytesRead += bytesRead;
|
|
|
+ }
|
|
|
|
|
|
- if (totalBytesRead != totalLogSize) {
|
|
|
- LOG.debug("Didn't not read all requisite data in logs!");
|
|
|
+ if (totalBytesRead != totalLogSize) {
|
|
|
+ LOG.debug("Didn't not read all requisite data in logs!");
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ try { in.close(); } catch (IOException ex) {}
|
|
|
}
|
|
|
-
|
|
|
return b;
|
|
|
}
|
|
|
|
|
@@ -527,23 +530,28 @@ class TaskLog {
|
|
|
|
|
|
// Copy requisite data into user buffer
|
|
|
SequenceInputStream in = new SequenceInputStream(streams.elements());
|
|
|
- if (streams.size() == (stopIndex - startIndex +1)) {
|
|
|
- // Skip to get to 'logOffset' if logs haven't been purged
|
|
|
- long skipBytes =
|
|
|
- in.skip(logOffset - indexRecords[startIndex].splitOffset);
|
|
|
- LOG.debug("Skipped " + skipBytes + " bytes from " +
|
|
|
- startIndex + " stream");
|
|
|
- }
|
|
|
- int bytesRead = 0, totalBytesRead = 0;
|
|
|
- len = Math.min((int)logLength, len);
|
|
|
- LOG.debug("Attempting to read " + len + " bytes from logs");
|
|
|
- while ((bytesRead = in.read(b, off, len)) > 0) {
|
|
|
- off += bytesRead;
|
|
|
- len -= bytesRead;
|
|
|
+ int totalBytesRead = 0;
|
|
|
+ try {
|
|
|
+ if (streams.size() == (stopIndex - startIndex +1)) {
|
|
|
+ // Skip to get to 'logOffset' if logs haven't been purged
|
|
|
+ long skipBytes =
|
|
|
+ in.skip(logOffset - indexRecords[startIndex].splitOffset);
|
|
|
+ LOG.debug("Skipped " + skipBytes + " bytes from " +
|
|
|
+ startIndex + " stream");
|
|
|
+ }
|
|
|
+ int bytesRead = 0;
|
|
|
+ len = Math.min((int)logLength, len);
|
|
|
+ LOG.debug("Attempting to read " + len + " bytes from logs");
|
|
|
+ while ((bytesRead = in.read(b, off, len)) > 0) {
|
|
|
+ off += bytesRead;
|
|
|
+ len -= bytesRead;
|
|
|
|
|
|
- totalBytesRead += bytesRead;
|
|
|
+ totalBytesRead += bytesRead;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ try { in.close(); } catch (IOException e) {}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return totalBytesRead;
|
|
|
}
|
|
|
|