|
@@ -115,6 +115,8 @@ public class FSEditLog {
|
|
|
// is a sync currently running?
|
|
|
private volatile boolean isSyncRunning;
|
|
|
|
|
|
+ // is an automatic sync scheduled?
|
|
|
+ private volatile boolean isAutoSyncScheduled = false;
|
|
|
|
|
|
// these are statistics counters.
|
|
|
private long numTransactions; // number of transactions
|
|
@@ -846,31 +848,84 @@ public class FSEditLog {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Write an operation to the edit log. Do not sync to persistent
|
|
|
- * store yet.
|
|
|
+ * Write an operation to the edit log.
|
|
|
+ * Automatically sync buffered edits to persistent store if it is time
|
|
|
+ * to sync.
|
|
|
*/
|
|
|
- synchronized void logEdit(byte op, Writable ... writables) {
|
|
|
- if(getNumEditStreams() == 0)
|
|
|
- throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- long start = FSNamesystem.now();
|
|
|
- for(EditLogOutputStream eStream : editStreams) {
|
|
|
- FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");
|
|
|
- if(!eStream.isOperationSupported(op))
|
|
|
- continue;
|
|
|
- try {
|
|
|
- eStream.write(op, writables);
|
|
|
- } catch (IOException ie) {
|
|
|
- FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
+ void logEdit(byte op, Writable ... writables) {
|
|
|
+ synchronized (this) {
|
|
|
+ // wait if an automatic sync is scheduled
|
|
|
+ waitIfAutoSyncScheduled();
|
|
|
+
|
|
|
+ if(getNumEditStreams() == 0)
|
|
|
+ throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
|
|
|
+ ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
+ long start = FSNamesystem.now();
|
|
|
+ for(EditLogOutputStream eStream : editStreams) {
|
|
|
+ FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");
|
|
|
+ if(!eStream.isOperationSupported(op))
|
|
|
+ continue;
|
|
|
+ try {
|
|
|
+ eStream.write(op, writables);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
|
|
|
+ if(errorStreams == null)
|
|
|
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ errorStreams.add(eStream);
|
|
|
+ }
|
|
|
}
|
|
|
+ processIOError(errorStreams, true);
|
|
|
+ recordTransaction(start);
|
|
|
+
|
|
|
+ // check if it is time to schedule an automatic sync
|
|
|
+ if (!shouldForceSync()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ isAutoSyncScheduled = true;
|
|
|
}
|
|
|
- processIOError(errorStreams, true);
|
|
|
- recordTransaction(start);
|
|
|
+
|
|
|
+ // sync buffered edit log entries to persistent store
|
|
|
+ logSync();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Wait if an automatic sync is scheduled
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ synchronized void waitIfAutoSyncScheduled() {
|
|
|
+ try {
|
|
|
+ while (isAutoSyncScheduled) {
|
|
|
+ this.wait(1000);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Signal that an automatic sync scheduling is done if it is scheduled
|
|
|
+ */
|
|
|
+ synchronized void doneWithAutoSyncScheduling() {
|
|
|
+ if (isAutoSyncScheduled) {
|
|
|
+ isAutoSyncScheduled = false;
|
|
|
+ notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if should automatically sync buffered edits to
|
|
|
+ * persistent store
|
|
|
+ *
|
|
|
+ * @return true if any of the edit stream says that it should sync
|
|
|
+ */
|
|
|
+ private boolean shouldForceSync() {
|
|
|
+ for (EditLogOutputStream eStream : editStreams) {
|
|
|
+ if (eStream.shouldForceSync()) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
private void recordTransaction(long start) {
|
|
|
// get a new transactionId
|
|
|
txid++;
|
|
@@ -935,16 +990,17 @@ public class FSEditLog {
|
|
|
* concurrency with sync() should be synchronized and also call
|
|
|
* waitForSyncToFinish() before assuming they are running alone.
|
|
|
*/
|
|
|
- public void logSync() throws IOException {
|
|
|
+ public void logSync() {
|
|
|
ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
long syncStart = 0;
|
|
|
|
|
|
// Fetch the transactionId of this thread.
|
|
|
long mytxid = myTransactionId.get().txid;
|
|
|
- EditLogOutputStream streams[] = null;
|
|
|
+ ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
|
|
|
boolean sync = false;
|
|
|
try {
|
|
|
synchronized (this) {
|
|
|
+ try {
|
|
|
assert editStreams.size() > 0 : "no editlog streams";
|
|
|
printStatistics(false);
|
|
|
|
|
@@ -973,16 +1029,29 @@ public class FSEditLog {
|
|
|
|
|
|
// swap buffers
|
|
|
for(EditLogOutputStream eStream : editStreams) {
|
|
|
- eStream.setReadyToFlush();
|
|
|
+ try {
|
|
|
+ eStream.setReadyToFlush();
|
|
|
+ streams.add(eStream);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ FSNamesystem.LOG.error("Unable to get ready to flush.", ie);
|
|
|
+ //
|
|
|
+ // remember the streams that encountered an error.
|
|
|
+ //
|
|
|
+ if (errorStreams == null) {
|
|
|
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ }
|
|
|
+ errorStreams.add(eStream);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // Prevent RuntimeException from blocking other log edit write
|
|
|
+ doneWithAutoSyncScheduling();
|
|
|
}
|
|
|
- streams =
|
|
|
- editStreams.toArray(new EditLogOutputStream[editStreams.size()]);
|
|
|
}
|
|
|
|
|
|
// do the sync
|
|
|
long start = FSNamesystem.now();
|
|
|
- for (int idx = 0; idx < streams.length; idx++) {
|
|
|
- EditLogOutputStream eStream = streams[idx];
|
|
|
+ for (EditLogOutputStream eStream : streams) {
|
|
|
try {
|
|
|
eStream.flush();
|
|
|
} catch (IOException ie) {
|
|
@@ -1002,6 +1071,7 @@ public class FSEditLog {
|
|
|
if (metrics != null) // Metrics non-null only when used inside name node
|
|
|
metrics.syncs.inc(elapsed);
|
|
|
} finally {
|
|
|
+ // Prevent RuntimeException from blocking other log edit sync
|
|
|
synchronized (this) {
|
|
|
synctxid = syncStart;
|
|
|
if (sync) {
|