|
@@ -53,25 +53,25 @@ import org.slf4j.LoggerFactory;
|
|
|
* <blockquote><pre>
|
|
|
* LogFile:
|
|
|
* FileHeader TxnList ZeroPad
|
|
|
- *
|
|
|
+ *
|
|
|
* FileHeader: {
|
|
|
* magic 4bytes (ZKLG)
|
|
|
* version 4bytes
|
|
|
* dbid 8bytes
|
|
|
* }
|
|
|
- *
|
|
|
+ *
|
|
|
* TxnList:
|
|
|
* Txn || Txn TxnList
|
|
|
- *
|
|
|
+ *
|
|
|
* Txn:
|
|
|
* checksum Txnlen TxnHeader Record 0x42
|
|
|
- *
|
|
|
+ *
|
|
|
* checksum: 8bytes Adler32 is currently used
|
|
|
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
|
|
|
- *
|
|
|
+ *
|
|
|
* Txnlen:
|
|
|
* len 4bytes
|
|
|
- *
|
|
|
+ *
|
|
|
* TxnHeader: {
|
|
|
* sessionid 8bytes
|
|
|
* cxid 4bytes
|
|
@@ -79,13 +79,13 @@ import org.slf4j.LoggerFactory;
|
|
|
* time 8bytes
|
|
|
* type 4bytes
|
|
|
* }
|
|
|
- *
|
|
|
+ *
|
|
|
* Record:
|
|
|
* See Jute definition file for details on the various record types
|
|
|
- *
|
|
|
+ *
|
|
|
* ZeroPad:
|
|
|
* 0 padded to EOF (filled during preallocation stage)
|
|
|
- * </pre></blockquote>
|
|
|
+ * </pre></blockquote>
|
|
|
*/
|
|
|
public class FileTxnLog implements TxnLog {
|
|
|
private static final Logger LOG;
|
|
@@ -175,7 +175,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
/**
|
|
|
* close all the open file handles
|
|
|
* @throws IOException
|
|
|
- */
|
|
|
+ */
|
|
|
public synchronized void close() throws IOException {
|
|
|
if (logStream != null) {
|
|
|
logStream.close();
|
|
@@ -184,54 +184,56 @@ public class FileTxnLog implements TxnLog {
|
|
|
log.close();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* append an entry to the transaction log
|
|
|
* @param hdr the header of the transaction
|
|
|
* @param txn the transaction part of the entry
|
|
|
- * returns true iff something appended, otw false
|
|
|
+ * returns true iff something appended, otw false
|
|
|
*/
|
|
|
public synchronized boolean append(TxnHeader hdr, Record txn)
|
|
|
throws IOException
|
|
|
{
|
|
|
- if (hdr != null) {
|
|
|
- if (hdr.getZxid() <= lastZxidSeen) {
|
|
|
- LOG.warn("Current zxid " + hdr.getZxid()
|
|
|
- + " is <= " + lastZxidSeen + " for "
|
|
|
- + hdr.getType());
|
|
|
- }
|
|
|
- if (logStream==null) {
|
|
|
- if(LOG.isInfoEnabled()){
|
|
|
- LOG.info("Creating new log file: log." +
|
|
|
- Long.toHexString(hdr.getZxid()));
|
|
|
- }
|
|
|
-
|
|
|
- logFileWrite = new File(logDir, ("log." +
|
|
|
- Long.toHexString(hdr.getZxid())));
|
|
|
- fos = new FileOutputStream(logFileWrite);
|
|
|
- logStream=new BufferedOutputStream(fos);
|
|
|
- oa = BinaryOutputArchive.getArchive(logStream);
|
|
|
- FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
|
|
|
- fhdr.serialize(oa, "fileheader");
|
|
|
- // Make sure that the magic number is written before padding.
|
|
|
- logStream.flush();
|
|
|
- currentSize = fos.getChannel().position();
|
|
|
- streamsToFlush.add(fos);
|
|
|
- }
|
|
|
- padFile(fos);
|
|
|
- byte[] buf = Util.marshallTxnEntry(hdr, txn);
|
|
|
- if (buf == null || buf.length == 0) {
|
|
|
- throw new IOException("Faulty serialization for header " +
|
|
|
- "and txn");
|
|
|
- }
|
|
|
- Checksum crc = makeChecksumAlgorithm();
|
|
|
- crc.update(buf, 0, buf.length);
|
|
|
- oa.writeLong(crc.getValue(), "txnEntryCRC");
|
|
|
- Util.writeTxnBytes(oa, buf);
|
|
|
-
|
|
|
- return true;
|
|
|
+ if (hdr == null) {
|
|
|
+ return false;
|
|
|
}
|
|
|
- return false;
|
|
|
+ if (hdr.getZxid() <= lastZxidSeen) {
|
|
|
+ LOG.warn("Current zxid " + hdr.getZxid()
|
|
|
+ + " is <= " + lastZxidSeen + " for "
|
|
|
+ + hdr.getType());
|
|
|
+ } else {
|
|
|
+ lastZxidSeen = hdr.getZxid();
|
|
|
+ }
|
|
|
+ if (logStream==null) {
|
|
|
+ if(LOG.isInfoEnabled()){
|
|
|
+ LOG.info("Creating new log file: log." +
|
|
|
+ Long.toHexString(hdr.getZxid()));
|
|
|
+ }
|
|
|
+
|
|
|
+ logFileWrite = new File(logDir, ("log." +
|
|
|
+ Long.toHexString(hdr.getZxid())));
|
|
|
+ fos = new FileOutputStream(logFileWrite);
|
|
|
+ logStream=new BufferedOutputStream(fos);
|
|
|
+ oa = BinaryOutputArchive.getArchive(logStream);
|
|
|
+ FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
|
|
|
+ fhdr.serialize(oa, "fileheader");
|
|
|
+ // Make sure that the magic number is written before padding.
|
|
|
+ logStream.flush();
|
|
|
+ currentSize = fos.getChannel().position();
|
|
|
+ streamsToFlush.add(fos);
|
|
|
+ }
|
|
|
+ padFile(fos);
|
|
|
+ byte[] buf = Util.marshallTxnEntry(hdr, txn);
|
|
|
+ if (buf == null || buf.length == 0) {
|
|
|
+ throw new IOException("Faulty serialization for header " +
|
|
|
+ "and txn");
|
|
|
+ }
|
|
|
+ Checksum crc = makeChecksumAlgorithm();
|
|
|
+ crc.update(buf, 0, buf.length);
|
|
|
+ oa.writeLong(crc.getValue(), "txnEntryCRC");
|
|
|
+ Util.writeTxnBytes(oa, buf);
|
|
|
+
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -456,10 +458,10 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * a class that keeps track of the position
|
|
|
+ * a class that keeps track of the position
|
|
|
* in the input stream. The position points to offset
|
|
|
- * that has been consumed by the applications. It can
|
|
|
- * wrap buffered input streams to provide the right offset
|
|
|
+ * that has been consumed by the applications. It can
|
|
|
+ * wrap buffered input streams to provide the right offset
|
|
|
* for the application.
|
|
|
*/
|
|
|
static class PositionInputStream extends FilterInputStream {
|
|
@@ -468,7 +470,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
super(in);
|
|
|
position = 0;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public int read() throws IOException {
|
|
|
int rc = super.read();
|
|
@@ -483,9 +485,9 @@ public class FileTxnLog implements TxnLog {
|
|
|
if (rc > 0) {
|
|
|
position += rc;
|
|
|
}
|
|
|
- return rc;
|
|
|
+ return rc;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public int read(byte[] b, int off, int len) throws IOException {
|
|
|
int rc = super.read(b, off, len);
|
|
@@ -494,7 +496,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
return rc;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public long skip(long n) throws IOException {
|
|
|
long rc = super.skip(n);
|
|
@@ -522,7 +524,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
throw new UnsupportedOperationException("reset");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* this class implements the txnlog iterator interface
|
|
|
* which is used for reading the transaction logs
|
|
@@ -535,7 +537,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
File logFile;
|
|
|
InputArchive ia;
|
|
|
static final String CRC_ERROR="CRC check failed";
|
|
|
-
|
|
|
+
|
|
|
PositionInputStream inputStream=null;
|
|
|
//stored files is the list of files greater than
|
|
|
//the zxid we are looking for.
|
|
@@ -564,7 +566,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* create an iterator over a transaction database directory
|
|
|
* @param logDir the transaction database directory
|
|
@@ -596,7 +598,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
goToNextLog();
|
|
|
next();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Return total storage size of txnlog that will return by this iterator.
|
|
|
*/
|
|
@@ -634,7 +636,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
FileHeader header= new FileHeader();
|
|
|
header.deserialize(ia, "fileheader");
|
|
|
if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
|
|
|
- throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
|
|
|
+ throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
|
|
|
+ header.getMagic()
|
|
|
+ " != " + FileTxnLog.TXNLOG_MAGIC);
|
|
|
}
|