|
@@ -70,7 +70,6 @@ import com.google.common.collect.Lists;
|
|
|
public class FSImage implements Closeable {
|
|
|
protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
|
|
|
|
|
|
- protected FSNamesystem namesystem = null;
|
|
|
protected FSEditLog editLog = null;
|
|
|
private boolean isUpgradeFinalized = false;
|
|
|
|
|
@@ -82,38 +81,20 @@ public class FSImage implements Closeable {
|
|
|
*/
|
|
|
protected long lastAppliedTxId = 0;
|
|
|
|
|
|
- /**
|
|
|
- * URIs for importing an image from a checkpoint. In the default case,
|
|
|
- * URIs will represent directories.
|
|
|
- */
|
|
|
- private Collection<URI> checkpointDirs;
|
|
|
- private Collection<URI> checkpointEditsDirs;
|
|
|
-
|
|
|
final private Configuration conf;
|
|
|
|
|
|
private final NNStorageRetentionManager archivalManager;
|
|
|
|
|
|
- /**
|
|
|
- * Construct an FSImage.
|
|
|
- * @param conf Configuration
|
|
|
- * @see #FSImage(Configuration conf, FSNamesystem ns,
|
|
|
- * Collection imageDirs, Collection editsDirs)
|
|
|
- * @throws IOException if default directories are invalid.
|
|
|
- */
|
|
|
- public FSImage(Configuration conf) throws IOException {
|
|
|
- this(conf, (FSNamesystem)null);
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Construct an FSImage
|
|
|
* @param conf Configuration
|
|
|
- * @param ns The FSNamesystem using this image.
|
|
|
- * @see #FSImage(Configuration conf, FSNamesystem ns,
|
|
|
+ * @see #FSImage(Configuration conf,
|
|
|
* Collection imageDirs, Collection editsDirs)
|
|
|
* @throws IOException if default directories are invalid.
|
|
|
*/
|
|
|
- private FSImage(Configuration conf, FSNamesystem ns) throws IOException {
|
|
|
- this(conf, ns,
|
|
|
+ protected FSImage(Configuration conf) throws IOException {
|
|
|
+ this(conf,
|
|
|
FSNamesystem.getNamespaceDirs(conf),
|
|
|
FSNamesystem.getNamespaceEditsDirs(conf));
|
|
|
}
|
|
@@ -124,17 +105,14 @@ public class FSImage implements Closeable {
|
|
|
* Setup storage and initialize the edit log.
|
|
|
*
|
|
|
* @param conf Configuration
|
|
|
- * @param ns The FSNamesystem using this image.
|
|
|
* @param imageDirs Directories the image can be stored in.
|
|
|
* @param editsDirs Directories the editlog can be stored in.
|
|
|
* @throws IOException if directories are invalid.
|
|
|
*/
|
|
|
- protected FSImage(Configuration conf, FSNamesystem ns,
|
|
|
+ protected FSImage(Configuration conf,
|
|
|
Collection<URI> imageDirs, Collection<URI> editsDirs)
|
|
|
throws IOException {
|
|
|
this.conf = conf;
|
|
|
- setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
|
|
|
- FSImage.getCheckpointEditsDirs(conf, null));
|
|
|
|
|
|
storage = new NNStorage(conf, imageDirs, editsDirs);
|
|
|
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
|
|
@@ -143,31 +121,18 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
|
|
|
this.editLog = new FSEditLog(storage);
|
|
|
- setFSNamesystem(ns);
|
|
|
|
|
|
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
|
|
}
|
|
|
-
|
|
|
- protected FSNamesystem getFSNamesystem() {
|
|
|
- return namesystem;
|
|
|
- }
|
|
|
-
|
|
|
- void setFSNamesystem(FSNamesystem ns) {
|
|
|
- namesystem = ns;
|
|
|
- if (ns != null) {
|
|
|
- storage.setUpgradeManager(ns.upgradeManager);
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- void setCheckpointDirectories(Collection<URI> dirs,
|
|
|
- Collection<URI> editsDirs) {
|
|
|
- checkpointDirs = dirs;
|
|
|
- checkpointEditsDirs = editsDirs;
|
|
|
- }
|
|
|
-
|
|
|
- void format(String clusterId) throws IOException {
|
|
|
+ void format(FSNamesystem fsn, String clusterId) throws IOException {
|
|
|
+ long fileCount = fsn.getTotalFiles();
|
|
|
+ // Expect 1 file, which is the root inode
|
|
|
+ Preconditions.checkState(fileCount == 1,
|
|
|
+ "FSImage.format should be called with an uninitialized namesystem, has " +
|
|
|
+ fileCount + " files");
|
|
|
storage.format(clusterId);
|
|
|
- saveFSImageInAllDirs(0);
|
|
|
+ saveFSImageInAllDirs(fsn, 0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -179,7 +144,7 @@ public class FSImage implements Closeable {
|
|
|
* @throws IOException
|
|
|
* @return true if the image needs to be saved or false otherwise
|
|
|
*/
|
|
|
- boolean recoverTransitionRead(StartupOption startOpt)
|
|
|
+ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
|
|
|
throws IOException {
|
|
|
assert startOpt != StartupOption.FORMAT :
|
|
|
"NameNode formatting should be performed before reading the image";
|
|
@@ -187,21 +152,14 @@ public class FSImage implements Closeable {
|
|
|
Collection<URI> imageDirs = storage.getImageDirectories();
|
|
|
Collection<URI> editsDirs = storage.getEditsDirectories();
|
|
|
|
|
|
+
|
|
|
// none of the data dirs exist
|
|
|
if((imageDirs.size() == 0 || editsDirs.size() == 0)
|
|
|
&& startOpt != StartupOption.IMPORT)
|
|
|
throw new IOException(
|
|
|
"All specified directories are not accessible or do not exist.");
|
|
|
|
|
|
- if(startOpt == StartupOption.IMPORT
|
|
|
- && (checkpointDirs == null || checkpointDirs.isEmpty()))
|
|
|
- throw new IOException("Cannot import image from a checkpoint. "
|
|
|
- + "\"dfs.namenode.checkpoint.dir\" is not set." );
|
|
|
-
|
|
|
- if(startOpt == StartupOption.IMPORT
|
|
|
- && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
|
|
|
- throw new IOException("Cannot import image from a checkpoint. "
|
|
|
- + "\"dfs.namenode.checkpoint.dir\" is not set." );
|
|
|
+ storage.setUpgradeManager(target.upgradeManager);
|
|
|
|
|
|
// 1. For each data directory calculate its state and
|
|
|
// check whether all is consistent before transitioning.
|
|
@@ -261,10 +219,10 @@ public class FSImage implements Closeable {
|
|
|
// 3. Do transitions
|
|
|
switch(startOpt) {
|
|
|
case UPGRADE:
|
|
|
- doUpgrade();
|
|
|
+ doUpgrade(target);
|
|
|
return false; // upgrade saved image already
|
|
|
case IMPORT:
|
|
|
- doImportCheckpoint();
|
|
|
+ doImportCheckpoint(target);
|
|
|
return false; // import checkpoint saved image already
|
|
|
case ROLLBACK:
|
|
|
doRollback();
|
|
@@ -273,7 +231,7 @@ public class FSImage implements Closeable {
|
|
|
// just load the image
|
|
|
}
|
|
|
|
|
|
- return loadFSImage();
|
|
|
+ return loadFSImage(target);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -324,11 +282,11 @@ public class FSImage implements Closeable {
|
|
|
return isFormatted;
|
|
|
}
|
|
|
|
|
|
- private void doUpgrade() throws IOException {
|
|
|
+ private void doUpgrade(FSNamesystem target) throws IOException {
|
|
|
if(storage.getDistributedUpgradeState()) {
|
|
|
// only distributed upgrade need to continue
|
|
|
// don't do version upgrade
|
|
|
- this.loadFSImage();
|
|
|
+ this.loadFSImage(target);
|
|
|
storage.initializeDistributedUpgrade();
|
|
|
return;
|
|
|
}
|
|
@@ -343,7 +301,7 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
|
|
|
// load the latest image
|
|
|
- this.loadFSImage();
|
|
|
+ this.loadFSImage(target);
|
|
|
|
|
|
// Do upgrade for each directory
|
|
|
long oldCTime = storage.getCTime();
|
|
@@ -385,7 +343,7 @@ public class FSImage implements Closeable {
|
|
|
storage.reportErrorsOnDirectories(errorSDs);
|
|
|
errorSDs.clear();
|
|
|
|
|
|
- saveFSImageInAllDirs(editLog.getLastWrittenTxId());
|
|
|
+ saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
|
|
|
|
|
|
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
@@ -422,7 +380,7 @@ public class FSImage implements Closeable {
|
|
|
// a previous fs states in at least one of the storage directories.
|
|
|
// Directories that don't have previous state do not rollback
|
|
|
boolean canRollback = false;
|
|
|
- FSImage prevState = new FSImage(conf, getFSNamesystem());
|
|
|
+ FSImage prevState = new FSImage(conf);
|
|
|
prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
|
|
|
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
@@ -504,19 +462,32 @@ public class FSImage implements Closeable {
|
|
|
|
|
|
/**
|
|
|
* Load image from a checkpoint directory and save it into the current one.
|
|
|
+ * @param target the NameSystem to import into
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- void doImportCheckpoint() throws IOException {
|
|
|
- FSNamesystem fsNamesys = getFSNamesystem();
|
|
|
- FSImage ckptImage = new FSImage(conf, fsNamesys,
|
|
|
+ void doImportCheckpoint(FSNamesystem target) throws IOException {
|
|
|
+ Collection<URI> checkpointDirs =
|
|
|
+ FSImage.getCheckpointDirs(conf, null);
|
|
|
+ Collection<URI> checkpointEditsDirs =
|
|
|
+ FSImage.getCheckpointEditsDirs(conf, null);
|
|
|
+
|
|
|
+ if (checkpointDirs == null || checkpointDirs.isEmpty()) {
|
|
|
+ throw new IOException("Cannot import image from a checkpoint. "
|
|
|
+ + "\"dfs.namenode.checkpoint.dir\" is not set." );
|
|
|
+ }
|
|
|
+
|
|
|
+ if (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()) {
|
|
|
+ throw new IOException("Cannot import image from a checkpoint. "
|
|
|
+ + "\"dfs.namenode.checkpoint.dir\" is not set." );
|
|
|
+ }
|
|
|
+
|
|
|
+ FSImage realImage = target.getFSImage();
|
|
|
+ FSImage ckptImage = new FSImage(conf,
|
|
|
checkpointDirs, checkpointEditsDirs);
|
|
|
- // replace real image with the checkpoint image
|
|
|
- FSImage realImage = fsNamesys.getFSImage();
|
|
|
- assert realImage == this;
|
|
|
- fsNamesys.dir.fsImage = ckptImage;
|
|
|
+ target.dir.fsImage = ckptImage;
|
|
|
// load from the checkpoint dirs
|
|
|
try {
|
|
|
- ckptImage.recoverTransitionRead(StartupOption.REGULAR);
|
|
|
+ ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
|
|
|
} finally {
|
|
|
ckptImage.close();
|
|
|
}
|
|
@@ -524,10 +495,11 @@ public class FSImage implements Closeable {
|
|
|
realImage.getStorage().setStorageInfo(ckptImage.getStorage());
|
|
|
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
|
|
|
|
|
|
- fsNamesys.dir.fsImage = realImage;
|
|
|
+ target.dir.fsImage = realImage;
|
|
|
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
|
|
|
+
|
|
|
// and save it but keep the same checkpointTime
|
|
|
- saveNamespace();
|
|
|
+ saveNamespace(target);
|
|
|
getStorage().writeAll();
|
|
|
}
|
|
|
|
|
@@ -558,11 +530,11 @@ public class FSImage implements Closeable {
|
|
|
* Toss the current image and namesystem, reloading from the specified
|
|
|
* file.
|
|
|
*/
|
|
|
- void reloadFromImageFile(File file) throws IOException {
|
|
|
- namesystem.dir.reset();
|
|
|
+ void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
|
|
|
+ target.dir.reset();
|
|
|
|
|
|
LOG.debug("Reloading namespace from " + file);
|
|
|
- loadFSImage(file);
|
|
|
+ loadFSImage(file, target);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -580,7 +552,7 @@ public class FSImage implements Closeable {
|
|
|
* @return whether the image should be saved
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- boolean loadFSImage() throws IOException {
|
|
|
+ boolean loadFSImage(FSNamesystem target) throws IOException {
|
|
|
FSImageStorageInspector inspector = storage.readAndInspectDirs();
|
|
|
|
|
|
isUpgradeFinalized = inspector.isUpgradeFinalized();
|
|
@@ -615,7 +587,7 @@ public class FSImage implements Closeable {
|
|
|
getLayoutVersion())) {
|
|
|
// For txid-based layout, we should have a .md5 file
|
|
|
// next to the image file
|
|
|
- loadFSImage(imageFile.getFile());
|
|
|
+ loadFSImage(imageFile.getFile(), target);
|
|
|
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
|
|
|
getLayoutVersion())) {
|
|
|
// In 0.22, we have the checksum stored in the VERSION file.
|
|
@@ -627,17 +599,17 @@ public class FSImage implements Closeable {
|
|
|
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
|
|
|
" not set for storage directory " + sdForProperties.getRoot());
|
|
|
}
|
|
|
- loadFSImage(imageFile.getFile(), new MD5Hash(md5));
|
|
|
+ loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
|
|
|
} else {
|
|
|
// We don't have any record of the md5sum
|
|
|
- loadFSImage(imageFile.getFile(), null);
|
|
|
+ loadFSImage(imageFile.getFile(), null, target);
|
|
|
}
|
|
|
} catch (IOException ioe) {
|
|
|
FSEditLog.closeAllStreams(editStreams);
|
|
|
throw new IOException("Failed to load image from " + imageFile, ioe);
|
|
|
}
|
|
|
|
|
|
- long numLoaded = loadEdits(editStreams);
|
|
|
+ long numLoaded = loadEdits(editStreams, target);
|
|
|
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
|
|
numLoaded);
|
|
|
|
|
@@ -671,14 +643,15 @@ public class FSImage implements Closeable {
|
|
|
* Load the specified list of edit files into the image.
|
|
|
* @return the number of transactions loaded
|
|
|
*/
|
|
|
- protected long loadEdits(Iterable<EditLogInputStream> editStreams) throws IOException {
|
|
|
+ protected long loadEdits(Iterable<EditLogInputStream> editStreams,
|
|
|
+ FSNamesystem target) throws IOException {
|
|
|
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
|
|
|
|
|
long startingTxId = getLastAppliedTxId() + 1;
|
|
|
int numLoaded = 0;
|
|
|
|
|
|
try {
|
|
|
- FSEditLogLoader loader = new FSEditLogLoader(namesystem);
|
|
|
+ FSEditLogLoader loader = new FSEditLogLoader(target);
|
|
|
|
|
|
// Load latest edits
|
|
|
for (EditLogInputStream editIn : editStreams) {
|
|
@@ -693,7 +666,7 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
|
|
|
// update the counts
|
|
|
- getFSNamesystem().dir.updateCountForINodeWithQuota();
|
|
|
+ target.dir.updateCountForINodeWithQuota();
|
|
|
return numLoaded;
|
|
|
}
|
|
|
|
|
@@ -702,13 +675,14 @@ public class FSImage implements Closeable {
|
|
|
* Load the image namespace from the given image file, verifying
|
|
|
* it against the MD5 sum stored in its associated .md5 file.
|
|
|
*/
|
|
|
- private void loadFSImage(File imageFile) throws IOException {
|
|
|
+ private void loadFSImage(File imageFile, FSNamesystem target)
|
|
|
+ throws IOException {
|
|
|
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
|
|
|
if (expectedMD5 == null) {
|
|
|
throw new IOException("No MD5 file found corresponding to image file "
|
|
|
+ imageFile);
|
|
|
}
|
|
|
- loadFSImage(imageFile, expectedMD5);
|
|
|
+ loadFSImage(imageFile, expectedMD5, target);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -716,11 +690,12 @@ public class FSImage implements Closeable {
|
|
|
* filenames and blocks. Return whether we should
|
|
|
* "re-save" and consolidate the edit-logs
|
|
|
*/
|
|
|
- private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
|
|
|
+ private void loadFSImage(File curFile, MD5Hash expectedMd5,
|
|
|
+ FSNamesystem target) throws IOException {
|
|
|
FSImageFormat.Loader loader = new FSImageFormat.Loader(
|
|
|
- conf, getFSNamesystem());
|
|
|
+ conf, target);
|
|
|
loader.load(curFile);
|
|
|
- namesystem.setBlockPoolId(this.getBlockPoolID());
|
|
|
+ target.setBlockPoolId(this.getBlockPoolID());
|
|
|
|
|
|
// Check that the image digest we loaded matches up with what
|
|
|
// we expected
|
|
@@ -741,13 +716,14 @@ public class FSImage implements Closeable {
|
|
|
/**
|
|
|
* Save the contents of the FS image to the file.
|
|
|
*/
|
|
|
- void saveFSImage(StorageDirectory sd, long txid) throws IOException {
|
|
|
+ void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
|
|
|
+ throws IOException {
|
|
|
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
|
|
|
File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
|
|
|
|
|
|
FSImageFormat.Saver saver = new FSImageFormat.Saver();
|
|
|
FSImageCompression compression = FSImageCompression.createCompression(conf);
|
|
|
- saver.save(newFile, txid, getFSNamesystem(), compression);
|
|
|
+ saver.save(newFile, txid, source, compression);
|
|
|
|
|
|
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
|
|
|
storage.setMostRecentCheckpointTxId(txid);
|
|
@@ -768,8 +744,11 @@ public class FSImage implements Closeable {
|
|
|
private StorageDirectory sd;
|
|
|
private List<StorageDirectory> errorSDs;
|
|
|
private final long txid;
|
|
|
+ private final FSNamesystem source;
|
|
|
|
|
|
- FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs, long txid) {
|
|
|
+ FSImageSaver(FSNamesystem source, StorageDirectory sd,
|
|
|
+ List<StorageDirectory> errorSDs, long txid) {
|
|
|
+ this.source = source;
|
|
|
this.sd = sd;
|
|
|
this.errorSDs = errorSDs;
|
|
|
this.txid = txid;
|
|
@@ -777,7 +756,7 @@ public class FSImage implements Closeable {
|
|
|
|
|
|
public void run() {
|
|
|
try {
|
|
|
- saveFSImage(sd, txid);
|
|
|
+ saveFSImage(source, sd, txid);
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("Unable to save image for " + sd.getRoot(), t);
|
|
|
errorSDs.add(sd);
|
|
@@ -806,7 +785,7 @@ public class FSImage implements Closeable {
|
|
|
* Save the contents of the FS image to a new image file in each of the
|
|
|
* current storage directories.
|
|
|
*/
|
|
|
- void saveNamespace() throws IOException {
|
|
|
+ void saveNamespace(FSNamesystem source) throws IOException {
|
|
|
assert editLog != null : "editLog must be initialized";
|
|
|
storage.attemptRestoreRemovedStorage();
|
|
|
|
|
@@ -817,7 +796,7 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
long imageTxId = editLog.getLastWrittenTxId();
|
|
|
try {
|
|
|
- saveFSImageInAllDirs(imageTxId);
|
|
|
+ saveFSImageInAllDirs(source, imageTxId);
|
|
|
storage.writeAll();
|
|
|
} finally {
|
|
|
if (editLogWasOpen) {
|
|
@@ -829,7 +808,8 @@ public class FSImage implements Closeable {
|
|
|
|
|
|
}
|
|
|
|
|
|
- protected void saveFSImageInAllDirs(long txid) throws IOException {
|
|
|
+ protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
|
|
|
+ throws IOException {
|
|
|
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
|
|
|
throw new IOException("No image directories available!");
|
|
|
}
|
|
@@ -842,7 +822,7 @@ public class FSImage implements Closeable {
|
|
|
for (Iterator<StorageDirectory> it
|
|
|
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
|
|
StorageDirectory sd = it.next();
|
|
|
- FSImageSaver saver = new FSImageSaver(sd, errorSDs, txid);
|
|
|
+ FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
|
|
|
Thread saveThread = new Thread(saver, saver.toString());
|
|
|
saveThreads.add(saveThread);
|
|
|
saveThread.start();
|