|
@@ -320,7 +320,7 @@ public class FSEditLog {
|
|
|
* @param es - stream to remove
|
|
|
* @return the matching stream
|
|
|
*/
|
|
|
- public StorageDirectory getStorage(EditLogOutputStream es) {
|
|
|
+ StorageDirectory getStorage(EditLogOutputStream es) {
|
|
|
String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
|
|
|
.getParentFile().getParentFile().getAbsolutePath();
|
|
|
|
|
@@ -339,7 +339,7 @@ public class FSEditLog {
|
|
|
* @param sd
|
|
|
* @return the matching stream
|
|
|
*/
|
|
|
- public EditLogOutputStream getEditsStream(StorageDirectory sd) {
|
|
|
+ synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
|
|
|
for (EditLogOutputStream es : editStreams) {
|
|
|
File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
|
|
|
.getParentFile().getParentFile();
|
|
@@ -780,68 +780,76 @@ public class FSEditLog {
|
|
|
|
|
|
// Fetch the transactionId of this thread.
|
|
|
long mytxid = myTransactionId.get().txid;
|
|
|
-
|
|
|
- synchronized (this) {
|
|
|
- assert editStreams.size() > 0 : "no editlog streams";
|
|
|
- printStatistics(false);
|
|
|
-
|
|
|
- // if somebody is already syncing, then wait
|
|
|
- while (mytxid > synctxid && isSyncRunning) {
|
|
|
- try {
|
|
|
- wait(1000);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
+ EditLogOutputStream streams[] = null;
|
|
|
+ boolean sync = false;
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ assert editStreams.size() > 0 : "no editlog streams";
|
|
|
+ printStatistics(false);
|
|
|
+
|
|
|
+ // if somebody is already syncing, then wait
|
|
|
+ while (mytxid > synctxid && isSyncRunning) {
|
|
|
+ try {
|
|
|
+ wait(1000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // If this transaction was already flushed, then nothing to do
|
|
|
- //
|
|
|
- if (mytxid <= synctxid) {
|
|
|
- numTransactionsBatchedInSync++;
|
|
|
- if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
- metrics.transactionsBatchedInSync.inc();
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // now, this thread will do the sync
|
|
|
- syncStart = txid;
|
|
|
- isSyncRunning = true;
|
|
|
-
|
|
|
- // swap buffers
|
|
|
- for(EditLogOutputStream eStream : editStreams) {
|
|
|
- eStream.setReadyToFlush();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // do the sync
|
|
|
- long start = FSNamesystem.now();
|
|
|
- for (int idx = 0; idx < editStreams.size(); idx++) {
|
|
|
- EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
- try {
|
|
|
- eStream.flush();
|
|
|
- } catch (IOException ie) {
|
|
|
+
|
|
|
//
|
|
|
- // remember the streams that encountered an error.
|
|
|
+ // If this transaction was already flushed, then nothing to do
|
|
|
//
|
|
|
- if (errorStreams == null) {
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ if (mytxid <= synctxid) {
|
|
|
+ numTransactionsBatchedInSync++;
|
|
|
+ if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
+ metrics.transactionsBatchedInSync.inc();
|
|
|
+ return;
|
|
|
}
|
|
|
- errorStreams.add(eStream);
|
|
|
- FSNamesystem.LOG.error("Unable to sync edit log. " +
|
|
|
- "Fatal Error.");
|
|
|
+
|
|
|
+ // now, this thread will do the sync
|
|
|
+ syncStart = txid;
|
|
|
+ isSyncRunning = true;
|
|
|
+ sync = true;
|
|
|
+
|
|
|
+ // swap buffers
|
|
|
+ for(EditLogOutputStream eStream : editStreams) {
|
|
|
+ eStream.setReadyToFlush();
|
|
|
+ }
|
|
|
+ streams =
|
|
|
+ editStreams.toArray(new EditLogOutputStream[editStreams.size()]);
|
|
|
}
|
|
|
+
|
|
|
+ // do the sync
|
|
|
+ long start = FSNamesystem.now();
|
|
|
+ for (int idx = 0; idx < streams.length; idx++) {
|
|
|
+ EditLogOutputStream eStream = streams[idx];
|
|
|
+ try {
|
|
|
+ eStream.flush();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ //
|
|
|
+ // remember the streams that encountered an error.
|
|
|
+ //
|
|
|
+ if (errorStreams == null) {
|
|
|
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
+ }
|
|
|
+ errorStreams.add(eStream);
|
|
|
+ FSNamesystem.LOG.error("Unable to sync edit log. " +
|
|
|
+ "Fatal Error.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ long elapsed = FSNamesystem.now() - start;
|
|
|
+ processIOError(errorStreams, true);
|
|
|
+
|
|
|
+ if (metrics != null) // Metrics non-null only when used inside name node
|
|
|
+ metrics.syncs.inc(elapsed);
|
|
|
+ } finally {
|
|
|
+ synchronized (this) {
|
|
|
+ synctxid = syncStart;
|
|
|
+ if (sync) {
|
|
|
+ isSyncRunning = false;
|
|
|
+ }
|
|
|
+ this.notifyAll();
|
|
|
+ }
|
|
|
}
|
|
|
- long elapsed = FSNamesystem.now() - start;
|
|
|
-
|
|
|
- synchronized (this) {
|
|
|
- processIOError(errorStreams, true);
|
|
|
- synctxid = syncStart;
|
|
|
- isSyncRunning = false;
|
|
|
- this.notifyAll();
|
|
|
- }
|
|
|
-
|
|
|
- if (metrics != null) // Metrics is non-null only when used inside name node
|
|
|
- metrics.syncs.inc(elapsed);
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -1030,14 +1038,6 @@ public class FSEditLog {
|
|
|
return size;
|
|
|
}
|
|
|
|
|
|
- public String listEditsStreams() {
|
|
|
- StringBuffer buf = new StringBuffer();
|
|
|
- for (EditLogOutputStream os : editStreams) {
|
|
|
- buf.append(os.getName() + ";");
|
|
|
- }
|
|
|
- return buf.toString();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Closes the current edit log and opens edits.new.
|
|
|
*/
|
|
@@ -1272,7 +1272,7 @@ public class FSEditLog {
|
|
|
* @param nnReg this (active) name-node registration.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- void logJSpoolStart(NamenodeRegistration bnReg, // backup node
|
|
|
+ synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
|
|
|
NamenodeRegistration nnReg) // active name-node
|
|
|
throws IOException {
|
|
|
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
|
|
@@ -1331,22 +1331,27 @@ public class FSEditLog {
|
|
|
}
|
|
|
|
|
|
public boolean hasNext() {
|
|
|
- if(editStreams == null ||
|
|
|
- editStreams.isEmpty() || nextIndex >= editStreams.size())
|
|
|
- return false;
|
|
|
- while(nextIndex < editStreams.size()
|
|
|
- && !editStreams.get(nextIndex).getType().isOfType(type))
|
|
|
- nextIndex++;
|
|
|
- return nextIndex < editStreams.size();
|
|
|
+ synchronized(FSEditLog.this) {
|
|
|
+ if(editStreams == null ||
|
|
|
+ editStreams.isEmpty() || nextIndex >= editStreams.size())
|
|
|
+ return false;
|
|
|
+ while(nextIndex < editStreams.size()
|
|
|
+ && !editStreams.get(nextIndex).getType().isOfType(type))
|
|
|
+ nextIndex++;
|
|
|
+ return nextIndex < editStreams.size();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public EditLogOutputStream next() {
|
|
|
- EditLogOutputStream stream = editStreams.get(nextIndex);
|
|
|
- prevIndex = nextIndex;
|
|
|
- nextIndex++;
|
|
|
- while(nextIndex < editStreams.size()
|
|
|
- && !editStreams.get(nextIndex).getType().isOfType(type))
|
|
|
- nextIndex++;
|
|
|
+ EditLogOutputStream stream = null;
|
|
|
+ synchronized(FSEditLog.this) {
|
|
|
+ stream = editStreams.get(nextIndex);
|
|
|
+ prevIndex = nextIndex;
|
|
|
+ nextIndex++;
|
|
|
+ while(nextIndex < editStreams.size()
|
|
|
+ && !editStreams.get(nextIndex).getType().isOfType(type))
|
|
|
+ nextIndex++;
|
|
|
+ }
|
|
|
return stream;
|
|
|
}
|
|
|
|
|
@@ -1357,9 +1362,11 @@ public class FSEditLog {
|
|
|
}
|
|
|
|
|
|
void replace(EditLogOutputStream newStream) {
|
|
|
- assert 0 <= prevIndex && prevIndex < editStreams.size() :
|
|
|
- "Index out of bound.";
|
|
|
- editStreams.set(prevIndex, newStream);
|
|
|
+ synchronized (FSEditLog.this) {
|
|
|
+ assert 0 <= prevIndex && prevIndex < editStreams.size() :
|
|
|
+ "Index out of bound.";
|
|
|
+ editStreams.set(prevIndex, newStream);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|