|
@@ -20,6 +20,8 @@ package org.apache.hadoop.dfs;
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.DataOutput;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
@@ -27,6 +29,7 @@ import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.lang.Math;
|
|
|
+import java.nio.channels.FileChannel;
|
|
|
|
|
|
import org.apache.hadoop.io.ArrayWritable;
|
|
|
import org.apache.hadoop.io.UTF8;
|
|
@@ -45,33 +48,167 @@ class FSEditLog {
|
|
|
//the following two are used only for backword compatibility :
|
|
|
@Deprecated private static final byte OP_DATANODE_ADD = 5;
|
|
|
@Deprecated private static final byte OP_DATANODE_REMOVE = 6;
|
|
|
+ private static int sizeFlushBuffer = 512*1024;
|
|
|
|
|
|
private ArrayList<EditLogOutputStream> editStreams = null;
|
|
|
private FSImage fsimage = null;
|
|
|
|
|
|
- private long lastModificationTime;
|
|
|
- private long lastSyncTime;
|
|
|
-
|
|
|
- static class EditLogOutputStream extends DataOutputStream {
|
|
|
+ // a monotonically increasing counter that represents transactionIds.
|
|
|
+ private long txid = 0;
|
|
|
+
|
|
|
+ // stores the last synced transactionId.
|
|
|
+ private long synctxid = 0;
|
|
|
+
|
|
|
+ // the time of printing the statistics to the log file.
|
|
|
+ private long lastPrintTime;
|
|
|
+
|
|
|
+ // is a sync currently running?
|
|
|
+ private boolean isSyncRunning;
|
|
|
+
|
|
|
+ // these are statistics counters.
|
|
|
+ private long numTransactions; // number of transactions
|
|
|
+ private long totalTimeTransactions; // total time for all transactions
|
|
|
+ private NameNodeMetrics metrics;
|
|
|
+
|
|
|
+ private static class TransactionId {
|
|
|
+ public long txid;
|
|
|
+
|
|
|
+ TransactionId(long value) {
|
|
|
+ this.txid = value;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // stores the most current transactionId of this thread.
|
|
|
+ private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
|
|
|
+ protected synchronized TransactionId initialValue() {
|
|
|
+ return new TransactionId(Long.MAX_VALUE);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ static class EditLogOutputStream {
|
|
|
+ private FileChannel fc;
|
|
|
+ private FileOutputStream fp;
|
|
|
+ private DataOutputStream od;
|
|
|
+ private DataOutputStream od1;
|
|
|
+ private DataOutputStream od2;
|
|
|
+ private ByteArrayOutputStream buf1;
|
|
|
+ private ByteArrayOutputStream buf2;
|
|
|
+ private int bufSize;
|
|
|
+
|
|
|
+ // these are statistics counters
|
|
|
+ private long numSync; // number of syncs to disk
|
|
|
+ private long totalTimeSync; // total time to sync
|
|
|
+
|
|
|
EditLogOutputStream(File name) throws IOException {
|
|
|
- super(new FileOutputStream(name, true)); // open for append
|
|
|
+ bufSize = sizeFlushBuffer;
|
|
|
+ buf1 = new ByteArrayOutputStream(bufSize);
|
|
|
+ buf2 = new ByteArrayOutputStream(bufSize);
|
|
|
+ od1 = new DataOutputStream(buf1);
|
|
|
+ od2 = new DataOutputStream(buf2);
|
|
|
+ od = od1; // start with first buffer
|
|
|
+ fp = new FileOutputStream(name, true); // open for append
|
|
|
+ fc = fp.getChannel();
|
|
|
+ numSync = totalTimeSync = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ // returns the current output stream
|
|
|
+ DataOutputStream getOutputStream() {
|
|
|
+ return od;
|
|
|
}
|
|
|
|
|
|
void flushAndSync() throws IOException {
|
|
|
- ((FileOutputStream)out).getChannel().force(true);
|
|
|
+ this.flush();
|
|
|
+ fc.force(true);
|
|
|
}
|
|
|
|
|
|
void create() throws IOException {
|
|
|
- ((FileOutputStream)out).getChannel().truncate(0);
|
|
|
- writeInt(FSConstants.LAYOUT_VERSION);
|
|
|
+ fc.truncate(0);
|
|
|
+ od.writeInt(FSConstants.LAYOUT_VERSION);
|
|
|
flushAndSync();
|
|
|
}
|
|
|
+
|
|
|
+ // flush current buffer
|
|
|
+ private void flush() throws IOException {
|
|
|
+ ByteArrayOutputStream buf = getBuffer();
|
|
|
+ if (buf.size() == 0) {
|
|
|
+ return; // no data to flush
|
|
|
+ }
|
|
|
+ buf.writeTo(fp); // write data to file
|
|
|
+ buf.reset(); // erase all data in buf
|
|
|
+ }
|
|
|
+
|
|
|
+ void close() throws IOException {
|
|
|
+ // close should have been called after all pending transactions
|
|
|
+ // have been flushed & synced.
|
|
|
+ if (getBufSize() != 0) {
|
|
|
+ throw new IOException("FSEditStream has " + getBufSize() +
|
|
|
+ " bytes still to be flushed and cannot " +
|
|
|
+ "closed.");
|
|
|
+ }
|
|
|
+ od.close();
|
|
|
+ fp.close();
|
|
|
+ buf1 = buf2 = null;
|
|
|
+ od = od1 = od2 = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // returns the amount of data in the buffer
|
|
|
+ int getBufSize() {
|
|
|
+ return getBuffer().size();
|
|
|
+ }
|
|
|
+
|
|
|
+ // get the current buffer
|
|
|
+ private ByteArrayOutputStream getBuffer() {
|
|
|
+ if (od == od1) {
|
|
|
+ return buf1;
|
|
|
+ } else {
|
|
|
+ return buf2;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Flush current buffer to output stream, swap buffers
|
|
|
+ // This is protected by the flushLock.
|
|
|
+ //
|
|
|
+ void swap() {
|
|
|
+ if (od == od1) {
|
|
|
+ od = od2;
|
|
|
+ } else {
|
|
|
+ od = od1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Flush old buffer to persistent store
|
|
|
+ //
|
|
|
+ void flushAndSyncOld() throws IOException {
|
|
|
+ numSync++;
|
|
|
+ ByteArrayOutputStream oldbuf;
|
|
|
+ if (od == od1) {
|
|
|
+ oldbuf = buf2;
|
|
|
+ } else {
|
|
|
+ oldbuf = buf1;
|
|
|
+ }
|
|
|
+ long start = FSNamesystem.now();
|
|
|
+ oldbuf.writeTo(fp); // write data to file
|
|
|
+ oldbuf.reset(); // erase all data in buf
|
|
|
+ fc.force(true); // sync to persistent store
|
|
|
+ long end = FSNamesystem.now();
|
|
|
+ totalTimeSync += (end - start);
|
|
|
+ }
|
|
|
+
|
|
|
+ long getTotalSyncTime() {
|
|
|
+ return totalTimeSync;
|
|
|
+ }
|
|
|
+
|
|
|
+ long getNumSync() {
|
|
|
+ return numSync;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
FSEditLog(FSImage image) {
|
|
|
fsimage = image;
|
|
|
- lastModificationTime = 0;
|
|
|
- lastSyncTime = 0;
|
|
|
+ isSyncRunning = false;
|
|
|
+ metrics = NameNode.getNameNodeMetrics();
|
|
|
}
|
|
|
|
|
|
private File getEditFile(int idx) {
|
|
@@ -101,6 +238,7 @@ class FSEditLog {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
synchronized void open() throws IOException {
|
|
|
+ numTransactions = totalTimeTransactions = 0;
|
|
|
int size = getNumStorageDirs();
|
|
|
if (editStreams == null)
|
|
|
editStreams = new ArrayList<EditLogOutputStream>(size);
|
|
@@ -138,9 +276,18 @@ class FSEditLog {
|
|
|
* Shutdown the filestore
|
|
|
*/
|
|
|
synchronized void close() throws IOException {
|
|
|
+ while (isSyncRunning) {
|
|
|
+ try {
|
|
|
+ wait(1000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ }
|
|
|
+ }
|
|
|
if (editStreams == null) {
|
|
|
return;
|
|
|
}
|
|
|
+ printStatistics(true);
|
|
|
+ numTransactions = totalTimeTransactions = 0;
|
|
|
+
|
|
|
for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
try {
|
|
@@ -174,6 +321,38 @@ class FSEditLog {
|
|
|
fsimage.processIOError(index);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The specified streams have IO errors. Remove them from logging
|
|
|
+ * new transactions.
|
|
|
+ */
|
|
|
+ private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
|
|
|
+ if (errorStreams == null) {
|
|
|
+ return; // nothing to do
|
|
|
+ }
|
|
|
+ for (int idx = 0; idx < errorStreams.size(); idx++) {
|
|
|
+ EditLogOutputStream eStream = errorStreams.get(idx);
|
|
|
+ int j = 0;
|
|
|
+ for (j = 0; j < editStreams.size(); j++) {
|
|
|
+ if (editStreams.get(j) == eStream) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (j == editStreams.size()) {
|
|
|
+ FSNamesystem.LOG.error("Unable to find sync log on which " +
|
|
|
+ " IO error occured. " +
|
|
|
+ "Fatal Error.");
|
|
|
+ Runtime.getRuntime().exit(-1);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ processIOError(idx);
|
|
|
+ } catch (IOException e) {
|
|
|
+ FSNamesystem.LOG.error("Unable to sync edit log. " +
|
|
|
+ "Fatal Error.");
|
|
|
+ Runtime.getRuntime().exit(-1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* check if ANY edits.new log exists
|
|
|
*/
|
|
@@ -425,65 +604,140 @@ class FSEditLog {
|
|
|
*/
|
|
|
synchronized void logEdit(byte op, Writable w1, Writable w2) {
|
|
|
assert this.getNumEditStreams() > 0 : "no editlog streams";
|
|
|
+ long start = FSNamesystem.now();
|
|
|
for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
- EditLogOutputStream eStream;
|
|
|
- synchronized (eStream = editStreams.get(idx)) {
|
|
|
+ EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
+ try {
|
|
|
+ DataOutputStream od = eStream.getOutputStream();
|
|
|
+ od.write(op);
|
|
|
+ if (w1 != null) {
|
|
|
+ w1.write(od);
|
|
|
+ }
|
|
|
+ if (w2 != null) {
|
|
|
+ w2.write(od);
|
|
|
+ }
|
|
|
+ } catch (IOException ie) {
|
|
|
try {
|
|
|
- eStream.write(op);
|
|
|
- if (w1 != null) {
|
|
|
- w1.write(eStream);
|
|
|
- }
|
|
|
- if (w2 != null) {
|
|
|
- w2.write(eStream);
|
|
|
- }
|
|
|
- } catch (IOException ie) {
|
|
|
- try {
|
|
|
- processIOError(idx);
|
|
|
- } catch (IOException e) {
|
|
|
- FSNamesystem.LOG.error("Unable to append to edit log. " +
|
|
|
- "Fatal Error.");
|
|
|
- Runtime.getRuntime().exit(-1);
|
|
|
- }
|
|
|
+ processIOError(idx);
|
|
|
+ } catch (IOException e) {
|
|
|
+ FSNamesystem.LOG.error("Unable to append to edit log. " +
|
|
|
+ "Fatal Error.");
|
|
|
+ Runtime.getRuntime().exit(-1);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ // get a new transactionId
|
|
|
+ txid++;
|
|
|
+
|
|
|
//
|
|
|
- // record the time when new data was written to the edits log
|
|
|
+ // record the transactionId when new data was written to the edits log
|
|
|
//
|
|
|
- lastModificationTime = System.currentTimeMillis();
|
|
|
+ TransactionId id = (TransactionId)myTransactionId.get();
|
|
|
+ id.txid = txid;
|
|
|
+
|
|
|
+ // update statistics
|
|
|
+ long end = FSNamesystem.now();
|
|
|
+ numTransactions++;
|
|
|
+ totalTimeTransactions += (end-start);
|
|
|
+ metrics.incrNumTransactions(1, (int)(end-start));
|
|
|
}
|
|
|
|
|
|
//
|
|
|
- // flush all data of the Edits log into persistent store
|
|
|
+ // Sync all modifications done by this thread.
|
|
|
//
|
|
|
- synchronized void logSync() {
|
|
|
- assert this.getNumEditStreams() > 0 : "no editlog streams";
|
|
|
+ void logSync() {
|
|
|
+ ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
+ long syncStart = 0;
|
|
|
|
|
|
- //
|
|
|
- // If data was generated before the beginning of the last sync time
|
|
|
- // then there is nothing to flush
|
|
|
- //
|
|
|
- if (lastModificationTime < lastSyncTime) {
|
|
|
- return;
|
|
|
+ // Fetch the transactionId of this thread.
|
|
|
+ TransactionId id = (TransactionId)myTransactionId.get();
|
|
|
+ long mytxid = id.txid;
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ assert this.getNumEditStreams() > 0 : "no editlog streams";
|
|
|
+ printStatistics(false);
|
|
|
+
|
|
|
+ // if somebody is already syncing, then wait
|
|
|
+ while (mytxid > synctxid && isSyncRunning) {
|
|
|
+ try {
|
|
|
+ wait(1000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // If this transaction was already flushed, then nothing to do
|
|
|
+ //
|
|
|
+ if (mytxid <= synctxid) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // now, this thread will do the sync
|
|
|
+ syncStart = txid;
|
|
|
+ isSyncRunning = true;
|
|
|
+
|
|
|
+ // swap buffers
|
|
|
+ for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
+ EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
+ eStream.swap();
|
|
|
+ }
|
|
|
}
|
|
|
- lastSyncTime = System.currentTimeMillis();
|
|
|
|
|
|
+ // do the sync
|
|
|
+ long start = FSNamesystem.now();
|
|
|
for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
- EditLogOutputStream eStream;
|
|
|
- synchronized (eStream = editStreams.get(idx)) {
|
|
|
- try {
|
|
|
- eStream.flushAndSync();
|
|
|
- } catch (IOException ie) {
|
|
|
- try {
|
|
|
- processIOError(idx);
|
|
|
- } catch (IOException e) {
|
|
|
- FSNamesystem.LOG.error("Unable to sync edit log. " +
|
|
|
- "Fatal Error.");
|
|
|
- Runtime.getRuntime().exit(-1);
|
|
|
- }
|
|
|
+ EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
+ try {
|
|
|
+ eStream.flushAndSyncOld();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ //
|
|
|
+ // remember the streams that encountered an error.
|
|
|
+ //
|
|
|
+ if (errorStreams == null) {
|
|
|
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
}
|
|
|
+ errorStreams.add(eStream);
|
|
|
+ FSNamesystem.LOG.error("Unable to sync edit log. " +
|
|
|
+ "Fatal Error.");
|
|
|
}
|
|
|
}
|
|
|
+ long elapsed = FSNamesystem.now() - start;
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ processIOError(errorStreams);
|
|
|
+ synctxid = syncStart;
|
|
|
+ isSyncRunning = false;
|
|
|
+ this.notifyAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ metrics.incrSyncs(1, (int)elapsed);
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // print statistics every 1 minute.
|
|
|
+ //
|
|
|
+ private void printStatistics(boolean force) {
|
|
|
+ long now = FSNamesystem.now();
|
|
|
+ if (lastPrintTime + 60000 < now && !force) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (editStreams == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ lastPrintTime = now;
|
|
|
+ StringBuffer buf = new StringBuffer();
|
|
|
+
|
|
|
+ buf.append("Number of transactions: " + numTransactions +
|
|
|
+ " Total time for transactions(ms): " +
|
|
|
+ totalTimeTransactions);
|
|
|
+ buf.append(" Number of syncs: " + editStreams.get(0).getNumSync());
|
|
|
+ buf.append(" SyncTimes(ms): ");
|
|
|
+ for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
+ EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
+ buf.append(eStream.getTotalSyncTime());
|
|
|
+ buf.append(" ");
|
|
|
+ }
|
|
|
+ FSNamesystem.LOG.info(buf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -561,10 +815,10 @@ class FSEditLog {
|
|
|
assert(getNumStorageDirs() == editStreams.size());
|
|
|
long size = 0;
|
|
|
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- synchronized (editStreams.get(idx)) {
|
|
|
- assert(size == 0 || size == getEditFile(idx).length());
|
|
|
- size = getEditFile(idx).length();
|
|
|
- }
|
|
|
+ EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
+ assert(size == 0 ||
|
|
|
+ size == getEditFile(idx).length() + eStream.getBufSize());
|
|
|
+ size = getEditFile(idx).length() + eStream.getBufSize();
|
|
|
}
|
|
|
return size;
|
|
|
}
|
|
@@ -654,4 +908,9 @@ class FSEditLog {
|
|
|
synchronized long getFsEditTime() throws IOException {
|
|
|
return getEditFile(0).lastModified();
|
|
|
}
|
|
|
+
|
|
|
+ // sets the initial capacity of the flush buffer.
|
|
|
+ static void setBufferCapacity(int size) {
|
|
|
+ sizeFlushBuffer = size;
|
|
|
+ }
|
|
|
}
|