|
@@ -21,11 +21,13 @@ import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
|
import java.io.Closeable;
|
|
|
import java.io.File;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
@@ -41,8 +43,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.HAUtil;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
|
|
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
|
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
@@ -216,13 +218,18 @@ public class FSImage implements Closeable {
|
|
|
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
|
|
|
}
|
|
|
if (startOpt != StartupOption.UPGRADE
|
|
|
+ && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
|
|
|
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
|
|
|
- && layoutVersion != HdfsConstants.LAYOUT_VERSION) {
|
|
|
+ && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
|
|
|
throw new IOException(
|
|
|
"\nFile system image contains an old layout version "
|
|
|
+ storage.getLayoutVersion() + ".\nAn upgrade to version "
|
|
|
- + HdfsConstants.LAYOUT_VERSION + " is required.\n"
|
|
|
- + "Please restart NameNode with -upgrade option.");
|
|
|
+ + HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
|
|
|
+ + "Please restart NameNode with the \""
|
|
|
+ + RollingUpgradeStartupOption.STARTED.getOptionString()
|
|
|
+ + "\" option if a rolling upgraded is already started;"
|
|
|
+ + " or restart NameNode with the \""
|
|
|
+ + StartupOption.UPGRADE + "\" to start a new upgrade.");
|
|
|
}
|
|
|
|
|
|
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
|
|
@@ -261,7 +268,7 @@ public class FSImage implements Closeable {
|
|
|
// just load the image
|
|
|
}
|
|
|
|
|
|
- return loadFSImage(target, recovery, startOpt);
|
|
|
+ return loadFSImage(target, startOpt, recovery);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -315,8 +322,9 @@ public class FSImage implements Closeable {
|
|
|
return isFormatted;
|
|
|
}
|
|
|
|
|
|
- void doUpgrade(FSNamesystem target) throws IOException {
|
|
|
- // Upgrade is allowed only if there are
|
|
|
+ /** Check if upgrade is in progress. */
|
|
|
+ void checkUpgrade(FSNamesystem target) throws IOException {
|
|
|
+ // Upgrade or rolling upgrade is allowed only if there are
|
|
|
// no previous fs states in any of the local directories
|
|
|
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
@@ -325,15 +333,37 @@ public class FSImage implements Closeable {
|
|
|
"previous fs state should not exist during upgrade. "
|
|
|
+ "Finalize or rollback first.");
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return true if there is rollback fsimage (for rolling upgrade) in NameNode
|
|
|
+ * directory.
|
|
|
+ */
|
|
|
+ public boolean hasRollbackFSImage() throws IOException {
|
|
|
+ final FSImageStorageInspector inspector = new FSImageTransactionalStorageInspector(
|
|
|
+ EnumSet.of(NameNodeFile.IMAGE_ROLLBACK));
|
|
|
+ storage.inspectStorageDirs(inspector);
|
|
|
+ try {
|
|
|
+ List<FSImageFile> images = inspector.getLatestImages();
|
|
|
+ return images != null && !images.isEmpty();
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void doUpgrade(FSNamesystem target) throws IOException {
|
|
|
+ checkUpgrade(target);
|
|
|
|
|
|
// load the latest image
|
|
|
+
|
|
|
// Do upgrade for each directory
|
|
|
- this.loadFSImage(target, null, StartupOption.UPGRADE);
|
|
|
+ this.loadFSImage(target, StartupOption.UPGRADE, null);
|
|
|
+ target.checkRollingUpgrade("upgrade namenode");
|
|
|
|
|
|
long oldCTime = storage.getCTime();
|
|
|
storage.cTime = now(); // generate new cTime for the state
|
|
|
int oldLV = storage.getLayoutVersion();
|
|
|
- storage.layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
|
|
+ storage.layoutVersion = HdfsConstants.NAMENODE_LAYOUT_VERSION;
|
|
|
|
|
|
List<StorageDirectory> errorSDs =
|
|
|
Collections.synchronizedList(new ArrayList<StorageDirectory>());
|
|
@@ -393,11 +423,11 @@ public class FSImage implements Closeable {
|
|
|
boolean canRollback = false;
|
|
|
FSImage prevState = new FSImage(conf);
|
|
|
try {
|
|
|
- prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
|
|
+ prevState.getStorage().layoutVersion = HdfsConstants.NAMENODE_LAYOUT_VERSION;
|
|
|
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
|
if (!NNUpgradeUtil.canRollBack(sd, storage, prevState.getStorage(),
|
|
|
- HdfsConstants.LAYOUT_VERSION)) {
|
|
|
+ HdfsConstants.NAMENODE_LAYOUT_VERSION)) {
|
|
|
continue;
|
|
|
}
|
|
|
canRollback = true;
|
|
@@ -407,7 +437,7 @@ public class FSImage implements Closeable {
|
|
|
// If HA is enabled, check if the shared log can be rolled back as well.
|
|
|
editLog.initJournalsForWrite();
|
|
|
canRollback |= editLog.canRollBackSharedLog(prevState.getStorage(),
|
|
|
- HdfsConstants.LAYOUT_VERSION);
|
|
|
+ HdfsConstants.NAMENODE_LAYOUT_VERSION);
|
|
|
}
|
|
|
|
|
|
if (!canRollback)
|
|
@@ -540,13 +570,22 @@ public class FSImage implements Closeable {
|
|
|
* @return whether the image should be saved
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery,
|
|
|
- StartupOption startOpt) throws IOException {
|
|
|
- FSImageStorageInspector inspector = storage.readAndInspectDirs();
|
|
|
- FSImageFile imageFile = null;
|
|
|
-
|
|
|
+ private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
|
|
|
+ MetaRecoveryContext recovery)
|
|
|
+ throws IOException {
|
|
|
+ final boolean rollingRollback
|
|
|
+ = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
|
|
|
+ final EnumSet<NameNodeFile> nnfs;
|
|
|
+ if (rollingRollback) {
|
|
|
+ // if it is rollback of rolling upgrade, only load from the rollback image
|
|
|
+ nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
|
|
|
+ } else {
|
|
|
+ // otherwise we can load from both IMAGE and IMAGE_ROLLBACK
|
|
|
+ nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
|
|
|
+ }
|
|
|
+ final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnfs);
|
|
|
+
|
|
|
isUpgradeFinalized = inspector.isUpgradeFinalized();
|
|
|
-
|
|
|
List<FSImageFile> imageFiles = inspector.getLatestImages();
|
|
|
|
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
@@ -560,12 +599,22 @@ public class FSImage implements Closeable {
|
|
|
|
|
|
initEditLog(startOpt);
|
|
|
|
|
|
- if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
|
|
|
- getLayoutVersion())) {
|
|
|
+ if (NameNodeLayoutVersion.supports(
|
|
|
+ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
|
|
|
// If we're open for write, we're either non-HA or we're the active NN, so
|
|
|
// we better be able to load all the edits. If we're the standby NN, it's
|
|
|
// OK to not be able to read all of edits right now.
|
|
|
- long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
|
|
|
+ // In the meanwhile, for HA upgrade, we will still write editlog thus need
|
|
|
+ // this toAtLeastTxId to be set to the max-seen txid
|
|
|
+ // For rollback in rolling upgrade, we need to set the toAtLeastTxId to
|
|
|
+ // the txid right before the upgrade marker.
|
|
|
+ long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
|
|
|
+ .getMaxSeenTxId() : 0;
|
|
|
+ if (rollingRollback) {
|
|
|
+ // note that the first image in imageFiles is the special checkpoint
|
|
|
+ // for the rolling upgrade
|
|
|
+ toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
|
|
|
+ }
|
|
|
editStreams = editLog.selectInputStreams(
|
|
|
imageFiles.get(0).getCheckpointTxId() + 1,
|
|
|
toAtLeastTxId, recovery, false);
|
|
@@ -573,8 +622,7 @@ public class FSImage implements Closeable {
|
|
|
editStreams = FSImagePreTransactionalStorageInspector
|
|
|
.getEditLogStreams(storage);
|
|
|
}
|
|
|
- int maxOpSize = conf.getInt(DFSConfigKeys.
|
|
|
- DFS_NAMENODE_MAX_OP_SIZE_KEY,
|
|
|
+ int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
|
|
|
for (EditLogInputStream elis : editStreams) {
|
|
|
elis.setMaxOpSize(maxOpSize);
|
|
@@ -587,6 +635,7 @@ public class FSImage implements Closeable {
|
|
|
LOG.info("No edit log streams selected.");
|
|
|
}
|
|
|
|
|
|
+ FSImageFile imageFile = null;
|
|
|
for (int i = 0; i < imageFiles.size(); i++) {
|
|
|
try {
|
|
|
imageFile = imageFiles.get(i);
|
|
@@ -604,26 +653,57 @@ public class FSImage implements Closeable {
|
|
|
throw new IOException("Failed to load an FSImage file!");
|
|
|
}
|
|
|
prog.endPhase(Phase.LOADING_FSIMAGE);
|
|
|
- long txnsAdvanced = loadEdits(editStreams, target, recovery);
|
|
|
- needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
|
|
- txnsAdvanced);
|
|
|
+
|
|
|
+ if (!rollingRollback) {
|
|
|
+ long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
|
|
|
+ needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
|
|
+ txnsAdvanced);
|
|
|
+ if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
|
|
|
+ // rename rollback image if it is downgrade
|
|
|
+ renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
|
|
|
+ // to the last txid in rollback fsimage.
|
|
|
+ rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
|
|
|
+ needToSave = false;
|
|
|
+ }
|
|
|
editLog.setNextTxId(lastAppliedTxId + 1);
|
|
|
return needToSave;
|
|
|
}
|
|
|
|
|
|
+ /** rollback for rolling upgrade. */
|
|
|
+ private void rollingRollback(long discardSegmentTxId, long ckptId)
|
|
|
+ throws IOException {
|
|
|
+ // discard discard unnecessary editlog segments starting from the given id
|
|
|
+ this.editLog.discardSegments(discardSegmentTxId);
|
|
|
+ // rename the special checkpoint
|
|
|
+ renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE,
|
|
|
+ true);
|
|
|
+ // purge all the checkpoints after the marker
|
|
|
+ archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);
|
|
|
+ String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
|
|
|
+ if (HAUtil.isHAEnabled(conf, nameserviceId)) {
|
|
|
+ // close the editlog since it is currently open for write
|
|
|
+ this.editLog.close();
|
|
|
+ // reopen the editlog for read
|
|
|
+ this.editLog.initSharedJournalsForRead();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
|
|
|
FSImageFile imageFile) throws IOException {
|
|
|
LOG.debug("Planning to load image :\n" + imageFile);
|
|
|
StorageDirectory sdForProperties = imageFile.sd;
|
|
|
storage.readProperties(sdForProperties);
|
|
|
|
|
|
- if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
|
|
|
- getLayoutVersion())) {
|
|
|
+ if (NameNodeLayoutVersion.supports(
|
|
|
+ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
|
|
|
// For txid-based layout, we should have a .md5 file
|
|
|
// next to the image file
|
|
|
loadFSImage(imageFile.getFile(), target, recovery);
|
|
|
- } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
|
|
|
- getLayoutVersion())) {
|
|
|
+ } else if (NameNodeLayoutVersion.supports(
|
|
|
+ LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {
|
|
|
// In 0.22, we have the checksum stored in the VERSION file.
|
|
|
String md5 = storage.getDeprecatedProperty(
|
|
|
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
|
|
@@ -648,18 +728,21 @@ public class FSImage implements Closeable {
|
|
|
// If this NN is not HA
|
|
|
editLog.initJournalsForWrite();
|
|
|
editLog.recoverUnclosedStreams();
|
|
|
- } else if (HAUtil.isHAEnabled(conf, nameserviceId) &&
|
|
|
- startOpt == StartupOption.UPGRADE) {
|
|
|
- // This NN is HA, but we're doing an upgrade so init the edit log for
|
|
|
- // write.
|
|
|
+ } else if (HAUtil.isHAEnabled(conf, nameserviceId)
|
|
|
+ && (startOpt == StartupOption.UPGRADE
|
|
|
+ || RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
|
|
|
+ // This NN is HA, but we're doing an upgrade or a rollback of rolling
|
|
|
+ // upgrade so init the edit log for write.
|
|
|
editLog.initJournalsForWrite();
|
|
|
- long sharedLogCTime = editLog.getSharedLogCTime();
|
|
|
- if (this.storage.getCTime() < sharedLogCTime) {
|
|
|
- throw new IOException("It looks like the shared log is already " +
|
|
|
- "being upgraded but this NN has not been upgraded yet. You " +
|
|
|
- "should restart this NameNode with the '" +
|
|
|
- StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
|
|
|
- "this NN in sync with the other.");
|
|
|
+ if (startOpt == StartupOption.UPGRADE) {
|
|
|
+ long sharedLogCTime = editLog.getSharedLogCTime();
|
|
|
+ if (this.storage.getCTime() < sharedLogCTime) {
|
|
|
+ throw new IOException("It looks like the shared log is already " +
|
|
|
+ "being upgraded but this NN has not been upgraded yet. You " +
|
|
|
+ "should restart this NameNode with the '" +
|
|
|
+ StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
|
|
|
+ "this NN in sync with the other.");
|
|
|
+ }
|
|
|
}
|
|
|
editLog.recoverUnclosedStreams();
|
|
|
} else {
|
|
@@ -692,7 +775,13 @@ public class FSImage implements Closeable {
|
|
|
* Load the specified list of edit files into the image.
|
|
|
*/
|
|
|
public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
|
|
- FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
|
|
|
+ FSNamesystem target) throws IOException {
|
|
|
+ return loadEdits(editStreams, target, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private long loadEdits(Iterable<EditLogInputStream> editStreams,
|
|
|
+ FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
|
|
|
+ throws IOException {
|
|
|
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
|
prog.beginPhase(Phase.LOADING_EDITS);
|
|
@@ -706,7 +795,7 @@ public class FSImage implements Closeable {
|
|
|
LOG.info("Reading " + editIn + " expecting start txid #" +
|
|
|
(lastAppliedTxId + 1));
|
|
|
try {
|
|
|
- loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
|
|
|
+ loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
|
|
|
} finally {
|
|
|
// Update lastAppliedTxId even in case of error, since some ops may
|
|
|
// have been successfully applied before the error.
|
|
@@ -720,7 +809,7 @@ public class FSImage implements Closeable {
|
|
|
} finally {
|
|
|
FSEditLog.closeAllStreams(editStreams);
|
|
|
// update the counts
|
|
|
- updateCountForQuota(target.dir.rootDir);
|
|
|
+ updateCountForQuota(target.dir.rootDir);
|
|
|
}
|
|
|
prog.endPhase(Phase.LOADING_EDITS);
|
|
|
return lastAppliedTxId - prevLastAppliedTxId;
|
|
@@ -798,9 +887,12 @@ public class FSImage implements Closeable {
|
|
|
*/
|
|
|
private void loadFSImage(File curFile, MD5Hash expectedMd5,
|
|
|
FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
|
|
|
+ // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
|
|
|
+ // information. Make sure the ID is properly set.
|
|
|
+ target.setBlockPoolId(this.getBlockPoolID());
|
|
|
+
|
|
|
FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
|
|
|
loader.load(curFile);
|
|
|
- target.setBlockPoolId(this.getBlockPoolID());
|
|
|
|
|
|
// Check that the image digest we loaded matches up with what
|
|
|
// we expected
|
|
@@ -821,11 +913,11 @@ public class FSImage implements Closeable {
|
|
|
/**
|
|
|
* Save the contents of the FS image to the file.
|
|
|
*/
|
|
|
- void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
|
|
|
- throws IOException {
|
|
|
+ void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
|
|
|
+ NameNodeFile dstType) throws IOException {
|
|
|
long txid = context.getTxId();
|
|
|
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
|
|
|
- File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
|
|
|
+ File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
|
|
|
|
|
|
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
|
|
|
FSImageCompression compression = FSImageCompression.createCompression(conf);
|
|
@@ -849,16 +941,19 @@ public class FSImage implements Closeable {
|
|
|
private class FSImageSaver implements Runnable {
|
|
|
private final SaveNamespaceContext context;
|
|
|
private StorageDirectory sd;
|
|
|
+ private final NameNodeFile nnf;
|
|
|
|
|
|
- public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) {
|
|
|
+ public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
|
|
|
+ NameNodeFile nnf) {
|
|
|
this.context = context;
|
|
|
this.sd = sd;
|
|
|
+ this.nnf = nnf;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
- saveFSImage(context, sd);
|
|
|
+ saveFSImage(context, sd, nnf);
|
|
|
} catch (SaveNamespaceCancelledException snce) {
|
|
|
LOG.info("Cancelled image saving for " + sd.getRoot() +
|
|
|
": " + snce.getMessage());
|
|
@@ -894,17 +989,18 @@ public class FSImage implements Closeable {
|
|
|
*/
|
|
|
public synchronized void saveNamespace(FSNamesystem source)
|
|
|
throws IOException {
|
|
|
- saveNamespace(source, null);
|
|
|
+ saveNamespace(source, NameNodeFile.IMAGE, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Save the contents of the FS image to a new image file in each of the
|
|
|
* current storage directories.
|
|
|
- * @param canceler
|
|
|
+ * @param canceler
|
|
|
*/
|
|
|
- public synchronized void saveNamespace(FSNamesystem source,
|
|
|
+ public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
|
|
|
Canceler canceler) throws IOException {
|
|
|
assert editLog != null : "editLog must be initialized";
|
|
|
+ LOG.info("Save namespace ...");
|
|
|
storage.attemptRestoreRemovedStorage();
|
|
|
|
|
|
boolean editLogWasOpen = editLog.isSegmentOpen();
|
|
@@ -914,7 +1010,7 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
long imageTxId = getLastAppliedOrWrittenTxId();
|
|
|
try {
|
|
|
- saveFSImageInAllDirs(source, imageTxId, canceler);
|
|
|
+ saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
|
|
|
storage.writeAll();
|
|
|
} finally {
|
|
|
if (editLogWasOpen) {
|
|
@@ -933,12 +1029,11 @@ public class FSImage implements Closeable {
|
|
|
*/
|
|
|
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
|
|
|
throws IOException {
|
|
|
- saveFSImageInAllDirs(source, txid, null);
|
|
|
+ saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
|
|
|
}
|
|
|
|
|
|
- protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
|
|
|
- Canceler canceler)
|
|
|
- throws IOException {
|
|
|
+ private synchronized void saveFSImageInAllDirs(FSNamesystem source,
|
|
|
+ NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
|
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
|
prog.beginPhase(Phase.SAVING_CHECKPOINT);
|
|
|
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
|
|
@@ -956,7 +1051,7 @@ public class FSImage implements Closeable {
|
|
|
for (Iterator<StorageDirectory> it
|
|
|
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
|
- FSImageSaver saver = new FSImageSaver(ctx, sd);
|
|
|
+ FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
|
|
|
Thread saveThread = new Thread(saver, saver.toString());
|
|
|
saveThreads.add(saveThread);
|
|
|
saveThread.start();
|
|
@@ -975,11 +1070,11 @@ public class FSImage implements Closeable {
|
|
|
assert false : "should have thrown above!";
|
|
|
}
|
|
|
|
|
|
- renameCheckpoint(txid);
|
|
|
+ renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
|
|
|
|
|
|
// Since we now have a new checkpoint, we can clean up some
|
|
|
// old edit logs and checkpoints.
|
|
|
- purgeOldStorage();
|
|
|
+ purgeOldStorage(nnf);
|
|
|
} finally {
|
|
|
// Notify any threads waiting on the checkpoint to be canceled
|
|
|
// that it is complete.
|
|
@@ -993,23 +1088,24 @@ public class FSImage implements Closeable {
|
|
|
* Purge any files in the storage directories that are no longer
|
|
|
* necessary.
|
|
|
*/
|
|
|
- public void purgeOldStorage() {
|
|
|
+ void purgeOldStorage(NameNodeFile nnf) {
|
|
|
try {
|
|
|
- archivalManager.purgeOldStorage();
|
|
|
+ archivalManager.purgeOldStorage(nnf);
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Unable to purge old storage", e);
|
|
|
+ LOG.warn("Unable to purge old storage " + nnf.getName(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Renames new image
|
|
|
+ * Rename FSImage with the specific txid
|
|
|
*/
|
|
|
- private void renameCheckpoint(long txid) throws IOException {
|
|
|
+ private void renameCheckpoint(long txid, NameNodeFile fromNnf,
|
|
|
+ NameNodeFile toNnf, boolean renameMD5) throws IOException {
|
|
|
ArrayList<StorageDirectory> al = null;
|
|
|
|
|
|
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
|
|
|
try {
|
|
|
- renameCheckpointInDir(sd, txid);
|
|
|
+ renameImageFileInDir(sd, fromNnf, toNnf, txid, renameMD5);
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.warn("Unable to rename checkpoint in " + sd, ioe);
|
|
|
if (al == null) {
|
|
@@ -1020,7 +1116,33 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
if(al != null) storage.reportErrorsOnDirectories(al);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Rename all the fsimage files with the specific NameNodeFile type. The
|
|
|
+ * associated checksum files will also be renamed.
|
|
|
+ */
|
|
|
+ void renameCheckpoint(NameNodeFile fromNnf, NameNodeFile toNnf)
|
|
|
+ throws IOException {
|
|
|
+ ArrayList<StorageDirectory> al = null;
|
|
|
+ FSImageTransactionalStorageInspector inspector =
|
|
|
+ new FSImageTransactionalStorageInspector(EnumSet.of(fromNnf));
|
|
|
+ storage.inspectStorageDirs(inspector);
|
|
|
+ for (FSImageFile image : inspector.getFoundImages()) {
|
|
|
+ try {
|
|
|
+ renameImageFileInDir(image.sd, fromNnf, toNnf, image.txId, true);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Unable to rename checkpoint in " + image.sd, ioe);
|
|
|
+ if (al == null) {
|
|
|
+ al = Lists.newArrayList();
|
|
|
+ }
|
|
|
+ al.add(image.sd);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(al != null) {
|
|
|
+ storage.reportErrorsOnDirectories(al);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Deletes the checkpoint file in every storage directory,
|
|
|
* since the checkpoint was cancelled.
|
|
@@ -1038,23 +1160,24 @@ public class FSImage implements Closeable {
|
|
|
storage.reportErrorsOnDirectories(al);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- private void renameCheckpointInDir(StorageDirectory sd, long txid)
|
|
|
- throws IOException {
|
|
|
- File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
|
|
|
- File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
|
|
|
- // renameTo fails on Windows if the destination file
|
|
|
- // already exists.
|
|
|
+ private void renameImageFileInDir(StorageDirectory sd, NameNodeFile fromNnf,
|
|
|
+ NameNodeFile toNnf, long txid, boolean renameMD5) throws IOException {
|
|
|
+ final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
|
|
|
+ final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
|
|
|
+ // renameTo fails on Windows if the destination file already exists.
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("renaming " + ckpt.getAbsolutePath()
|
|
|
- + " to " + curFile.getAbsolutePath());
|
|
|
+ LOG.debug("renaming " + fromFile.getAbsolutePath()
|
|
|
+ + " to " + toFile.getAbsolutePath());
|
|
|
}
|
|
|
- if (!ckpt.renameTo(curFile)) {
|
|
|
- if (!curFile.delete() || !ckpt.renameTo(curFile)) {
|
|
|
- throw new IOException("renaming " + ckpt.getAbsolutePath() + " to " +
|
|
|
- curFile.getAbsolutePath() + " FAILED");
|
|
|
+ if (!fromFile.renameTo(toFile)) {
|
|
|
+ if (!toFile.delete() || !fromFile.renameTo(toFile)) {
|
|
|
+ throw new IOException("renaming " + fromFile.getAbsolutePath() + " to " +
|
|
|
+ toFile.getAbsolutePath() + " FAILED");
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
+ if (renameMD5) {
|
|
|
+ MD5FileUtils.renameMD5File(fromFile, toFile);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
CheckpointSignature rollEditLog() throws IOException {
|
|
@@ -1135,13 +1258,13 @@ public class FSImage implements Closeable {
|
|
|
* renames the image from fsimage_N.ckpt to fsimage_N and also
|
|
|
* saves the related .md5 file into place.
|
|
|
*/
|
|
|
- public synchronized void saveDigestAndRenameCheckpointImage(
|
|
|
+ public synchronized void saveDigestAndRenameCheckpointImage(NameNodeFile nnf,
|
|
|
long txid, MD5Hash digest) throws IOException {
|
|
|
// Write and rename MD5 file
|
|
|
List<StorageDirectory> badSds = Lists.newArrayList();
|
|
|
|
|
|
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
|
|
|
- File imageFile = NNStorage.getImageFile(sd, txid);
|
|
|
+ File imageFile = NNStorage.getImageFile(sd, nnf, txid);
|
|
|
try {
|
|
|
MD5FileUtils.saveMD5File(imageFile, digest);
|
|
|
} catch (IOException ioe) {
|
|
@@ -1153,7 +1276,7 @@ public class FSImage implements Closeable {
|
|
|
CheckpointFaultInjector.getInstance().afterMD5Rename();
|
|
|
|
|
|
// Rename image from tmp file
|
|
|
- renameCheckpoint(txid);
|
|
|
+ renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
|
|
|
// So long as this is the newest image available,
|
|
|
// advertise it as such to other checkpointers
|
|
|
// from now on
|