|
@@ -188,7 +188,7 @@ public class FSEditLog {
|
|
|
this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
|
|
|
}
|
|
|
|
|
|
- public void initJournalsForWrite() {
|
|
|
+ public synchronized void initJournalsForWrite() {
|
|
|
Preconditions.checkState(state == State.UNINITIALIZED ||
|
|
|
state == State.CLOSED, "Unexpected state: %s", state);
|
|
|
|
|
@@ -196,7 +196,7 @@ public class FSEditLog {
|
|
|
state = State.BETWEEN_LOG_SEGMENTS;
|
|
|
}
|
|
|
|
|
|
- public void initSharedJournalsForRead() {
|
|
|
+ public synchronized void initSharedJournalsForRead() {
|
|
|
if (state == State.OPEN_FOR_READING) {
|
|
|
LOG.warn("Initializing shared journals for READ, already open for READ",
|
|
|
new Exception());
|
|
@@ -209,7 +209,7 @@ public class FSEditLog {
|
|
|
state = State.OPEN_FOR_READING;
|
|
|
}
|
|
|
|
|
|
- private void initJournals(List<URI> dirs) {
|
|
|
+ private synchronized void initJournals(List<URI> dirs) {
|
|
|
int minimumRedundantJournals = conf.getInt(
|
|
|
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
|
|
@@ -808,7 +808,7 @@ public class FSEditLog {
|
|
|
* Used only by unit tests.
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- List<JournalAndStream> getJournals() {
|
|
|
+ synchronized List<JournalAndStream> getJournals() {
|
|
|
return journalSet.getAllJournalStreams();
|
|
|
}
|
|
|
|
|
@@ -816,7 +816,7 @@ public class FSEditLog {
|
|
|
* Used only by tests.
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- public JournalSet getJournalSet() {
|
|
|
+ synchronized public JournalSet getJournalSet() {
|
|
|
return journalSet;
|
|
|
}
|
|
|
|
|
@@ -950,17 +950,14 @@ public class FSEditLog {
|
|
|
/**
|
|
|
* Archive any log files that are older than the given txid.
|
|
|
*/
|
|
|
- public void purgeLogsOlderThan(final long minTxIdToKeep) {
|
|
|
- synchronized (this) {
|
|
|
- // synchronized to prevent findbugs warning about inconsistent
|
|
|
- // synchronization. This will be JIT-ed out if asserts are
|
|
|
- // off.
|
|
|
- assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
|
|
|
- minTxIdToKeep <= curSegmentTxId :
|
|
|
- "cannot purge logs older than txid " + minTxIdToKeep +
|
|
|
- " when current segment starts at " + curSegmentTxId;
|
|
|
- }
|
|
|
-
|
|
|
+ public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
|
|
|
+ assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
|
|
|
+ minTxIdToKeep <= curSegmentTxId :
|
|
|
+ "cannot purge logs older than txid " + minTxIdToKeep +
|
|
|
+ " when current segment starts at " + curSegmentTxId;
|
|
|
+
|
|
|
+ // This could be improved to not need synchronization. But currently,
|
|
|
+ // journalSet is not threadsafe, so we need to synchronize this method.
|
|
|
try {
|
|
|
journalSet.purgeLogsOlderThan(minTxIdToKeep);
|
|
|
} catch (IOException ex) {
|
|
@@ -992,8 +989,8 @@ public class FSEditLog {
|
|
|
|
|
|
|
|
|
// sets the initial capacity of the flush buffer.
|
|
|
- public void setOutputBufferCapacity(int size) {
|
|
|
- journalSet.setOutputBufferCapacity(size);
|
|
|
+ synchronized void setOutputBufferCapacity(int size) {
|
|
|
+ journalSet.setOutputBufferCapacity(size);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1069,7 +1066,7 @@ public class FSEditLog {
|
|
|
/**
|
|
|
* Run recovery on all journals to recover any unclosed segments
|
|
|
*/
|
|
|
- void recoverUnclosedStreams() {
|
|
|
+ synchronized void recoverUnclosedStreams() {
|
|
|
Preconditions.checkState(
|
|
|
state == State.BETWEEN_LOG_SEGMENTS,
|
|
|
"May not recover segments - wrong state: %s", state);
|
|
@@ -1092,7 +1089,7 @@ public class FSEditLog {
|
|
|
* @param toAtLeast the selected streams must contain this transaction
|
|
|
* @param inProgessOk set to true if in-progress streams are OK
|
|
|
*/
|
|
|
- public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
|
|
|
+ public synchronized Collection<EditLogInputStream> selectInputStreams(long fromTxId,
|
|
|
long toAtLeastTxId, boolean inProgressOk) throws IOException {
|
|
|
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
|
|
|
EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk);
|