|
@@ -957,69 +957,86 @@ public class FSEditLog {
|
|
|
// Fetch the transactionId of this thread.
|
|
|
long mytxid = myTransactionId.get().txid;
|
|
|
|
|
|
- final int numEditStreams;
|
|
|
- synchronized (this) {
|
|
|
- numEditStreams = editStreams.size();
|
|
|
- assert numEditStreams > 0 : "no editlog streams";
|
|
|
- printStatistics(false);
|
|
|
-
|
|
|
- // if somebody is already syncing, then wait
|
|
|
- while (mytxid > synctxid && isSyncRunning) {
|
|
|
- try {
|
|
|
- wait(1000);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
+ ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
|
|
|
+ boolean sync = false;
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ printStatistics(false);
|
|
|
+
|
|
|
+ // if somebody is already syncing, then wait
|
|
|
+ while (mytxid > synctxid && isSyncRunning) {
|
|
|
+ try {
|
|
|
+ wait(1000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // If this transaction was already flushed, then nothing to do
|
|
|
+ //
|
|
|
+ if (mytxid <= synctxid) {
|
|
|
+ numTransactionsBatchedInSync++;
|
|
|
+ if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
+ metrics.incrTransactionsBatchedInSync();
|
|
|
+ return;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- //
|
|
|
- // If this transaction was already flushed, then nothing to do
|
|
|
- //
|
|
|
- if (mytxid <= synctxid) {
|
|
|
- numTransactionsBatchedInSync++;
|
|
|
- if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
- metrics.incrTransactionsBatchedInSync();
|
|
|
- return;
|
|
|
+ // now, this thread will do the sync
|
|
|
+ syncStart = txid;
|
|
|
+ isSyncRunning = true;
|
|
|
+ sync = true;
|
|
|
+
|
|
|
+ // swap buffers
|
|
|
+ assert editStreams.size() > 0 : "no editlog streams";
|
|
|
+ for(EditLogOutputStream eStream : editStreams) {
|
|
|
+ try {
|
|
|
+ eStream.setReadyToFlush();
|
|
|
+ streams.add(eStream);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ FSNamesystem.LOG.error("Unable to get ready to flush.", ie);
|
|
|
+ //
|
|
|
+ // remember the streams that encountered an error.
|
|
|
+ //
|
|
|
+ if (errorStreams == null) {
|
|
|
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ }
|
|
|
+ errorStreams.add(eStream);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- // now, this thread will do the sync
|
|
|
- syncStart = txid;
|
|
|
- isSyncRunning = true;
|
|
|
-
|
|
|
- // swap buffers
|
|
|
- for (int idx = 0; idx < numEditStreams; idx++) {
|
|
|
- editStreams.get(idx).setReadyToFlush();
|
|
|
+
|
|
|
+ // do the sync
|
|
|
+ long start = FSNamesystem.now();
|
|
|
+ for (EditLogOutputStream eStream : streams) {
|
|
|
+ try {
|
|
|
+ eStream.flush();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ FSNamesystem.LOG.error("Unable to sync edit log.", ie);
|
|
|
+ //
|
|
|
+ // remember the streams that encountered an error.
|
|
|
+ //
|
|
|
+ if (errorStreams == null) {
|
|
|
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ }
|
|
|
+ errorStreams.add(eStream);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
+ long elapsed = FSNamesystem.now() - start;
|
|
|
+ removeEditsStreamsAndStorageDirs(errorStreams);
|
|
|
+ exitIfNoStreams();
|
|
|
|
|
|
- // do the sync
|
|
|
- long start = FSNamesystem.now();
|
|
|
- for (int idx = 0; idx < numEditStreams; idx++) {
|
|
|
- EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
- try {
|
|
|
- eStream.flush();
|
|
|
- } catch (IOException ioe) {
|
|
|
- //
|
|
|
- // remember the streams that encountered an error.
|
|
|
- //
|
|
|
- if (errorStreams == null) {
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
+ metrics.addSync(elapsed);
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ synchronized (this) {
|
|
|
+ if(sync) {
|
|
|
+ synctxid = syncStart;
|
|
|
+ isSyncRunning = false;
|
|
|
}
|
|
|
- errorStreams.add(eStream);
|
|
|
- FSNamesystem.LOG.error("Unable to sync "+eStream.getName());
|
|
|
+ this.notifyAll();
|
|
|
}
|
|
|
}
|
|
|
- long elapsed = FSNamesystem.now() - start;
|
|
|
-
|
|
|
- synchronized (this) {
|
|
|
- removeEditsStreamsAndStorageDirs(errorStreams);
|
|
|
- exitIfNoStreams();
|
|
|
- synctxid = syncStart;
|
|
|
- isSyncRunning = false;
|
|
|
- this.notifyAll();
|
|
|
- }
|
|
|
-
|
|
|
- if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
- metrics.addSync(elapsed);
|
|
|
}
|
|
|
|
|
|
//
|