|
@@ -49,44 +49,25 @@ import org.apache.zookeeper.txn.TxnHeader;
|
|
|
*
|
|
|
*/
|
|
|
public class FileTxnLog implements TxnLog {
|
|
|
- private static final Logger LOG;
|
|
|
-
|
|
|
- static long preAllocSize = 65536 * 1024;
|
|
|
-
|
|
|
- public final static int TXNLOG_MAGIC =
|
|
|
- ByteBuffer.wrap("ZKLG".getBytes()).getInt();
|
|
|
-
|
|
|
- public final static int VERSION = 2;
|
|
|
-
|
|
|
- static {
|
|
|
- LOG = Logger.getLogger(FileTxnLog.class);
|
|
|
-
|
|
|
- forceSync =
|
|
|
- !System.getProperty("zookeeper.forceSync", "yes").equals("no");
|
|
|
-
|
|
|
- String size = System.getProperty("zookeeper.preAllocSize");
|
|
|
- if (size != null) {
|
|
|
- try {
|
|
|
- preAllocSize = Long.parseLong(size) * 1024;
|
|
|
- } catch (NumberFormatException e) {
|
|
|
- LOG.warn(size + " is not a valid value for preAllocSize");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
long lastZxidSeen;
|
|
|
volatile BufferedOutputStream logStream = null;
|
|
|
volatile OutputArchive oa;
|
|
|
volatile FileOutputStream fos = null;
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
File logDir;
|
|
|
- private static boolean forceSync = true;
|
|
|
+ public final static int TXNLOG_MAGIC =
|
|
|
+ ByteBuffer.wrap("ZKLG".getBytes()).getInt();
|
|
|
+ public final static int VERSION = 2;
|
|
|
+ private boolean forceSync = true;
|
|
|
long dbId;
|
|
|
- private LinkedList<FileOutputStream> streamsToFlush =
|
|
|
- new LinkedList<FileOutputStream>();
|
|
|
+ private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
|
|
|
+ static long preAllocSize = 65536 * 1024;
|
|
|
long currentSize;
|
|
|
File logFileWrite = null;
|
|
|
-
|
|
|
+
|
|
|
+ private static final Logger LOG = Logger.getLogger(FileTxnLog.class);
|
|
|
+
|
|
|
/**
|
|
|
* constructor for FileTxnLog. Take the directory
|
|
|
* where the txnlogs are stored
|
|
@@ -94,17 +75,27 @@ public class FileTxnLog implements TxnLog {
|
|
|
*/
|
|
|
public FileTxnLog(File logDir) {
|
|
|
this.logDir = logDir;
|
|
|
+ forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals(
|
|
|
+ "no");
|
|
|
+ String size = System.getProperty("zookeeper.preAllocSize");
|
|
|
+ if (size != null) {
|
|
|
+ try {
|
|
|
+ preAllocSize = Long.parseLong(size) * 1024;
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ LOG.warn(size + " is not a valid value for preAllocSize");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* method to allow setting preallocate size
|
|
|
* of log file to pad the file.
|
|
|
- * @param size the size to set to in bytes
|
|
|
+ * @param size the size to set to
|
|
|
*/
|
|
|
public static void setPreallocSize(long size) {
|
|
|
preAllocSize = size;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* creates a checksum alogrithm to be used
|
|
|
* @return the checksum used for this txnlog
|
|
@@ -116,7 +107,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
|
|
|
/**
|
|
|
* rollover the current log file to a new one.
|
|
|
- * @throws IOException
|
|
|
+ * @throws IOException
|
|
|
*/
|
|
|
public void rollLog() throws IOException {
|
|
|
if (logStream != null) {
|
|
@@ -131,7 +122,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
* @param hdr the header of the transaction
|
|
|
* @param txn the transaction part of the entry
|
|
|
*/
|
|
|
- public synchronized void append(TxnHeader hdr, Record txn)
|
|
|
+ public synchronized void append(TxnHeader hdr, Record txn)
|
|
|
throws IOException {
|
|
|
if (hdr != null) {
|
|
|
if (hdr.getZxid() <= lastZxidSeen) {
|
|
@@ -140,7 +131,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
+ hdr.getType());
|
|
|
}
|
|
|
if (logStream==null) {
|
|
|
- logFileWrite = new File(logDir, ("log." +
|
|
|
+ logFileWrite = new File(logDir, ("log." +
|
|
|
Long.toHexString(hdr.getZxid())));
|
|
|
fos = new FileOutputStream(logFileWrite);
|
|
|
logStream=new BufferedOutputStream(fos);
|
|
@@ -154,15 +145,15 @@ public class FileTxnLog implements TxnLog {
|
|
|
byte[] buf = Util.marshallTxnEntry(hdr, txn);
|
|
|
if (buf == null || buf.length == 0) {
|
|
|
throw new IOException("Faulty serialization for header " +
|
|
|
- "and txn");
|
|
|
+ "and txn");
|
|
|
}
|
|
|
Checksum crc = makeChecksumAlgorithm();
|
|
|
crc.update(buf, 0, buf.length);
|
|
|
oa.writeLong(crc.getValue(), "txnEntryCRC");
|
|
|
Util.writeTxnBytes(oa, buf);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* pad the current file to increase its size
|
|
|
* @param out the outputstream to be padded
|
|
@@ -171,7 +162,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
private void padFile(FileOutputStream out) throws IOException {
|
|
|
currentSize = Util.padLogFile(out, currentSize, preAllocSize);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Find the log file that starts at, or just before, the snapshot. Return
|
|
|
* this and all subsequent logs. Results are ordered by zxid of file,
|
|
@@ -190,7 +181,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
if (fzxid > snapshotZxid) {
|
|
|
continue;
|
|
|
}
|
|
|
- // the files
|
|
|
+ // the files
|
|
|
// are sorted with zxid's
|
|
|
if (fzxid > logZxid) {
|
|
|
logZxid = fzxid;
|
|
@@ -205,9 +196,9 @@ public class FileTxnLog implements TxnLog {
|
|
|
v.add(f);
|
|
|
}
|
|
|
return v.toArray(new File[0]);
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* get the last zxid that was logged in the transaction logs
|
|
|
* @return the last zxid logged in the transaction logs
|
|
@@ -216,10 +207,11 @@ public class FileTxnLog implements TxnLog {
|
|
|
File[] files = getLogFiles(logDir.listFiles(), 0);
|
|
|
long maxLog=files.length>0?
|
|
|
Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
|
|
|
-
|
|
|
- // if a log file is more recent we must scan it to find
|
|
|
+
|
|
|
+ // if a log file is more recent we must scan it to find
|
|
|
// the highest zxid
|
|
|
long zxid = maxLog;
|
|
|
+ FileOutputStream logStream = null;
|
|
|
try {
|
|
|
FileTxnLog txn = new FileTxnLog(logDir);
|
|
|
TxnIterator itr = txn.read(maxLog);
|
|
@@ -231,12 +223,17 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Unexpected exception", e);
|
|
|
+ } finally {
|
|
|
+ if (logStream != null)
|
|
|
+ try {
|
|
|
+ logStream.close();
|
|
|
+ } catch(IOException io){}
|
|
|
}
|
|
|
return zxid;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * commit the logs. make sure that evertyhing hits the
|
|
|
+ * commit the logs. make sure that evertyhing hits the
|
|
|
* disk
|
|
|
*/
|
|
|
public synchronized void commit() throws IOException {
|
|
@@ -253,7 +250,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
streamsToFlush.removeFirst().close();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* start reading all the transactions from the given zxid
|
|
|
* @param zxid the zxid to start reading transactions from
|
|
@@ -262,8 +259,8 @@ public class FileTxnLog implements TxnLog {
|
|
|
*/
|
|
|
public TxnIterator read(long zxid) throws IOException {
|
|
|
return new FileTxnIterator(logDir, zxid);
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* truncate the current transaction logs
|
|
|
* @param zxid the zxid to truncate the logs to
|
|
@@ -278,13 +275,11 @@ public class FileTxnLog implements TxnLog {
|
|
|
raf.setLength(pos);
|
|
|
raf.close();
|
|
|
while(itr.goToNextLog()) {
|
|
|
- if (!itr.logFile.delete()) {
|
|
|
- LOG.warn("Unable to truncate " + itr.logFile);
|
|
|
- }
|
|
|
+ itr.logFile.delete();
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* read the header of the transaction file
|
|
|
* @param file the transaction file to read
|
|
@@ -293,21 +288,20 @@ public class FileTxnLog implements TxnLog {
|
|
|
*/
|
|
|
private static FileHeader readHeader(File file) throws IOException {
|
|
|
InputStream is =null;
|
|
|
- try {
|
|
|
+ try{
|
|
|
is = new BufferedInputStream(new FileInputStream(file));
|
|
|
InputArchive ia=BinaryInputArchive.getArchive(is);
|
|
|
FileHeader hdr = new FileHeader();
|
|
|
hdr.deserialize(ia, "fileheader");
|
|
|
return hdr;
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- if (is != null) is.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Ignoring exception during close", e);
|
|
|
+ }finally{
|
|
|
+ try{
|
|
|
+ if(is != null) is.close();
|
|
|
+ }catch(IOException e){
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* the dbid of this transaction database
|
|
|
* @return the dbid of this database
|
|
@@ -320,10 +314,10 @@ public class FileTxnLog implements TxnLog {
|
|
|
throw new IOException("Unsupported Format.");
|
|
|
return fh.getDbid();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * this class implements the txnlog iterator interface
|
|
|
- * which is used for reading the transaction logs
|
|
|
+ * this class implements the txnlog iterator interface
|
|
|
+ * which is used for reading the transaction logs
|
|
|
*/
|
|
|
public static class FileTxnIterator implements TxnLog.TxnIterator {
|
|
|
File logDir;
|
|
@@ -334,10 +328,10 @@ public class FileTxnLog implements TxnLog {
|
|
|
InputArchive ia;
|
|
|
static final String CRC_ERROR="CRC check failed";
|
|
|
FileInputStream inputStream=null;
|
|
|
- //stored files is the list of files greater than
|
|
|
+ //stored files is the list of files greater than
|
|
|
//the zxid we are looking for.
|
|
|
private ArrayList<File> storedFiles;
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* create an iterator over a transaction database directory
|
|
|
* @param logDir the transaction database directory
|
|
@@ -349,10 +343,10 @@ public class FileTxnLog implements TxnLog {
|
|
|
this.zxid = zxid;
|
|
|
init();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* initialize to the zxid specified
|
|
|
- * this is inclusive of the zxid
|
|
|
+ * this is inclusive of the zxid
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
void init() throws IOException {
|
|
@@ -376,10 +370,10 @@ public class FileTxnLog implements TxnLog {
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * go to the next logfile
|
|
|
- * @return true if there is one and false if there is no
|
|
|
+ * go to the next logfile
|
|
|
+ * @return true if there is one and false if there is no
|
|
|
* new file to be read
|
|
|
* @throws IOException
|
|
|
*/
|
|
@@ -391,23 +385,23 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* read the header fomr the inputarchive
|
|
|
* @param ia the inputarchive to be read from
|
|
|
- * @param is the inputstream
|
|
|
+ * @param is the inputstream
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- protected void inStreamCreated(InputArchive ia, FileInputStream is)
|
|
|
+ protected void inStreamCreated(InputArchive ia, FileInputStream is)
|
|
|
throws IOException{
|
|
|
FileHeader header= new FileHeader();
|
|
|
header.deserialize(ia, "fileheader");
|
|
|
if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
|
|
|
- throw new IOException("Invalid magic number " + header.getMagic()
|
|
|
+ throw new IOException("Invalid magic number " + header.getMagic()
|
|
|
+ " != " + FileTxnLog.TXNLOG_MAGIC);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Invoked to indicate that the input stream has been created.
|
|
|
* @param ia input archive
|
|
@@ -424,15 +418,15 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
return ia;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * create a checksum algorithm
|
|
|
+ * create a checksum algorithm
|
|
|
* @return the checksum algorithm
|
|
|
*/
|
|
|
protected Checksum makeChecksumAlgorithm(){
|
|
|
return new Adler32();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* the iterator that moves to the next transaction
|
|
|
* @return true if there is more transactions to be read
|
|
@@ -447,12 +441,12 @@ public class FileTxnLog implements TxnLog {
|
|
|
byte[] bytes = Util.readTxnBytes(ia);
|
|
|
// Since we preallocate, we define EOF to be an
|
|
|
if (bytes == null || bytes.length==0)
|
|
|
- throw new EOFException("Failed to read");
|
|
|
+ throw new EOFException("Failed to read");
|
|
|
// EOF or corrupted record
|
|
|
// validate CRC
|
|
|
Checksum crc = makeChecksumAlgorithm();
|
|
|
crc.update(bytes, 0, bytes.length);
|
|
|
- if (crcValue != crc.getValue())
|
|
|
+ if (crcValue != crc.getValue())
|
|
|
throw new IOException(CRC_ERROR);
|
|
|
if (bytes == null || bytes.length == 0)
|
|
|
return false;
|
|
@@ -465,7 +459,7 @@ public class FileTxnLog implements TxnLog {
|
|
|
inputStream.close();
|
|
|
inputStream = null;
|
|
|
ia = null;
|
|
|
- // thsi means that the file has ended
|
|
|
+ // thsi means that the file has ended
|
|
|
// we shoud go to the next file
|
|
|
if (!goToNextLog()) {
|
|
|
return false;
|
|
@@ -473,10 +467,10 @@ public class FileTxnLog implements TxnLog {
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * reutrn the current header
|
|
|
- * @return the current header that
|
|
|
+ * reutrn the current header
|
|
|
+ * @return the current header that
|
|
|
* is read
|
|
|
*/
|
|
|
public TxnHeader getHeader() {
|
|
@@ -491,9 +485,9 @@ public class FileTxnLog implements TxnLog {
|
|
|
public Record getTxn() {
|
|
|
return record;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * close the iterator
|
|
|
+ * close the iterator
|
|
|
* and release the resources.
|
|
|
*/
|
|
|
public void close() throws IOException {
|