|
@@ -26,9 +26,11 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.RandomAccessFile;
|
|
|
import java.util.ArrayList;
|
|
|
import java.lang.Math;
|
|
|
import java.nio.channels.FileChannel;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.DFSFileInfo;
|
|
|
*
|
|
|
*/
|
|
|
public class FSEditLog {
|
|
|
+ private static final byte OP_INVALID = -1;
|
|
|
private static final byte OP_ADD = 0;
|
|
|
private static final byte OP_RENAME = 1; // rename
|
|
|
private static final byte OP_DELETE = 2; // delete
|
|
@@ -105,14 +108,17 @@ public class FSEditLog {
|
|
|
private FileChannel fc; // channel of the file stream for sync
|
|
|
private DataOutputBuffer bufCurrent; // current buffer for writing
|
|
|
private DataOutputBuffer bufReady; // buffer ready for flushing
|
|
|
+ static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation
|
|
|
|
|
|
EditLogFileOutputStream(File name) throws IOException {
|
|
|
super();
|
|
|
file = name;
|
|
|
bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
|
|
|
bufReady = new DataOutputBuffer(sizeFlushBuffer);
|
|
|
- fp = new FileOutputStream(name, true); // open for append
|
|
|
- fc = fp.getChannel();
|
|
|
+ RandomAccessFile rp = new RandomAccessFile(name, "rw");
|
|
|
+ fp = new FileOutputStream(rp.getFD()); // open for append
|
|
|
+ fc = rp.getChannel();
|
|
|
+ fc.position(fc.size());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -141,6 +147,7 @@ public class FSEditLog {
|
|
|
@Override
|
|
|
void create() throws IOException {
|
|
|
fc.truncate(0);
|
|
|
+ fc.position(0);
|
|
|
bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
|
|
|
setReadyToFlush();
|
|
|
flush();
|
|
@@ -158,7 +165,11 @@ public class FSEditLog {
|
|
|
}
|
|
|
bufCurrent.close();
|
|
|
bufReady.close();
|
|
|
+
|
|
|
+ // remove the last INVALID marker from transaction log.
|
|
|
+ fc.truncate(fc.position());
|
|
|
fp.close();
|
|
|
+
|
|
|
bufCurrent = bufReady = null;
|
|
|
}
|
|
|
|
|
@@ -167,8 +178,9 @@ public class FSEditLog {
|
|
|
* New data can be still written to the stream while flushing is performed.
|
|
|
*/
|
|
|
@Override
|
|
|
- void setReadyToFlush() {
|
|
|
+ void setReadyToFlush() throws IOException {
|
|
|
assert bufReady.size() == 0 : "previous data is not flushed yet";
|
|
|
+ write(OP_INVALID); // insert end-of-file marker
|
|
|
DataOutputBuffer tmp = bufReady;
|
|
|
bufReady = bufCurrent;
|
|
|
bufCurrent = tmp;
|
|
@@ -181,9 +193,11 @@ public class FSEditLog {
|
|
|
*/
|
|
|
@Override
|
|
|
protected void flushAndSync() throws IOException {
|
|
|
+ preallocate(); // preallocate file if necessary
|
|
|
bufReady.writeTo(fp); // write data to file
|
|
|
bufReady.reset(); // erase all data in the buffer
|
|
|
- fc.force(true); // sync to persistent store
|
|
|
+ fc.force(false); // metadata updates not needed because of preallocation
|
|
|
+ fc.position(fc.position()-1); // skip back the end-of-file marker
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -202,6 +216,21 @@ public class FSEditLog {
|
|
|
long lastModified() {
|
|
|
return file.lastModified();
|
|
|
}
|
|
|
+
|
|
|
+ // allocate a big chunk of data
|
|
|
+ private void preallocate() throws IOException {
|
|
|
+ long position = fc.position();
|
|
|
+ if (position + 4096 >= fc.size()) {
|
|
|
+ FSNamesystem.LOG.info("XXX Preallocating Edit log, current size " +
|
|
|
+ fc.size());
|
|
|
+ long newsize = position + 1024*1024; // 1MB
|
|
|
+ fill.position(0);
|
|
|
+ int written = fc.write(fill, newsize);
|
|
|
+ FSNamesystem.LOG.info("XXX Edit log size is now " + fc.size() +
|
|
|
+ " written " + written + " bytes " +
|
|
|
+ " at offset " + newsize);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class EditLogFileInputStream extends EditLogInputStream {
|
|
@@ -454,6 +483,11 @@ public class FSEditLog {
|
|
|
byte opcode = -1;
|
|
|
try {
|
|
|
opcode = in.readByte();
|
|
|
+ if (opcode == OP_INVALID) {
|
|
|
+ FSNamesystem.LOG.info("Invalid opcode, reached end of edit log " +
|
|
|
+ "Number of transactions found " + numEdits);
|
|
|
+ break; // no more transactions
|
|
|
+ }
|
|
|
} catch (EOFException e) {
|
|
|
break; // no more transactions
|
|
|
}
|