|
@@ -238,12 +238,13 @@ class TaskLog {
|
|
|
throws IOException {
|
|
|
currentSplit = getLogSplit(split);
|
|
|
LOG.debug("About to create the split: " + currentSplit);
|
|
|
+ // Record the 'split' in the index
|
|
|
+ writeIndexRecord();
|
|
|
return new BufferedOutputStream(new FileOutputStream(currentSplit));
|
|
|
}
|
|
|
|
|
|
private synchronized void writeIndexRecord() throws IOException {
|
|
|
- String indexRecord = currentSplit + "|" + splitOffset + "|" +
|
|
|
- splitLength + "\n";
|
|
|
+ String indexRecord = currentSplit + "|" + splitOffset + "\n";
|
|
|
splitIndex.write(indexRecord.getBytes());
|
|
|
splitIndex.flush();
|
|
|
}
|
|
@@ -253,9 +254,6 @@ class TaskLog {
|
|
|
LOG.debug("About to rotate-out the split: " + noSplits);
|
|
|
out.close();
|
|
|
|
|
|
- // Record the 'split' in the index
|
|
|
- writeIndexRecord();
|
|
|
-
|
|
|
// Re-initialize the state
|
|
|
splitOffset += splitLength;
|
|
|
splitLength = 0;
|
|
@@ -312,12 +310,10 @@ class TaskLog {
|
|
|
private static class IndexRecord {
|
|
|
String splitName;
|
|
|
long splitOffset;
|
|
|
- long splitLength;
|
|
|
|
|
|
- IndexRecord(String splitName, long splitOffset, long splitLength) {
|
|
|
+ IndexRecord(String splitName, long splitOffset) {
|
|
|
this.splitName = splitName;
|
|
|
this.splitOffset = splitOffset;
|
|
|
- this.splitLength = splitLength;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -331,26 +327,26 @@ class TaskLog {
|
|
|
String line;
|
|
|
while ((line = splitIndex.readLine()) != null) {
|
|
|
String[] fields = line.split("\\|");
|
|
|
- if (fields.length != 3) {
|
|
|
+ if (fields.length != 2) {
|
|
|
throw new IOException("Malformed split-index with " +
|
|
|
fields.length + " fields");
|
|
|
}
|
|
|
|
|
|
IndexRecord record = new IndexRecord(
|
|
|
fields[0],
|
|
|
- Long.valueOf(fields[1]).longValue(),
|
|
|
- Long.valueOf(fields[2]).longValue()
|
|
|
+ Long.valueOf(fields[1]).longValue()
|
|
|
);
|
|
|
- LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset +
|
|
|
- ", " + record.splitLength + ">");
|
|
|
+ LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">");
|
|
|
|
|
|
// Save
|
|
|
records.add(record);
|
|
|
- logFileSize += record.splitLength;
|
|
|
}
|
|
|
|
|
|
indexRecords = new IndexRecord[records.size()];
|
|
|
indexRecords = records.toArray(indexRecords);
|
|
|
+ IndexRecord lastRecord = indexRecords[records.size() - 1];
|
|
|
+ logFileSize = lastRecord.splitOffset
|
|
|
+ + new File(lastRecord.splitName).length();
|
|
|
initialized = true;
|
|
|
LOG.debug("Log size: " + logFileSize);
|
|
|
}
|
|
@@ -384,34 +380,28 @@ class TaskLog {
|
|
|
|
|
|
// Get all splits
|
|
|
Vector<InputStream> streams = new Vector<InputStream>();
|
|
|
- int totalLogSize = 0;
|
|
|
for (int i=0; i < indexRecords.length; ++i) {
|
|
|
InputStream stream = getLogSplit(i);
|
|
|
if (stream != null) {
|
|
|
streams.add(stream);
|
|
|
- totalLogSize += indexRecords[i].splitLength;
|
|
|
LOG.debug("Added split: " + i);
|
|
|
}
|
|
|
}
|
|
|
- LOG.debug("Total log-size on disk: " + totalLogSize +
|
|
|
- "; actual log-size: " + logFileSize);
|
|
|
+ LOG.debug("Total log-size on disk: " + logFileSize);
|
|
|
|
|
|
// Copy log data into buffer
|
|
|
- byte[] b = new byte[totalLogSize];
|
|
|
+ byte[] b = new byte[(int) logFileSize];
|
|
|
SequenceInputStream in = new SequenceInputStream(streams.elements());
|
|
|
try {
|
|
|
- int bytesRead = 0, totalBytesRead = 0;
|
|
|
- int off = 0, len = totalLogSize;
|
|
|
- LOG.debug("Attempting to read " + len + " bytes from logs");
|
|
|
- while ((bytesRead = in.read(b, off, len)) > 0) {
|
|
|
+ int bytesRead = 0;
|
|
|
+ int off = 0;
|
|
|
+ LOG.debug("Attempting to read " + logFileSize + " bytes from logs");
|
|
|
+ while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) {
|
|
|
LOG.debug("Got " + bytesRead + " bytes");
|
|
|
off += bytesRead;
|
|
|
- len -= bytesRead;
|
|
|
-
|
|
|
- totalBytesRead += bytesRead;
|
|
|
}
|
|
|
|
|
|
- if (totalBytesRead != totalLogSize) {
|
|
|
+ if (off != logFileSize) {
|
|
|
LOG.debug("Didn't not read all requisite data in logs!");
|
|
|
}
|
|
|
} finally {
|