@@ -21,8 +21,6 @@ import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -97,123 +95,148 @@ public class FSEditLog {
}
};

- static class EditLogOutputStream {
- private FileChannel fc;
- private FileOutputStream fp;
- private DataOutputStream od;
- private DataOutputStream od1;
- private DataOutputStream od2;
- private ByteArrayOutputStream buf1;
- private ByteArrayOutputStream buf2;
- private int bufSize;
-
- // these are statistics counters
- private long numSync; // number of syncs to disk
- private long totalTimeSync; // total time to sync
-
- EditLogOutputStream(File name) throws IOException {
- bufSize = sizeFlushBuffer;
- buf1 = new ByteArrayOutputStream(bufSize);
- buf2 = new ByteArrayOutputStream(bufSize);
- od1 = new DataOutputStream(buf1);
- od2 = new DataOutputStream(buf2);
- od = od1; // start with first buffer
+ /**
+ * An implementation of the abstract class {@link EditLogOutputStream},
+ * which stores edits in a local file.
+ */
+ static private class EditLogFileOutputStream extends EditLogOutputStream {
+ private File file;
+ private FileOutputStream fp; // file stream for storing edit logs
+ private FileChannel fc; // channel of the file stream for sync
+ private DataOutputBuffer bufCurrent; // current buffer for writing
+ private DataOutputBuffer bufReady; // buffer ready for flushing
+
+ EditLogFileOutputStream(File name) throws IOException {
+ super();
+ file = name;
+ bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
+ bufReady = new DataOutputBuffer(sizeFlushBuffer);
fp = new FileOutputStream(name, true); // open for append
fc = fp.getChannel();
- numSync = totalTimeSync = 0;
}

- // returns the current output stream
- DataOutputStream getOutputStream() {
- return od;
+ @Override
+ String getName() {
+ return file.getPath();
}

- void flushAndSync() throws IOException {
- this.flush();
- fc.force(true);
+ /** {@inheritDoc} */
+ @Override
+ public void write(int b) throws IOException {
+ bufCurrent.write(b);
}

- void create() throws IOException {
- fc.truncate(0);
- od.writeInt(FSConstants.LAYOUT_VERSION);
- flushAndSync();
+ /** {@inheritDoc} */
+ @Override
+ void write(byte op, Writable ... writables) throws IOException {
+ write(op);
+ for(Writable w : writables) {
+ w.write(bufCurrent);
+ }
}

- // flush current buffer
- private void flush() throws IOException {
- ByteArrayOutputStream buf = getBuffer();
- if (buf.size() == 0) {
- return; // no data to flush
- }
- buf.writeTo(fp); // write data to file
- buf.reset(); // erase all data in buf
+ /**
+ * Create empty edits logs file.
+ */
+ @Override
+ void create() throws IOException {
+ fc.truncate(0);
+ bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+ setReadyToFlush();
+ flush();
}

- void close() throws IOException {
+ @Override
+ public void close() throws IOException {
// close should have been called after all pending transactions
// have been flushed & synced.
- if (getBufSize() != 0) {
- throw new IOException("FSEditStream has " + getBufSize() +
+ int bufSize = bufCurrent.size();
+ if (bufSize != 0) {
+ throw new IOException("FSEditStream has " + bufSize +
" bytes still to be flushed and cannot " +
- "closed.");
+ "be closed.");
}
- od.close();
+ bufCurrent.close();
+ bufReady.close();
fp.close();
- buf1 = buf2 = null;
- od = od1 = od2 = null;
+ bufCurrent = bufReady = null;
}

- // returns the amount of data in the buffer
- int getBufSize() {
- return getBuffer().size();
+ /**
+ * All data that has been written to the stream so far will be flushed.
+ * New data can be still written to the stream while flushing is performed.
+ */
+ @Override
+ void setReadyToFlush() {
+ assert bufReady.size() == 0 : "previous data is not flushed yet";
+ DataOutputBuffer tmp = bufReady;
+ bufReady = bufCurrent;
+ bufCurrent = tmp;
}

- // get the current buffer
- private ByteArrayOutputStream getBuffer() {
- if (od == od1) {
- return buf1;
- } else {
- return buf2;
- }
+ /**
+ * Flush ready buffer to persistent store.
+ * currentBuffer is not flushed as it accumulates new log records
+ * while readyBuffer will be flushed and synced.
+ */
+ @Override
+ protected void flushAndSync() throws IOException {
+ bufReady.writeTo(fp); // write data to file
+ bufReady.reset(); // erase all data in the buffer
+ fc.force(true); // sync to persistent store
}

- //
- // Flush current buffer to output stream, swap buffers
- // This is protected by the flushLock.
- //
- void swap() {
- if (od == od1) {
- od = od2;
- } else {
- od = od1;
- }
+ /**
+ * Return the size of the current edit log including buffered data.
+ */
+ @Override
+ long length() throws IOException {
+ // file size + size of both buffers
+ return fc.size() + bufReady.size() + bufCurrent.size();
+ }
+
+ /**
+ * Returns the time the edits log file was last modified.
+ */
+ @Override
+ long lastModified() {
+ return file.lastModified();
}
+ }

- //
- // Flush old buffer to persistent store
- //
- void flushAndSyncOld() throws IOException {
- numSync++;
- ByteArrayOutputStream oldbuf;
- if (od == od1) {
- oldbuf = buf2;
- } else {
- oldbuf = buf1;
- }
- long start = FSNamesystem.now();
- oldbuf.writeTo(fp); // write data to file
- oldbuf.reset(); // erase all data in buf
- fc.force(true); // sync to persistent store
- long end = FSNamesystem.now();
- totalTimeSync += (end - start);
+ static class EditLogFileInputStream extends EditLogInputStream {
+ private File file;
+ private FileInputStream fStream;
+
+ EditLogFileInputStream(File name) throws IOException {
+ file = name;
+ fStream = new FileInputStream(name);
+ }
+
+ @Override
+ String getName() {
+ return file.getPath();
}

- long getTotalSyncTime() {
- return totalTimeSync;
+ @Override
+ public int available() throws IOException {
+ return fStream.available();
}

- long getNumSync() {
- return numSync;
+ @Override
+ public int read() throws IOException {
+ return fStream.read();
+ }
+
+ @Override
+ public void close() throws IOException {
+ fStream.close();
+ }
+
+ @Override
+ long length() throws IOException {
+ // size of the on-disk edits log file
+ return file.length();
}
}
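
The hunk above replaces the old swap()/flushAndSyncOld() pair with an EditLogFileOutputStream that keeps two in-memory buffers: writers append to bufCurrent, setReadyToFlush() swaps the buffers, and flushAndSync() writes the ready buffer to the file and forces it to stable storage. The standalone sketch below illustrates that double-buffer pattern only; it substitutes java.io.ByteArrayOutputStream for Hadoop's DataOutputBuffer, and the class and method names are illustrative, not part of this patch.

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

/** Minimal sketch of the double-buffer scheme used by EditLogFileOutputStream. */
class DoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // collects new records
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();   // waiting to be flushed
  private final FileOutputStream fp;

  DoubleBufferSketch(String path) throws IOException {
    fp = new FileOutputStream(path, true); // open for append
  }

  synchronized void write(byte[] record) throws IOException {
    bufCurrent.write(record); // cheap in-memory append, no disk I/O here
  }

  synchronized void setReadyToFlush() {
    assert bufReady.size() == 0 : "previous data is not flushed yet";
    ByteArrayOutputStream tmp = bufReady; // swap the two buffers
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  void flush() throws IOException {      // corresponds to flushAndSync() in the patch
    bufReady.writeTo(fp);                // write the ready buffer to the file
    bufReady.reset();                    // erase the flushed data
    fp.getChannel().force(true);         // sync to persistent store
  }
}
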
@@ -258,24 +281,24 @@ public class FSEditLog {
for (int idx = 0; idx < size; idx++) {
File eFile = getEditFile(idx);
try {
- EditLogOutputStream eStream = new EditLogOutputStream(eFile);
+ EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
editStreams.add(eStream);
} catch (IOException e) {
FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
fsimage.processIOError(idx);
- idx--;
+ idx--;
}
}
}

public synchronized void createEditLogFile(File name) throws IOException {
- EditLogOutputStream eStream = new EditLogOutputStream(name);
+ EditLogOutputStream eStream = new EditLogFileOutputStream(name);
eStream.create();
eStream.close();
}

/**
- * Create edits.new if non existant.
+ * Create edits.new if non existent.
*/
synchronized void createNewIfMissing() throws IOException {
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
@@ -286,7 +309,7 @@ public class FSEditLog {
}

/**
- * Shutdown the filestore
+ * Shutdown the file store.
*/
public synchronized void close() throws IOException {
while (isSyncRunning) {
@@ -304,7 +327,8 @@ public class FSEditLog {
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
- eStream.flushAndSync();
+ eStream.setReadyToFlush();
+ eStream.flush();
eStream.close();
} catch (IOException e) {
processIOError(idx);
@@ -384,7 +408,7 @@ public class FSEditLog {
* This is where we apply edits that we've been writing to disk all
* along.
*/
- int loadFSEdits(File edits) throws IOException {
+ static int loadFSEdits(EditLogInputStream edits) throws IOException {
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
@@ -399,275 +423,272 @@ public class FSEditLog {
numOpOther = 0;
long startTime = FSNamesystem.now();

- if (edits != null) {
- DataInputStream in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(edits)));
+ DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
+ try {
+ // Read log file version. Could be missing.
+ in.mark(4);
+ // If edits log is greater than 2G, available method will return negative
+ // numbers, so we avoid having to call available
+ boolean available = true;
try {
- // Read log file version. Could be missing.
- in.mark(4);
- // If edits log is greater than 2G, available method will return negative
- // numbers, so we avoid having to call available
- boolean available = true;
+ logVersion = in.readByte();
+ } catch (EOFException e) {
+ available = false;
+ }
+ if (available) {
+ in.reset();
+ logVersion = in.readInt();
+ if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+ throw new IOException(
+ "Unexpected version of the file system log file: "
+ + logVersion + ". Current version = "
+ + FSConstants.LAYOUT_VERSION + ".");
+ }
+ assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+ "Unsupported version " + logVersion;
+
+ while (true) {
+ long timestamp = 0;
+ long mtime = 0;
+ long blockSize = 0;
+ byte opcode = -1;
try {
- logVersion = in.readByte();
+ opcode = in.readByte();
} catch (EOFException e) {
- available = false;
- }
- if (available) {
- in.reset();
- logVersion = in.readInt();
- if (logVersion < FSConstants.LAYOUT_VERSION) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
- + FSConstants.LAYOUT_VERSION + ".");
+ break; // no more transactions
}
- assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
- "Unsupported version " + logVersion;
-
- while (true) {
- long timestamp = 0;
- long mtime = 0;
- long blockSize = 0;
- byte opcode = -1;
- try {
- opcode = in.readByte();
- } catch (EOFException e) {
- break; // no more transactions
+ numEdits++;
+ switch (opcode) {
+ case OP_ADD:
+ case OP_CLOSE: {
+ // versions > 0 support per file replication
+ // get name and replication
+ int length = in.readInt();
+ if (-7 == logVersion && length != 3||
+ logVersion < -7 && length != 4) {
+ throw new IOException("Incorrect data format." +
+ " logVersion is " + logVersion +
+ " but writables.length is " +
+ length + ". ");
}
- numEdits++;
- switch (opcode) {
- case OP_ADD:
- case OP_CLOSE: {
- // versions > 0 support per file replication
- // get name and replication
- int length = in.readInt();
- if (-7 == logVersion && length != 3||
- logVersion < -7 && length != 4) {
- throw new IOException("Incorrect data format." +
- " logVersion is " + logVersion +
- " but writables.length is " +
- length + ". ");
- }
- path = FSImage.readString(in);
- short replication = adjustReplication(readShort(in));
- mtime = readLong(in);
- if (logVersion < -7) {
- blockSize = readLong(in);
- }
- // get blocks
- Block blocks[] = null;
- if (logVersion <= -14) {
- blocks = readBlocks(in);
- } else {
- BlockTwo oldblk = new BlockTwo();
- int num = in.readInt();
- blocks = new Block[num];
- for (int i = 0; i < num; i++) {
- oldblk.readFields(in);
- blocks[i] = new Block(oldblk.blkid, oldblk.len,
- Block.GRANDFATHER_GENERATION_STAMP);
- }
- }
-
- // Older versions of HDFS does not store the block size in inode.
- // If the file has more than one block, use the size of the
- // first block as the blocksize. Otherwise use the default
- // block size.
- if (-8 <= logVersion && blockSize == 0) {
- if (blocks.length > 1) {
- blockSize = blocks[0].getNumBytes();
- } else {
- long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
- blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
- }
- }
-
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
+ path = FSImage.readString(in);
+ short replication = adjustReplication(readShort(in));
+ mtime = readLong(in);
+ if (logVersion < -7) {
+ blockSize = readLong(in);
+ }
+ // get blocks
+ Block blocks[] = null;
+ if (logVersion <= -14) {
+ blocks = readBlocks(in);
+ } else {
+ BlockTwo oldblk = new BlockTwo();
+ int num = in.readInt();
+ blocks = new Block[num];
+ for (int i = 0; i < num; i++) {
+ oldblk.readFields(in);
+ blocks[i] = new Block(oldblk.blkid, oldblk.len,
+ Block.GRANDFATHER_GENERATION_STAMP);
}
+ }

- // clientname, clientMachine and block locations of last block.
- if (opcode == OP_ADD && logVersion <= -12) {
- clientName = FSImage.readString(in);
- clientMachine = FSImage.readString(in);
- if (-13 <= logVersion) {
- readDatanodeDescriptorArray(in);
- }
+ // Older versions of HDFS does not store the block size in inode.
+ // If the file has more than one block, use the size of the
+ // first block as the blocksize. Otherwise use the default
+ // block size.
+ if (-8 <= logVersion && blockSize == 0) {
+ if (blocks.length > 1) {
+ blockSize = blocks[0].getNumBytes();
} else {
- clientName = "";
- clientMachine = "";
- }
-
- // The open lease transaction re-creates a file if necessary.
- // Delete the file if it already exists.
- if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(opcode + ": " + path +
- " numblocks : " + blocks.length +
- " clientHolder " + clientName +
- " clientMachine " + clientMachine);
+ long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
+ blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
+ }
+
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ if (logVersion <= -11) {
+ permissions = PermissionStatus.read(in);
+ }

- old = fsDir.unprotectedDelete(path, mtime);
-
- // add to the file tree
- INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
- path, permissions,
- blocks, replication,
- mtime, blockSize);
- if (opcode == OP_ADD) {
- numOpAdd++;
- //
- // Replace current node with a INodeUnderConstruction.
- // Recreate in-memory lease record.
- //
- INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
- node.getLocalNameBytes(),
- node.getReplication(),
- node.getModificationTime(),
- node.getPreferredBlockSize(),
- node.getBlocks(),
- node.getPermissionStatus(),
- clientName,
- clientMachine,
- null);
- fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.clientName, path);
- } else if (opcode == OP_CLOSE) {
- //
- // Remove lease if it exists.
- //
- if (old.isUnderConstruction()) {
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
- old;
- fsNamesys.leaseManager.removeLease(cons.clientName, path);
- }
- }
- break;
- }
- case OP_SET_REPLICATION: {
- numOpSetRepl++;
- path = FSImage.readString(in);
- short replication = adjustReplication(readShort(in));
- fsDir.unprotectedSetReplication(path, replication, null);
- break;
- }
- case OP_RENAME: {
- numOpRename++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
+ // clientname, clientMachine and block locations of last block.
+ if (opcode == OP_ADD && logVersion <= -12) {
+ clientName = FSImage.readString(in);
+ clientMachine = FSImage.readString(in);
+ if (-13 <= logVersion) {
+ readDatanodeDescriptorArray(in);
+ }
}
- String s = FSImage.readString(in);
- String d = FSImage.readString(in);
- timestamp = readLong(in);
- DFSFileInfo dinfo = fsDir.getFileInfo(d);
- fsDir.unprotectedRenameTo(s, d, timestamp);
- fsNamesys.changeLease(s, d, dinfo);
- break;
+ } else {
+ clientName = "";
+ clientMachine = "";
+ }
}
- case OP_DELETE: {
- numOpDelete++;
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "delete operation.");
- }
- path = FSImage.readString(in);
- timestamp = readLong(in);
- old = fsDir.unprotectedDelete(path, timestamp);
- if (old != null && old.isUnderConstruction()) {
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
- fsNamesys.leaseManager.removeLease(cons.clientName, path);
- }
- break;
+
+ // The open lease transaction re-creates a file if necessary.
+ // Delete the file if it already exists.
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(opcode + ": " + path +
+ " numblocks : " + blocks.length +
+ " clientHolder " + clientName +
+ " clientMachine " + clientMachine);
}
- case OP_MKDIR: {
- numOpMkDir++;
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- path = FSImage.readString(in);
- timestamp = readLong(in);

- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
+ old = fsDir.unprotectedDelete(path, mtime);
+
+ // add to the file tree
+ INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+ path, permissions,
+ blocks, replication,
+ mtime, blockSize);
+ if (opcode == OP_ADD) {
+ numOpAdd++;
+ //
+ // Replace current node with a INodeUnderConstruction.
+ // Recreate in-memory lease record.
+ //
+ INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+ node.getLocalNameBytes(),
+ node.getReplication(),
+ node.getModificationTime(),
+ node.getPreferredBlockSize(),
+ node.getBlocks(),
+ node.getPermissionStatus(),
+ clientName,
+ clientMachine,
+ null);
+ fsDir.replaceNode(path, node, cons);
+ fsNamesys.leaseManager.addLease(cons.clientName, path);
+ } else if (opcode == OP_CLOSE) {
+ //
+ // Remove lease if it exists.
+ //
+ if (old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
+ old;
+ fsNamesys.leaseManager.removeLease(cons.clientName, path);
}
- fsDir.unprotectedMkdir(path, permissions, timestamp);
- break;
- }
- case OP_SET_GENSTAMP: {
- numOpSetGenStamp++;
- long lw = in.readLong();
- fsDir.namesystem.setGenerationStamp(lw);
- break;
- }
- case OP_DATANODE_ADD: {
- numOpOther++;
- FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
- nodeimage.readFields(in);
- //Datnodes are not persistent any more.
- break;
}
- case OP_DATANODE_REMOVE: {
- numOpOther++;
- DatanodeID nodeID = new DatanodeID();
- nodeID.readFields(in);
- //Datanodes are not persistent any more.
- break;
+ break;
+ }
+ case OP_SET_REPLICATION: {
+ numOpSetRepl++;
+ path = FSImage.readString(in);
+ short replication = adjustReplication(readShort(in));
+ fsDir.unprotectedSetReplication(path, replication, null);
+ break;
+ }
+ case OP_RENAME: {
+ numOpRename++;
+ int length = in.readInt();
+ if (length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "Mkdir operation.");
}
- case OP_SET_PERMISSIONS: {
- numOpSetPerm++;
- if (logVersion > -11)
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- fsDir.unprotectedSetPermission(
- FSImage.readString(in), FsPermission.read(in));
- break;
+ String s = FSImage.readString(in);
+ String d = FSImage.readString(in);
+ timestamp = readLong(in);
+ DFSFileInfo dinfo = fsDir.getFileInfo(d);
+ fsDir.unprotectedRenameTo(s, d, timestamp);
+ fsNamesys.changeLease(s, d, dinfo);
+ break;
+ }
+ case OP_DELETE: {
+ numOpDelete++;
+ int length = in.readInt();
+ if (length != 2) {
+ throw new IOException("Incorrect data format. "
+ + "delete operation.");
}
- case OP_SET_OWNER: {
- numOpSetOwner++;
- if (logVersion > -11)
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- fsDir.unprotectedSetOwner(FSImage.readString(in),
- FSImage.readString(in), FSImage.readString(in));
- break;
+ path = FSImage.readString(in);
+ timestamp = readLong(in);
+ old = fsDir.unprotectedDelete(path, timestamp);
+ if (old != null && old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
+ fsNamesys.leaseManager.removeLease(cons.clientName, path);
}
- case OP_SET_QUOTA: {
- if (logVersion > -16) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- fsDir.unprotectedSetQuota(FSImage.readString(in),
- readLongWritable(in) );
- break;
+ break;
+ }
+ case OP_MKDIR: {
+ numOpMkDir++;
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ int length = in.readInt();
+ if (length != 2) {
+ throw new IOException("Incorrect data format. "
+ + "Mkdir operation.");
}
- case OP_CLEAR_QUOTA: {
- if (logVersion > -16) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- fsDir.unprotectedClearQuota(FSImage.readString(in));
- break;
+ path = FSImage.readString(in);
+ timestamp = readLong(in);
+
+ if (logVersion <= -11) {
+ permissions = PermissionStatus.read(in);
}
- default: {
- throw new IOException("Never seen opcode " + opcode);
+ fsDir.unprotectedMkdir(path, permissions, timestamp);
+ break;
+ }
+ case OP_SET_GENSTAMP: {
+ numOpSetGenStamp++;
+ long lw = in.readLong();
+ fsDir.namesystem.setGenerationStamp(lw);
+ break;
+ }
+ case OP_DATANODE_ADD: {
+ numOpOther++;
+ FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
+ nodeimage.readFields(in);
+ //Datnodes are not persistent any more.
+ break;
+ }
+ case OP_DATANODE_REMOVE: {
+ numOpOther++;
+ DatanodeID nodeID = new DatanodeID();
+ nodeID.readFields(in);
+ //Datanodes are not persistent any more.
+ break;
+ }
+ case OP_SET_PERMISSIONS: {
+ numOpSetPerm++;
+ if (logVersion > -11)
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ fsDir.unprotectedSetPermission(
+ FSImage.readString(in), FsPermission.read(in));
+ break;
+ }
+ case OP_SET_OWNER: {
+ numOpSetOwner++;
+ if (logVersion > -11)
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ fsDir.unprotectedSetOwner(FSImage.readString(in),
+ FSImage.readString(in), FSImage.readString(in));
+ break;
+ }
+ case OP_SET_QUOTA: {
+ if (logVersion > -16) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
}
+ fsDir.unprotectedSetQuota(FSImage.readString(in),
+ readLongWritable(in) );
+ break;
+ }
+ case OP_CLEAR_QUOTA: {
+ if (logVersion > -16) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
}
+ fsDir.unprotectedClearQuota(FSImage.readString(in));
+ break;
+ }
+ default: {
+ throw new IOException("Never seen opcode " + opcode);
+ }
}
- } finally {
- in.close();
}
- FSImage.LOG.info("Edits file " + edits.getName()
- + " of size " + edits.length() + " edits # " + numEdits
- + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
+ } finally {
+ in.close();
}
+ FSImage.LOG.info("Edits file " + edits.getName()
+ + " of size " + edits.length() + " edits # " + numEdits
+ + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");

if (FSImage.LOG.isDebugEnabled()) {
FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose
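
With loadFSEdits now static and taking an EditLogInputStream instead of a File, a caller wraps a local edits file in the new EditLogFileInputStream before loading. The helper below is a hypothetical illustration of that call pattern; it is not part of the patch and assumes it lives in the same package as FSEditLog (org.apache.hadoop.dfs in this era of the code).

// Hypothetical helper, not in the patch: loads a local edits file through the
// stream-based API introduced above.
static int loadLocalEdits(java.io.File editsFile) throws java.io.IOException {
  EditLogInputStream in = new EditLogFileInputStream(editsFile);
  return FSEditLog.loadFSEdits(in); // loadFSEdits closes the stream in its finally block
}
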
@@ -718,11 +739,7 @@ public class FSEditLog {
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
- DataOutputStream od = eStream.getOutputStream();
- od.write(op);
- for(Writable w : writables) {
- w.write(od);
- }
+ eStream.write(op, writables);
} catch (IOException ie) {
processIOError(idx);
}
@@ -747,7 +764,7 @@ public class FSEditLog {
//
// Sync all modifications done by this thread.
//
- public void logSync() {
+ public void logSync() throws IOException {
ArrayList<EditLogOutputStream> errorStreams = null;
long syncStart = 0;
@@ -781,7 +798,7 @@ public class FSEditLog {
// swap buffers
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
- eStream.swap();
+ eStream.setReadyToFlush();
}
}
@@ -790,7 +807,7 @@ public class FSEditLog {
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
- eStream.flushAndSyncOld();
+ eStream.flush();
} catch (IOException ie) {
//
// remember the streams that encountered an error.
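
Taken together, the hunks above show the new sync protocol in logSync(): each stream's buffers are first swapped with setReadyToFlush() (in FSEditLog this happens while the log's monitor is held), and the expensive flush()/fsync runs afterwards so other threads can keep logging into the fresh current buffer. The snippet below is a condensed, hypothetical paraphrase of that flow only; the real method also tracks syncStart/isSyncRunning and collects the streams that failed.

// Condensed sketch of the logSync() flow implied by these hunks; assumes the
// surrounding FSEditLog class context (imports, EditLogOutputStream type).
void logSyncSketch(java.util.List<EditLogOutputStream> editStreams) throws IOException {
  synchronized (this) {
    for (EditLogOutputStream eStream : editStreams) {
      eStream.setReadyToFlush(); // cheap: just swaps bufCurrent and bufReady
    }
  }
  for (EditLogOutputStream eStream : editStreams) {
    eStream.flush();             // writes bufReady to disk and forces it to stable storage
  }
}
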
@@ -972,14 +989,13 @@ public class FSEditLog {
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- EditLogOutputStream eStream = editStreams.get(idx);
- assert(size == 0 ||
- size == getEditFile(idx).length() + eStream.getBufSize());
- size = getEditFile(idx).length() + eStream.getBufSize();
+ long curSize = editStreams.get(idx).length();
+ assert (size == 0 || size == curSize) : "All streams must be the same";
+ size = curSize;
}
return size;
}
-
+
/**
* Closes the current edit log and opens edits.new.
* Returns the lastModified time of the edits log.
@@ -1006,7 +1022,7 @@ public class FSEditLog {
//
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
try {
- EditLogOutputStream eStream = new EditLogOutputStream(getEditNewFile(idx));
+ EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(idx));
eStream.create();
editStreams.add(eStream);
} catch (IOException e) {
@@ -1063,7 +1079,7 @@ public class FSEditLog {
* Returns the timestamp of the edit log
*/
synchronized long getFsEditTime() {
- return getEditFile(0).lastModified();
+ return editStreams.get(0).lastModified();
}

// sets the initial capacity of the flush buffer.