|
@@ -17,10 +17,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
-import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.io.DataOutputStream;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.zip.Checksum;
|
|
@@ -33,22 +30,26 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.fs.Options;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
+import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
-import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
|
|
+import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
|
import org.apache.hadoop.util.PureJavaCrc32;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
|
|
|
|
|
@@ -58,23 +59,42 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Evolving
|
|
|
-public class FSEditLog implements NNStorageListener {
|
|
|
+public class FSEditLog {
|
|
|
|
|
|
static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
|
|
|
" File system changes are not persistent. No journal streams.";
|
|
|
|
|
|
- private static final Log LOG = LogFactory.getLog(FSEditLog.class);
|
|
|
+ static final Log LOG = LogFactory.getLog(FSEditLog.class);
|
|
|
|
|
|
- private volatile int sizeOutputFlushBuffer = 512*1024;
|
|
|
+ /**
|
|
|
+ * State machine for edit log.
|
|
|
+ * The log starts in UNINITIALIZED state upon construction. Once it's
|
|
|
+ * initialized, it is usually in IN_SEGMENT state, indicating that edits
|
|
|
+ * may be written. In the middle of a roll, or while saving the namespace,
|
|
|
+ * it briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the
|
|
|
+ * previous segment has been closed, but the new one has not yet been opened.
|
|
|
+ */
|
|
|
+ private enum State {
|
|
|
+ UNINITIALIZED,
|
|
|
+ BETWEEN_LOG_SEGMENTS,
|
|
|
+ IN_SEGMENT,
|
|
|
+ CLOSED;
|
|
|
+ }
|
|
|
+ private State state = State.UNINITIALIZED;
|
|
|
|
|
|
- private ArrayList<EditLogOutputStream> editStreams = null;
|
|
|
|
|
|
+ private List<JournalAndStream> journals = Lists.newArrayList();
|
|
|
+
|
|
|
// a monotonically increasing counter that represents transactionIds.
|
|
|
private long txid = 0;
|
|
|
|
|
|
// stores the last synced transactionId.
|
|
|
private long synctxid = 0;
|
|
|
|
|
|
+ // the first txid of the log that's currently open for writing.
|
|
|
+ // If this value is N, we are currently writing to edits_inprogress_N
|
|
|
+ private long curSegmentTxId = FSConstants.INVALID_TXID;
|
|
|
+
|
|
|
// the time of printing the statistics to the log file.
|
|
|
private long lastPrintTime;
|
|
|
|
|
@@ -83,6 +103,10 @@ public class FSEditLog implements NNStorageListener {
|
|
|
|
|
|
// is an automatic sync scheduled?
|
|
|
private volatile boolean isAutoSyncScheduled = false;
|
|
|
+
|
|
|
+ // Used to exit in the event of a failure to sync to all journals. It's a
|
|
|
+ // member variable so it can be swapped out for testing.
|
|
|
+ private Runtime runtime = Runtime.getRuntime();
|
|
|
|
|
|
// these are statistics counters.
|
|
|
private long numTransactions; // number of transactions
|
|
@@ -122,226 +146,90 @@ public class FSEditLog implements NNStorageListener {
|
|
|
FSEditLog(NNStorage storage) {
|
|
|
isSyncRunning = false;
|
|
|
this.storage = storage;
|
|
|
- this.storage.registerListener(this);
|
|
|
metrics = NameNode.getNameNodeMetrics();
|
|
|
lastPrintTime = now();
|
|
|
}
|
|
|
|
|
|
- private File getEditFile(StorageDirectory sd) {
|
|
|
- return storage.getEditFile(sd);
|
|
|
- }
|
|
|
-
|
|
|
- private File getEditNewFile(StorageDirectory sd) {
|
|
|
- return storage.getEditNewFile(sd);
|
|
|
- }
|
|
|
-
|
|
|
- private int getNumEditsDirs() {
|
|
|
- return storage.getNumStorageDirs(NameNodeDirType.EDITS);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized int getNumEditStreams() {
|
|
|
- return editStreams == null ? 0 : editStreams.size();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
- * Return the currently active edit streams.
|
|
|
- * This should be used only by unit tests.
|
|
|
+ * Initialize the list of edit journals
|
|
|
*/
|
|
|
- ArrayList<EditLogOutputStream> getEditStreams() {
|
|
|
- return editStreams;
|
|
|
- }
|
|
|
-
|
|
|
- boolean isOpen() {
|
|
|
- return getNumEditStreams() > 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create empty edit log files.
|
|
|
- * Initialize the output stream for logging.
|
|
|
- *
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- synchronized void open() throws IOException {
|
|
|
- numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
|
|
|
- if (editStreams == null)
|
|
|
- editStreams = new ArrayList<EditLogOutputStream>();
|
|
|
+ synchronized void initJournals() {
|
|
|
+ assert journals.isEmpty();
|
|
|
+ Preconditions.checkState(state == State.UNINITIALIZED,
|
|
|
+ "Bad state: %s", state);
|
|
|
|
|
|
- ArrayList<StorageDirectory> al = null;
|
|
|
- for (Iterator<StorageDirectory> it
|
|
|
- = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
|
|
- StorageDirectory sd = it.next();
|
|
|
- File eFile = getEditFile(sd);
|
|
|
- try {
|
|
|
- addNewEditLogStream(eFile);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Unable to open edit log file " + eFile);
|
|
|
- // Remove the directory from list of storage directories
|
|
|
- if(al == null) al = new ArrayList<StorageDirectory>(1);
|
|
|
- al.add(sd);
|
|
|
- }
|
|
|
+ for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
|
|
+ journals.add(new JournalAndStream(new FileJournalManager(sd)));
|
|
|
}
|
|
|
|
|
|
- if (al != null)
|
|
|
- storage.reportErrorsOnDirectories(al);
|
|
|
-
|
|
|
- // If there was an error in every storage dir, each one will have
|
|
|
- // been removed from the list of storage directories.
|
|
|
- if (storage.getNumStorageDirs(NameNodeDirType.EDITS) == 0) {
|
|
|
- throw new IOException(
|
|
|
- "Failed to initialize edits log in any storage directory.");
|
|
|
+ if (journals.isEmpty()) {
|
|
|
+ LOG.error("No edits directories configured!");
|
|
|
}
|
|
|
+
|
|
|
+ state = State.BETWEEN_LOG_SEGMENTS;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- synchronized void addNewEditLogStream(File eFile) throws IOException {
|
|
|
- EditLogOutputStream eStream = new EditLogFileOutputStream(eFile,
|
|
|
- sizeOutputFlushBuffer);
|
|
|
- editStreams.add(eStream);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void createEditLogFile(File name) throws IOException {
|
|
|
- waitForSyncToFinish();
|
|
|
-
|
|
|
- EditLogOutputStream eStream = new EditLogFileOutputStream(name,
|
|
|
- sizeOutputFlushBuffer);
|
|
|
- eStream.create();
|
|
|
- eStream.close();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
- * Shutdown the file store.
|
|
|
+ * Initialize the output stream for logging, opening the first
|
|
|
+ * log segment.
|
|
|
*/
|
|
|
- synchronized void close() {
|
|
|
- waitForSyncToFinish();
|
|
|
- if (editStreams == null || editStreams.isEmpty()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- printStatistics(true);
|
|
|
- numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
|
|
|
-
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- Iterator<EditLogOutputStream> it = getOutputStreamIterator(null);
|
|
|
- while(it.hasNext()) {
|
|
|
- EditLogOutputStream eStream = it.next();
|
|
|
- try {
|
|
|
- closeStream(eStream);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("FSEditLog:close - failed to close stream "
|
|
|
- + eStream.getName());
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
- }
|
|
|
- }
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
- editStreams.clear();
|
|
|
- }
|
|
|
+ synchronized void open() throws IOException {
|
|
|
+ Preconditions.checkState(state == State.UNINITIALIZED);
|
|
|
+ initJournals();
|
|
|
|
|
|
- /**
|
|
|
- * Close and remove edit log stream.
|
|
|
- * @param index of the stream
|
|
|
- */
|
|
|
- synchronized private void removeStream(int index) {
|
|
|
- EditLogOutputStream eStream = editStreams.get(index);
|
|
|
- try {
|
|
|
- eStream.close();
|
|
|
- } catch (Exception e) {}
|
|
|
- editStreams.remove(index);
|
|
|
+ startLogSegment(getLastWrittenTxId() + 1, true);
|
|
|
+ assert state == State.IN_SEGMENT : "Bad state: " + state;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * The specified streams have IO errors. Close and remove them.
|
|
|
- */
|
|
|
- synchronized
|
|
|
- void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
|
|
|
- if (errorStreams == null || errorStreams.size() == 0) {
|
|
|
- return; // nothing to do
|
|
|
- }
|
|
|
- ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
|
|
|
- for (EditLogOutputStream e : errorStreams) {
|
|
|
- if (e.getType() == JournalType.FILE) {
|
|
|
- errorDirs.add(getStorageDirectoryForStream(e));
|
|
|
- } else {
|
|
|
- disableStream(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- storage.reportErrorsOnDirectories(errorDirs);
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.error("Problem erroring streams " + ioe);
|
|
|
- }
|
|
|
+
|
|
|
+ synchronized boolean isOpen() {
|
|
|
+ return state == State.IN_SEGMENT;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
- * get an editStream corresponding to a sd
|
|
|
- * @param es - stream to remove
|
|
|
- * @return the matching stream
|
|
|
+ * Shutdown the file store.
|
|
|
*/
|
|
|
- StorageDirectory getStorage(EditLogOutputStream es) {
|
|
|
- String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
|
|
|
- .getParentFile().getParentFile().getAbsolutePath();
|
|
|
-
|
|
|
- Iterator<StorageDirectory> it = storage.dirIterator();
|
|
|
- while (it.hasNext()) {
|
|
|
- StorageDirectory sd = it.next();
|
|
|
- LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
|
|
|
- if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
|
|
|
- return sd;
|
|
|
+ synchronized void close() {
|
|
|
+ if (state == State.CLOSED) {
|
|
|
+ LOG.warn("Closing log when already closed", new Exception());
|
|
|
+ return;
|
|
|
}
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * get an editStream corresponding to a sd
|
|
|
- * @param sd
|
|
|
- * @return the matching stream
|
|
|
- */
|
|
|
- synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
|
|
|
- for (EditLogOutputStream es : editStreams) {
|
|
|
- File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
|
|
|
- .getParentFile().getParentFile();
|
|
|
- if (parentStorageDir.getName().equals(sd.getRoot().getName()))
|
|
|
- return es;
|
|
|
+
|
|
|
+ if (state == State.IN_SEGMENT) {
|
|
|
+ assert !journals.isEmpty();
|
|
|
+ waitForSyncToFinish();
|
|
|
+ endCurrentLogSegment(true);
|
|
|
}
|
|
|
- return null;
|
|
|
- }
|
|
|
|
|
|
- /**
|
|
|
- * check if edits.new log exists in the specified stoorage directory
|
|
|
- */
|
|
|
- boolean existsNew(StorageDirectory sd) {
|
|
|
- return getEditNewFile(sd).exists();
|
|
|
+ state = State.CLOSED;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Write an operation to the edit log. Do not sync to persistent
|
|
|
* store yet.
|
|
|
*/
|
|
|
- void logEdit(FSEditLogOp op) {
|
|
|
+ void logEdit(final FSEditLogOp op) {
|
|
|
synchronized (this) {
|
|
|
+ assert state != State.CLOSED;
|
|
|
+
|
|
|
// wait if an automatic sync is scheduled
|
|
|
waitIfAutoSyncScheduled();
|
|
|
|
|
|
- if(getNumEditStreams() == 0)
|
|
|
+ if (journals.isEmpty()) {
|
|
|
throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- long start = now();
|
|
|
- for(EditLogOutputStream eStream : editStreams) {
|
|
|
- if(!eStream.isOperationSupported(op.opCode.getOpCode()))
|
|
|
- continue;
|
|
|
- try {
|
|
|
- eStream.write(op);
|
|
|
- } catch (IOException ie) {
|
|
|
- LOG.error("logEdit: removing "+ eStream.getName(), ie);
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
- }
|
|
|
}
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
- recordTransaction(start);
|
|
|
+
|
|
|
+ long start = beginTransaction();
|
|
|
+ op.setTransactionId(txid);
|
|
|
+
|
|
|
+ mapJournalsAndReportErrors(new JournalClosure() {
|
|
|
+ @Override
|
|
|
+ public void apply(JournalAndStream jas) throws IOException {
|
|
|
+ if (!jas.isActive()) return;
|
|
|
+ jas.stream.write(op);
|
|
|
+ }
|
|
|
+ }, "logging edit");
|
|
|
+
|
|
|
+ endTransaction(start);
|
|
|
|
|
|
// check if it is time to schedule an automatic sync
|
|
|
if (!shouldForceSync()) {
|
|
@@ -384,15 +272,18 @@ public class FSEditLog implements NNStorageListener {
|
|
|
* @return true if any of the edit stream says that it should sync
|
|
|
*/
|
|
|
private boolean shouldForceSync() {
|
|
|
- for (EditLogOutputStream eStream : editStreams) {
|
|
|
- if (eStream.shouldForceSync()) {
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (!jas.isActive()) continue;
|
|
|
+
|
|
|
+ if (jas.getCurrentStream().shouldForceSync()) {
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- private void recordTransaction(long start) {
|
|
|
+ private long beginTransaction() {
|
|
|
+ assert Thread.holdsLock(this);
|
|
|
// get a new transactionId
|
|
|
txid++;
|
|
|
|
|
@@ -401,7 +292,12 @@ public class FSEditLog implements NNStorageListener {
|
|
|
//
|
|
|
TransactionId id = myTransactionId.get();
|
|
|
id.txid = txid;
|
|
|
-
|
|
|
+ return now();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void endTransaction(long start) {
|
|
|
+ assert Thread.holdsLock(this);
|
|
|
+
|
|
|
// update statistics
|
|
|
long end = now();
|
|
|
numTransactions++;
|
|
@@ -410,6 +306,35 @@ public class FSEditLog implements NNStorageListener {
|
|
|
metrics.addTransaction(end-start);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return the transaction ID of the last transaction written to the log.
|
|
|
+ */
|
|
|
+ synchronized long getLastWrittenTxId() {
|
|
|
+ return txid;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return the first transaction ID in the current log segment
|
|
|
+ */
|
|
|
+ synchronized long getCurSegmentTxId() {
|
|
|
+ Preconditions.checkState(state == State.IN_SEGMENT,
|
|
|
+ "Bad state: %s", state);
|
|
|
+ return curSegmentTxId;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set the transaction ID to use for the next transaction written.
|
|
|
+ */
|
|
|
+ synchronized void setNextTxId(long nextTxId) {
|
|
|
+ Preconditions.checkArgument(synctxid <= txid &&
|
|
|
+ nextTxId >= txid,
|
|
|
+ "May not decrease txid." +
|
|
|
+ " synctxid=%s txid=%s nextTxId=%s",
|
|
|
+ synctxid, txid, nextTxId);
|
|
|
+
|
|
|
+ txid = nextTxId - 1;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Blocks until all ongoing edits have been synced to disk.
|
|
|
* This differs from logSync in that it waits for edits that have been
|
|
@@ -457,12 +382,15 @@ public class FSEditLog implements NNStorageListener {
|
|
|
* waitForSyncToFinish() before assuming they are running alone.
|
|
|
*/
|
|
|
public void logSync() {
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
long syncStart = 0;
|
|
|
|
|
|
// Fetch the transactionId of this thread.
|
|
|
long mytxid = myTransactionId.get().txid;
|
|
|
- ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
|
|
|
+
|
|
|
+ List<JournalAndStream> candidateJournals =
|
|
|
+ Lists.newArrayListWithCapacity(journals.size());
|
|
|
+ List<JournalAndStream> badJournals = Lists.newArrayList();
|
|
|
+
|
|
|
boolean sync = false;
|
|
|
try {
|
|
|
synchronized (this) {
|
|
@@ -493,20 +421,16 @@ public class FSEditLog implements NNStorageListener {
|
|
|
sync = true;
|
|
|
|
|
|
// swap buffers
|
|
|
- assert editStreams.size() > 0 : "no editlog streams";
|
|
|
- for(EditLogOutputStream eStream : editStreams) {
|
|
|
+ assert !journals.isEmpty() : "no editlog streams";
|
|
|
+
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (!jas.isActive()) continue;
|
|
|
try {
|
|
|
- eStream.setReadyToFlush();
|
|
|
- streams.add(eStream);
|
|
|
+ jas.getCurrentStream().setReadyToFlush();
|
|
|
+ candidateJournals.add(jas);
|
|
|
} catch (IOException ie) {
|
|
|
LOG.error("Unable to get ready to flush.", ie);
|
|
|
- //
|
|
|
- // remember the streams that encountered an error.
|
|
|
- //
|
|
|
- if (errorStreams == null) {
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- }
|
|
|
- errorStreams.add(eStream);
|
|
|
+ badJournals.add(jas);
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
@@ -517,29 +441,36 @@ public class FSEditLog implements NNStorageListener {
|
|
|
|
|
|
// do the sync
|
|
|
long start = now();
|
|
|
- for (EditLogOutputStream eStream : streams) {
|
|
|
+ for (JournalAndStream jas : candidateJournals) {
|
|
|
+ if (!jas.isActive()) continue;
|
|
|
try {
|
|
|
- eStream.flush();
|
|
|
+ jas.getCurrentStream().flush();
|
|
|
} catch (IOException ie) {
|
|
|
LOG.error("Unable to sync edit log.", ie);
|
|
|
//
|
|
|
// remember the streams that encountered an error.
|
|
|
//
|
|
|
- if (errorStreams == null) {
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- }
|
|
|
- errorStreams.add(eStream);
|
|
|
+ badJournals.add(jas);
|
|
|
}
|
|
|
}
|
|
|
long elapsed = now() - start;
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
+ disableAndReportErrorOnJournals(badJournals);
|
|
|
|
|
|
- if (metrics != null) // Metrics non-null only when used inside name node
|
|
|
+ if (metrics != null) { // Metrics non-null only when used inside name node
|
|
|
metrics.addSync(elapsed);
|
|
|
+ }
|
|
|
+
|
|
|
} finally {
|
|
|
// Prevent RuntimeException from blocking other log edit sync
|
|
|
synchronized (this) {
|
|
|
if (sync) {
|
|
|
+ if (badJournals.size() >= journals.size()) {
|
|
|
+ LOG.fatal("Could not sync any journal to persistent storage. " +
|
|
|
+ "Unsynced transactions: " + (txid - synctxid),
|
|
|
+ new Exception());
|
|
|
+ runtime.exit(1);
|
|
|
+ }
|
|
|
+
|
|
|
synctxid = syncStart;
|
|
|
isSyncRunning = false;
|
|
|
}
|
|
@@ -556,7 +487,7 @@ public class FSEditLog implements NNStorageListener {
|
|
|
if (lastPrintTime + 60000 > now && !force) {
|
|
|
return;
|
|
|
}
|
|
|
- if (editStreams == null || editStreams.size()==0) {
|
|
|
+ if (journals.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
lastPrintTime = now;
|
|
@@ -568,12 +499,17 @@ public class FSEditLog implements NNStorageListener {
|
|
|
buf.append("Number of transactions batched in Syncs: ");
|
|
|
buf.append(numTransactionsBatchedInSync);
|
|
|
buf.append(" Number of syncs: ");
|
|
|
- buf.append(editStreams.get(0).getNumSync());
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (!jas.isActive()) continue;
|
|
|
+ buf.append(jas.getCurrentStream().getNumSync());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
buf.append(" SyncTimes(ms): ");
|
|
|
|
|
|
- int numEditStreams = editStreams.size();
|
|
|
- for (int idx = 0; idx < numEditStreams; idx++) {
|
|
|
- EditLogOutputStream eStream = editStreams.get(idx);
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (!jas.isActive()) continue;
|
|
|
+ EditLogOutputStream eStream = jas.getCurrentStream();
|
|
|
buf.append(eStream.getTotalSyncTime());
|
|
|
buf.append(" ");
|
|
|
}
|
|
@@ -788,202 +724,191 @@ public class FSEditLog implements NNStorageListener {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Return the size of the current EditLog
|
|
|
+ * @return the number of active (non-failed) journals
|
|
|
*/
|
|
|
- synchronized long getEditLogSize() throws IOException {
|
|
|
- assert getNumEditsDirs() <= getNumEditStreams() :
|
|
|
- "Number of edits directories should not exceed the number of streams.";
|
|
|
- long size = 0;
|
|
|
- ArrayList<EditLogOutputStream> al = null;
|
|
|
- for (int idx = 0; idx < getNumEditStreams(); idx++) {
|
|
|
- EditLogOutputStream es = editStreams.get(idx);
|
|
|
- try {
|
|
|
- long curSize = es.length();
|
|
|
- assert (size == 0 || size == curSize || curSize ==0) :
|
|
|
- "Wrong streams size";
|
|
|
- size = Math.max(size, curSize);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
|
|
|
- idx + ") " + es.getName());
|
|
|
- if(al==null) al = new ArrayList<EditLogOutputStream>(1);
|
|
|
- al.add(es);
|
|
|
+ private int countActiveJournals() {
|
|
|
+ int count = 0;
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (jas.isActive()) {
|
|
|
+ count++;
|
|
|
}
|
|
|
}
|
|
|
- if(al!=null) disableAndReportErrorOnStreams(al);
|
|
|
- return size;
|
|
|
+ return count;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Closes the current edit log and opens edits.new.
|
|
|
+ * Used only by unit tests.
|
|
|
*/
|
|
|
- synchronized void rollEditLog() throws IOException {
|
|
|
- waitForSyncToFinish();
|
|
|
- Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
|
|
|
- if(!it.hasNext())
|
|
|
- return;
|
|
|
- //
|
|
|
- // If edits.new already exists in some directory, verify it
|
|
|
- // exists in all directories.
|
|
|
- //
|
|
|
- boolean alreadyExists = existsNew(it.next());
|
|
|
- while(it.hasNext()) {
|
|
|
- StorageDirectory sd = it.next();
|
|
|
- if(alreadyExists != existsNew(sd))
|
|
|
- throw new IOException(getEditNewFile(sd)
|
|
|
- + "should " + (alreadyExists ? "" : "not ") + "exist.");
|
|
|
- }
|
|
|
- if(alreadyExists)
|
|
|
- return; // nothing to do, edits.new exists!
|
|
|
-
|
|
|
- // check if any of failed storage is now available and put it back
|
|
|
- storage.attemptRestoreRemovedStorage();
|
|
|
-
|
|
|
- divertFileStreams(
|
|
|
- Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
|
|
|
+ @VisibleForTesting
|
|
|
+ List<JournalAndStream> getJournals() {
|
|
|
+ return journals;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Divert file streams from file edits to file edits.new.<p>
|
|
|
- * Close file streams, which are currently writing into edits files.
|
|
|
- * Create new streams based on file getRoot()/dest.
|
|
|
- * @param dest new stream path relative to the storage directory root.
|
|
|
- * @throws IOException
|
|
|
+ * Used only by unit tests.
|
|
|
*/
|
|
|
- synchronized void divertFileStreams(String dest) throws IOException {
|
|
|
- waitForSyncToFinish();
|
|
|
-
|
|
|
- assert getNumEditStreams() >= getNumEditsDirs() :
|
|
|
- "Inconsistent number of streams";
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- EditStreamIterator itE =
|
|
|
- (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
|
|
|
- Iterator<StorageDirectory> itD =
|
|
|
- storage.dirIterator(NameNodeDirType.EDITS);
|
|
|
- while(itE.hasNext() && itD.hasNext()) {
|
|
|
- EditLogOutputStream eStream = itE.next();
|
|
|
- StorageDirectory sd = itD.next();
|
|
|
- if(!eStream.getName().startsWith(sd.getRoot().getPath()))
|
|
|
- throw new IOException("Inconsistent order of edit streams: " + eStream);
|
|
|
- try {
|
|
|
- // close old stream
|
|
|
- closeStream(eStream);
|
|
|
- // create new stream
|
|
|
- eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
|
|
|
- sizeOutputFlushBuffer);
|
|
|
- eStream.create();
|
|
|
- // replace by the new stream
|
|
|
- itE.replace(eStream);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Error in editStream " + eStream.getName(), e);
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
- }
|
|
|
- }
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
+ @VisibleForTesting
|
|
|
+ synchronized void setRuntimeForTesting(Runtime runtime) {
|
|
|
+ this.runtime = runtime;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return a manifest of what finalized edit logs are available
|
|
|
+ */
|
|
|
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
|
|
|
+ throws IOException {
|
|
|
+ FSImageTransactionalStorageInspector inspector =
|
|
|
+ new FSImageTransactionalStorageInspector();
|
|
|
|
|
|
+ for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
|
|
+ inspector.inspectDirectory(sd);
|
|
|
+ }
|
|
|
+
|
|
|
+ return inspector.getEditLogManifest(sinceTxId);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Removes the old edit log and renames edits.new to edits.
|
|
|
- * Reopens the edits file.
|
|
|
+ * Finalizes the current edit log and opens a new log segment.
|
|
|
+ * @return the transaction id of the BEGIN_LOG_SEGMENT transaction
|
|
|
+ * in the new log.
|
|
|
*/
|
|
|
- synchronized void purgeEditLog() throws IOException {
|
|
|
- waitForSyncToFinish();
|
|
|
- revertFileStreams(
|
|
|
- Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
|
|
|
+ synchronized long rollEditLog() throws IOException {
|
|
|
+ LOG.info("Rolling edit logs.");
|
|
|
+ endCurrentLogSegment(true);
|
|
|
+
|
|
|
+ long nextTxId = getLastWrittenTxId() + 1;
|
|
|
+ startLogSegment(nextTxId, true);
|
|
|
+
|
|
|
+ assert curSegmentTxId == nextTxId;
|
|
|
+ return nextTxId;
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * The actual sync activity happens while not synchronized on this object.
|
|
|
- * Thus, synchronized activities that require that they are not concurrent
|
|
|
- * with file operations should wait for any running sync to finish.
|
|
|
+ * Start writing to the log segment with the given txid.
|
|
|
+ * Transitions from BETWEEN_LOG_SEGMENTS state to IN_SEGMENT state.
|
|
|
*/
|
|
|
- synchronized void waitForSyncToFinish() {
|
|
|
- while (isSyncRunning) {
|
|
|
- try {
|
|
|
- wait(1000);
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
+ synchronized void startLogSegment(final long segmentTxId,
|
|
|
+ boolean writeHeaderTxn) throws IOException {
|
|
|
+ LOG.info("Starting log segment at " + segmentTxId);
|
|
|
+ Preconditions.checkArgument(segmentTxId > 0,
|
|
|
+ "Bad txid: %s", segmentTxId);
|
|
|
+ Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
|
|
|
+ "Bad state: %s", state);
|
|
|
+ Preconditions.checkState(segmentTxId > curSegmentTxId,
|
|
|
+ "Cannot start writing to log segment " + segmentTxId +
|
|
|
+ " when previous log segment started at " + curSegmentTxId);
|
|
|
+ Preconditions.checkArgument(segmentTxId == txid + 1,
|
|
|
+ "Cannot start log segment at txid %s when next expected " +
|
|
|
+ "txid is %s", segmentTxId, txid + 1);
|
|
|
+
|
|
|
+ numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
|
|
|
+
|
|
|
+ // TODO no need to link this back to storage anymore!
|
|
|
+ // See HDFS-2174.
|
|
|
+ storage.attemptRestoreRemovedStorage();
|
|
|
+
|
|
|
+ mapJournalsAndReportErrors(new JournalClosure() {
|
|
|
+ @Override
|
|
|
+ public void apply(JournalAndStream jas) throws IOException {
|
|
|
+ jas.startLogSegment(segmentTxId);
|
|
|
+ }
|
|
|
+ }, "starting log segment " + segmentTxId);
|
|
|
+
|
|
|
+ if (countActiveJournals() == 0) {
|
|
|
+ throw new IOException("Unable to start log segment " +
|
|
|
+ segmentTxId + ": no journals successfully started.");
|
|
|
+ }
|
|
|
+
|
|
|
+ curSegmentTxId = segmentTxId;
|
|
|
+ state = State.IN_SEGMENT;
|
|
|
+
|
|
|
+ if (writeHeaderTxn) {
|
|
|
+ logEdit(LogSegmentOp.getInstance(
|
|
|
+ FSEditLogOpCodes.OP_START_LOG_SEGMENT));
|
|
|
+ logSync();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Revert file streams from file edits.new back to file edits.<p>
|
|
|
- * Close file streams, which are currently writing into getRoot()/source.
|
|
|
- * Rename getRoot()/source to edits.
|
|
|
- * Reopen streams so that they start writing into edits files.
|
|
|
- * @param dest new stream path relative to the storage directory root.
|
|
|
- * @throws IOException
|
|
|
+ * Finalize the current log segment.
|
|
|
+ * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
|
|
|
*/
|
|
|
- synchronized void revertFileStreams(String source) throws IOException {
|
|
|
- waitForSyncToFinish();
|
|
|
-
|
|
|
- assert getNumEditStreams() >= getNumEditsDirs() :
|
|
|
- "Inconsistent number of streams";
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- EditStreamIterator itE =
|
|
|
- (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
|
|
|
- Iterator<StorageDirectory> itD =
|
|
|
- storage.dirIterator(NameNodeDirType.EDITS);
|
|
|
- while(itE.hasNext() && itD.hasNext()) {
|
|
|
- EditLogOutputStream eStream = itE.next();
|
|
|
- StorageDirectory sd = itD.next();
|
|
|
- if(!eStream.getName().startsWith(sd.getRoot().getPath()))
|
|
|
- throw new IOException("Inconsistent order of edit streams: " + eStream +
|
|
|
- " does not start with " + sd.getRoot().getPath());
|
|
|
- try {
|
|
|
- // close old stream
|
|
|
- closeStream(eStream);
|
|
|
- // rename edits.new to edits
|
|
|
- File editFile = getEditFile(sd);
|
|
|
- File prevEditFile = new File(sd.getRoot(), source);
|
|
|
- if(prevEditFile.exists()) {
|
|
|
- if(!prevEditFile.renameTo(editFile)) {
|
|
|
- //
|
|
|
- // renameTo() fails on Windows if the destination
|
|
|
- // file exists.
|
|
|
- //
|
|
|
- if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
|
|
|
- throw new IOException("Rename failed for " + sd.getRoot());
|
|
|
- }
|
|
|
- }
|
|
|
+ synchronized void endCurrentLogSegment(boolean writeEndTxn) {
|
|
|
+ LOG.info("Ending log segment " + curSegmentTxId);
|
|
|
+ Preconditions.checkState(state == State.IN_SEGMENT,
|
|
|
+ "Bad state: %s", state);
|
|
|
+
|
|
|
+ if (writeEndTxn) {
|
|
|
+ logEdit(LogSegmentOp.getInstance(
|
|
|
+ FSEditLogOpCodes.OP_END_LOG_SEGMENT));
|
|
|
+ logSync();
|
|
|
+ }
|
|
|
+
|
|
|
+ printStatistics(true);
|
|
|
+
|
|
|
+ final long lastTxId = getLastWrittenTxId();
|
|
|
+
|
|
|
+ mapJournalsAndReportErrors(new JournalClosure() {
|
|
|
+ @Override
|
|
|
+ public void apply(JournalAndStream jas) throws IOException {
|
|
|
+ if (jas.isActive()) {
|
|
|
+ jas.close(lastTxId);
|
|
|
}
|
|
|
- // open new stream
|
|
|
- eStream = new EditLogFileOutputStream(editFile, sizeOutputFlushBuffer);
|
|
|
- // replace by the new stream
|
|
|
- itE.replace(eStream);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Error in editStream " + eStream.getName(), e);
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
}
|
|
|
- }
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
+ }, "ending log segment");
|
|
|
+
|
|
|
+ state = State.BETWEEN_LOG_SEGMENTS;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Abort all current logs. Called from the backup node.
|
|
|
+ */
|
|
|
+ synchronized void abortCurrentLogSegment() {
|
|
|
+ mapJournalsAndReportErrors(new JournalClosure() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void apply(JournalAndStream jas) throws IOException {
|
|
|
+ jas.abort();
|
|
|
+ }
|
|
|
+ }, "aborting all streams");
|
|
|
+ state = State.BETWEEN_LOG_SEGMENTS;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Return the name of the edit file
|
|
|
+ * Purge any log files that are older than the given txid.
|
|
|
*/
|
|
|
- synchronized File getFsEditName() {
|
|
|
- StorageDirectory sd = null;
|
|
|
- for (Iterator<StorageDirectory> it =
|
|
|
- storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
|
|
- sd = it.next();
|
|
|
- if(sd.getRoot().canRead())
|
|
|
- return getEditFile(sd);
|
|
|
+ public void purgeLogsOlderThan(
|
|
|
+ final long minTxIdToKeep, final StoragePurger purger) {
|
|
|
+ synchronized (this) {
|
|
|
+ // synchronized to prevent findbugs warning about inconsistent
|
|
|
+ // synchronization. This will be JIT-ed out if asserts are
|
|
|
+ // off.
|
|
|
+ assert curSegmentTxId == FSConstants.INVALID_TXID || // on format this is no-op
|
|
|
+ minTxIdToKeep <= curSegmentTxId :
|
|
|
+ "cannot purge logs older than txid " + minTxIdToKeep +
|
|
|
+ " when current segment starts at " + curSegmentTxId;
|
|
|
}
|
|
|
- return null;
|
|
|
+
|
|
|
+ mapJournalsAndReportErrors(new JournalClosure() {
|
|
|
+ @Override
|
|
|
+ public void apply(JournalAndStream jas) throws IOException {
|
|
|
+ jas.manager.purgeLogsOlderThan(minTxIdToKeep, purger);
|
|
|
+ }
|
|
|
+ }, "purging logs older than " + minTxIdToKeep);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
- * Returns the timestamp of the edit log
|
|
|
+ * The actual sync activity happens while not synchronized on this object.
|
|
|
+ * Thus, synchronized activities that require that they are not concurrent
|
|
|
+ * with file operations should wait for any running sync to finish.
|
|
|
*/
|
|
|
- synchronized long getFsEditTime() {
|
|
|
- Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
|
|
|
- if(it.hasNext())
|
|
|
- return getEditFile(it.next()).lastModified();
|
|
|
- return 0;
|
|
|
+ synchronized void waitForSyncToFinish() {
|
|
|
+ while (isSyncRunning) {
|
|
|
+ try {
|
|
|
+ wait(1000);
|
|
|
+ } catch (InterruptedException ie) {}
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -996,273 +921,224 @@ public class FSEditLog implements NNStorageListener {
|
|
|
|
|
|
|
|
|
// sets the initial capacity of the flush buffer.
|
|
|
- public void setBufferCapacity(int size) {
|
|
|
- sizeOutputFlushBuffer = size;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- boolean isEmpty() throws IOException {
|
|
|
- return getEditLogSize() <= 0;
|
|
|
+ public void setOutputBufferCapacity(int size) {
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ jas.manager.setOutputBufferCapacity(size);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Create (or find if already exists) an edit output stream, which
|
|
|
* streams journal records (edits) to the specified backup node.<br>
|
|
|
- * Send a record, prescribing to start journal spool.<br>
|
|
|
- * This should be sent via regular stream of journal records so that
|
|
|
- * the backup node new exactly after which record it should start spooling.
|
|
|
+ *
|
|
|
+ * The new BackupNode will start receiving edits the next time this
|
|
|
+ * NameNode's logs roll.
|
|
|
*
|
|
|
* @param bnReg the backup node registration information.
|
|
|
* @param nnReg this (active) name-node registration.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
|
|
|
- NamenodeRegistration nnReg) // active name-node
|
|
|
+ synchronized void registerBackupNode(
|
|
|
+ NamenodeRegistration bnReg, // backup node
|
|
|
+ NamenodeRegistration nnReg) // active name-node
|
|
|
throws IOException {
|
|
|
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
|
|
|
return; // checkpoint node does not stream edits
|
|
|
- if(editStreams == null)
|
|
|
- editStreams = new ArrayList<EditLogOutputStream>();
|
|
|
- EditLogOutputStream boStream = null;
|
|
|
- for(EditLogOutputStream eStream : editStreams) {
|
|
|
- if(eStream.getName().equals(bnReg.getAddress())) {
|
|
|
- boStream = eStream; // already there
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if(boStream == null) {
|
|
|
- boStream = new EditLogBackupOutputStream(bnReg, nnReg);
|
|
|
- editStreams.add(boStream);
|
|
|
+
|
|
|
+ JournalAndStream jas = findBackupJournalAndStream(bnReg);
|
|
|
+ if (jas != null) {
|
|
|
+ // already registered
|
|
|
+ LOG.info("Backup node " + bnReg + " re-registers");
|
|
|
+ return;
|
|
|
}
|
|
|
- logEdit(JSpoolStartOp.getInstance());
|
|
|
+
|
|
|
+ LOG.info("Registering new backup node: " + bnReg);
|
|
|
+ BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
|
|
|
+ journals.add(new JournalAndStream(bjm));
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Write an operation to the edit log. Do not sync to persistent
|
|
|
- * store yet.
|
|
|
- */
|
|
|
- synchronized void logEdit(int length, byte[] data) {
|
|
|
- if(getNumEditStreams() == 0)
|
|
|
- throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- long start = now();
|
|
|
- for(EditLogOutputStream eStream : editStreams) {
|
|
|
- try {
|
|
|
- eStream.writeRaw(data, 0, length);
|
|
|
- } catch (IOException ie) {
|
|
|
- LOG.warn("Error in editStream " + eStream.getName(), ie);
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
+
|
|
|
+ synchronized void releaseBackupStream(NamenodeRegistration registration) {
|
|
|
+ for (Iterator<JournalAndStream> iter = journals.iterator();
|
|
|
+ iter.hasNext();) {
|
|
|
+ JournalAndStream jas = iter.next();
|
|
|
+ if (jas.manager instanceof BackupJournalManager &&
|
|
|
+ ((BackupJournalManager)jas.manager).matchesRegistration(
|
|
|
+ registration)) {
|
|
|
+ jas.abort();
|
|
|
+ LOG.info("Removing backup journal " + jas);
|
|
|
+ iter.remove();
|
|
|
}
|
|
|
}
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
- recordTransaction(start);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Iterates output streams based of the same type.
|
|
|
- * Type null will iterate over all streams.
|
|
|
+ * Find the JournalAndStream associated with this BackupNode.
|
|
|
+ * @return null if it cannot be found
|
|
|
*/
|
|
|
- private class EditStreamIterator implements Iterator<EditLogOutputStream> {
|
|
|
- JournalType type;
|
|
|
- int prevIndex; // for remove()
|
|
|
- int nextIndex; // for next()
|
|
|
-
|
|
|
- EditStreamIterator(JournalType streamType) {
|
|
|
- this.type = streamType;
|
|
|
- this.nextIndex = 0;
|
|
|
- this.prevIndex = 0;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean hasNext() {
|
|
|
- synchronized(FSEditLog.this) {
|
|
|
- if(editStreams == null ||
|
|
|
- editStreams.isEmpty() || nextIndex >= editStreams.size())
|
|
|
- return false;
|
|
|
- while(nextIndex < editStreams.size()
|
|
|
- && !editStreams.get(nextIndex).getType().isOfType(type))
|
|
|
- nextIndex++;
|
|
|
- return nextIndex < editStreams.size();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public EditLogOutputStream next() {
|
|
|
- EditLogOutputStream stream = null;
|
|
|
- synchronized(FSEditLog.this) {
|
|
|
- stream = editStreams.get(nextIndex);
|
|
|
- prevIndex = nextIndex;
|
|
|
- nextIndex++;
|
|
|
- while(nextIndex < editStreams.size()
|
|
|
- && !editStreams.get(nextIndex).getType().isOfType(type))
|
|
|
- nextIndex++;
|
|
|
- }
|
|
|
- return stream;
|
|
|
- }
|
|
|
-
|
|
|
- public void remove() {
|
|
|
- nextIndex = prevIndex; // restore previous state
|
|
|
- removeStream(prevIndex); // remove last returned element
|
|
|
- hasNext(); // reset nextIndex to correct place
|
|
|
- }
|
|
|
-
|
|
|
- void replace(EditLogOutputStream newStream) {
|
|
|
- synchronized (FSEditLog.this) {
|
|
|
- assert 0 <= prevIndex && prevIndex < editStreams.size() :
|
|
|
- "Index out of bound.";
|
|
|
- editStreams.set(prevIndex, newStream);
|
|
|
+ private synchronized JournalAndStream findBackupJournalAndStream(
|
|
|
+ NamenodeRegistration bnReg) {
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (jas.manager instanceof BackupJournalManager) {
|
|
|
+ BackupJournalManager bjm = (BackupJournalManager)jas.manager;
|
|
|
+ if (bjm.matchesRegistration(bnReg)) {
|
|
|
+ return jas;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get stream iterator for the specified type.
|
|
|
- */
|
|
|
- public Iterator<EditLogOutputStream>
|
|
|
- getOutputStreamIterator(JournalType streamType) {
|
|
|
- return new EditStreamIterator(streamType);
|
|
|
- }
|
|
|
+ * Write an operation to the edit log. Do not sync to persistent
|
|
|
+ * store yet.
|
|
|
+ */
|
|
|
+ synchronized void logEdit(final int length, final byte[] data) {
|
|
|
+ long start = beginTransaction();
|
|
|
+
|
|
|
+ mapJournalsAndReportErrors(new JournalClosure() {
|
|
|
+ @Override
|
|
|
+ public void apply(JournalAndStream jas) throws IOException {
|
|
|
+ if (jas.isActive()) {
|
|
|
+ jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, "Logging edit");
|
|
|
|
|
|
- private void closeStream(EditLogOutputStream eStream) throws IOException {
|
|
|
- eStream.setReadyToFlush();
|
|
|
- eStream.flush();
|
|
|
- eStream.close();
|
|
|
+ endTransaction(start);
|
|
|
}
|
|
|
|
|
|
- void incrementCheckpointTime() {
|
|
|
- storage.incrementCheckpointTime();
|
|
|
- CheckpointTimeOp op = CheckpointTimeOp.getInstance()
|
|
|
- .setCheckpointTime(storage.getCheckpointTime());
|
|
|
- logEdit(op);
|
|
|
+ //// Iteration across journals
|
|
|
+ private interface JournalClosure {
|
|
|
+ public void apply(JournalAndStream jas) throws IOException;
|
|
|
}
|
|
|
|
|
|
- synchronized void releaseBackupStream(NamenodeRegistration registration) {
|
|
|
- Iterator<EditLogOutputStream> it =
|
|
|
- getOutputStreamIterator(JournalType.BACKUP);
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- NamenodeRegistration backupNode = null;
|
|
|
- while(it.hasNext()) {
|
|
|
- EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
|
|
|
- backupNode = eStream.getRegistration();
|
|
|
- if(backupNode.getAddress().equals(registration.getAddress()) &&
|
|
|
- backupNode.isRole(registration.getRole())) {
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
- break;
|
|
|
+ /**
|
|
|
+ * Apply the given function across all of the journal managers, disabling
|
|
|
+ * any for which the closure throws an IOException.
|
|
|
+ * @param status message used for logging errors (e.g. "opening journal")
|
|
|
+ */
|
|
|
+ private void mapJournalsAndReportErrors(
|
|
|
+ JournalClosure closure, String status) {
|
|
|
+ List<JournalAndStream> badJAS = Lists.newLinkedList();
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ try {
|
|
|
+ closure.apply(jas);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error " + status + " (journal " + jas + ")", t);
|
|
|
+ badJAS.add(jas);
|
|
|
}
|
|
|
}
|
|
|
- assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
|
|
|
- "Not a backup node corresponds to a backup stream";
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
- }
|
|
|
|
|
|
- synchronized boolean checkBackupRegistration(
|
|
|
- NamenodeRegistration registration) {
|
|
|
- Iterator<EditLogOutputStream> it =
|
|
|
- getOutputStreamIterator(JournalType.BACKUP);
|
|
|
- boolean regAllowed = !it.hasNext();
|
|
|
- NamenodeRegistration backupNode = null;
|
|
|
- ArrayList<EditLogOutputStream> errorStreams = null;
|
|
|
- while(it.hasNext()) {
|
|
|
- EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
|
|
|
- backupNode = eStream.getRegistration();
|
|
|
- if(backupNode.getAddress().equals(registration.getAddress()) &&
|
|
|
- backupNode.isRole(registration.getRole())) {
|
|
|
- regAllowed = true; // same node re-registers
|
|
|
- break;
|
|
|
- }
|
|
|
- if(!eStream.isAlive()) {
|
|
|
- if(errorStreams == null)
|
|
|
- errorStreams = new ArrayList<EditLogOutputStream>(1);
|
|
|
- errorStreams.add(eStream);
|
|
|
- regAllowed = true; // previous backup node failed
|
|
|
- }
|
|
|
- }
|
|
|
- assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
|
|
|
- "Not a backup node corresponds to a backup stream";
|
|
|
- disableAndReportErrorOnStreams(errorStreams);
|
|
|
- return regAllowed;
|
|
|
+ disableAndReportErrorOnJournals(badJAS);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
- * Get the StorageDirectory for a stream
|
|
|
- * @param es Stream whose StorageDirectory we wish to know
|
|
|
- * @return the matching StorageDirectory
|
|
|
+ * Called when some journals experience an error in some operation.
|
|
|
+ * This propagates errors to the storage level.
|
|
|
*/
|
|
|
- StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
|
|
|
- String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
|
|
|
-
|
|
|
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
|
|
- StorageDirectory sd = it.next();
|
|
|
- FSNamesystem.LOG.info("comparing: " + parentStorageDir
|
|
|
- + " and " + sd.getRoot().getAbsolutePath());
|
|
|
- if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
|
|
|
- return sd;
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized void disableStream(EditLogOutputStream stream) {
|
|
|
- try { stream.close(); } catch (IOException e) {
|
|
|
- // nothing to do.
|
|
|
- LOG.warn("Failed to close eStream " + stream.getName()
|
|
|
- + " before removing it (might be ok)");
|
|
|
+ private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
|
|
|
+ if (badJournals == null || badJournals.isEmpty()) {
|
|
|
+ return; // nothing to do
|
|
|
}
|
|
|
- editStreams.remove(stream);
|
|
|
-
|
|
|
- if (editStreams.size() <= 0) {
|
|
|
- String msg = "Fatal Error: All storage directories are inaccessible.";
|
|
|
- LOG.fatal(msg, new IOException(msg));
|
|
|
- Runtime.getRuntime().exit(-1);
|
|
|
+
|
|
|
+ for (JournalAndStream j : badJournals) {
|
|
|
+ LOG.error("Disabling journal " + j);
|
|
|
+ j.abort();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Error Handling on a storageDirectory
|
|
|
- *
|
|
|
+ * Container for a JournalManager paired with its currently
|
|
|
+ * active stream.
|
|
|
+ *
|
|
|
+ * If a Journal gets disabled due to an error writing to its
|
|
|
+ * stream, then the stream will be aborted and set to null.
|
|
|
*/
|
|
|
- // NNStorageListener Interface
|
|
|
- @Override // NNStorageListener
|
|
|
- public synchronized void errorOccurred(StorageDirectory sd)
|
|
|
- throws IOException {
|
|
|
- if (editStreams == null) {
|
|
|
- //errors can occur on storage directories
|
|
|
- //before edit streams have been set up
|
|
|
- return;
|
|
|
+ static class JournalAndStream {
|
|
|
+ private final JournalManager manager;
|
|
|
+ private EditLogOutputStream stream;
|
|
|
+ private long segmentStartsAtTxId = FSConstants.INVALID_TXID;
|
|
|
+
|
|
|
+ private JournalAndStream(JournalManager manager) {
|
|
|
+ this.manager = manager;
|
|
|
}
|
|
|
- ArrayList<EditLogOutputStream> errorStreams
|
|
|
- = new ArrayList<EditLogOutputStream>();
|
|
|
|
|
|
- for (EditLogOutputStream eStream : editStreams) {
|
|
|
- LOG.error("Unable to log edits to " + eStream.getName()
|
|
|
- + "; removing it");
|
|
|
+ private void startLogSegment(long txId) throws IOException {
|
|
|
+ Preconditions.checkState(stream == null);
|
|
|
+ stream = manager.startLogSegment(txId);
|
|
|
+ segmentStartsAtTxId = txId;
|
|
|
+ }
|
|
|
|
|
|
- StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
|
|
|
- if (sd == streamStorageDir) {
|
|
|
- errorStreams.add(eStream);
|
|
|
+ private void close(long lastTxId) throws IOException {
|
|
|
+ Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
|
|
|
+ "invalid segment: lastTxId %s >= " +
|
|
|
+ "segment starting txid %s", lastTxId, segmentStartsAtTxId);
|
|
|
+
|
|
|
+ if (stream == null) return;
|
|
|
+ stream.close();
|
|
|
+ manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
|
|
|
+ stream = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void abort() {
|
|
|
+ if (stream == null) return;
|
|
|
+ try {
|
|
|
+ stream.abort();
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.error("Unable to abort stream " + stream, ioe);
|
|
|
}
|
|
|
+ stream = null;
|
|
|
+ segmentStartsAtTxId = FSConstants.INVALID_TXID;
|
|
|
}
|
|
|
|
|
|
- for (EditLogOutputStream eStream : errorStreams) {
|
|
|
- disableStream(eStream);
|
|
|
+ private boolean isActive() {
|
|
|
+ return stream != null;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- @Override // NNStorageListener
|
|
|
- public synchronized void formatOccurred(StorageDirectory sd)
|
|
|
- throws IOException {
|
|
|
- if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
|
|
|
- createEditLogFile(NNStorage.getStorageFile(sd, NameNodeFile.EDITS));
|
|
|
+ @VisibleForTesting
|
|
|
+ EditLogOutputStream getCurrentStream() {
|
|
|
+ return stream;
|
|
|
}
|
|
|
- };
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "JournalAndStream(mgr=" + manager +
|
|
|
+ ", " + "stream=" + stream + ")";
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ void setCurrentStreamForTests(EditLogOutputStream stream) {
|
|
|
+ this.stream = stream;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ JournalManager getManager() {
|
|
|
+ return manager;
|
|
|
+ }
|
|
|
+
|
|
|
+ private EditLogInputStream getInProgressInputStream() throws IOException {
|
|
|
+ return manager.getInProgressInputStream(segmentStartsAtTxId);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- @Override // NNStorageListener
|
|
|
- public synchronized void directoryAvailable(StorageDirectory sd)
|
|
|
+ /**
|
|
|
+ * @return an EditLogInputStream that reads from the same log that
|
|
|
+ * the edit log is currently writing. This is used from the BackupNode
|
|
|
+ * during edits synchronization.
|
|
|
+ * @throws IOException if no valid logs are available.
|
|
|
+ */
|
|
|
+ synchronized EditLogInputStream getInProgressFileInputStream()
|
|
|
throws IOException {
|
|
|
- if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
|
|
|
- File eFile = getEditFile(sd);
|
|
|
- addNewEditLogStream(eFile);
|
|
|
+ for (JournalAndStream jas : journals) {
|
|
|
+ if (!jas.isActive()) continue;
|
|
|
+ try {
|
|
|
+ EditLogInputStream in = jas.getInProgressInputStream();
|
|
|
+ if (in != null) return in;
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Unable to get the in-progress input stream from " + jas,
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
}
|
|
|
+ throw new IOException("No in-progress stream provided edits");
|
|
|
}
|
|
|
}
|