|
@@ -29,13 +29,14 @@ import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.RandomAccessFile;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
-import java.util.AbstractList;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Date;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.Properties;
|
|
|
import java.util.Random;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.HashMap;
|
|
|
import java.lang.Math;
|
|
|
import java.nio.ByteBuffer;
|
|
|
|
|
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
|
|
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
|
|
|
/**
|
|
|
* FSImage handles checkpointing and logging of the namespace edits.
|
|
@@ -83,6 +85,29 @@ public class FSImage extends Storage {
|
|
|
String getName() {return fileName;}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+   * Implementation of StorageDirType specific to namenode storage.
|
|
|
+ * A Storage directory could be of type IMAGE which stores only fsimage,
|
|
|
+ * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which
|
|
|
+ * stores both fsimage and edits.
|
|
|
+ */
|
|
|
+ static enum NameNodeDirType implements StorageDirType {
|
|
|
+ UNDEFINED,
|
|
|
+ IMAGE,
|
|
|
+ EDITS,
|
|
|
+ IMAGE_AND_EDITS;
|
|
|
+
|
|
|
+ public StorageDirType getStorageDirType() {
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isOfType(StorageDirType type) {
|
|
|
+ if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
|
|
|
+ return true;
|
|
|
+ return this == type;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected long checkpointTime = -1L;
|
|
|
private FSEditLog editLog = null;
|
|
|
private boolean isUpgradeFinalized = false;
|
|
@@ -90,6 +115,7 @@ public class FSImage extends Storage {
|
|
|
* Directories for importing an image from a checkpoint.
|
|
|
*/
|
|
|
private Collection<File> checkpointDirs;
|
|
|
+ private Collection<File> checkpointEditsDirs;
|
|
|
|
|
|
/**
|
|
|
* Can fs-image be rolled?
|
|
@@ -111,9 +137,10 @@ public class FSImage extends Storage {
|
|
|
|
|
|
/**
|
|
|
*/
|
|
|
- FSImage(Collection<File> fsDirs) throws IOException {
|
|
|
+ FSImage(Collection<File> fsDirs, Collection<File> fsEditsDirs)
|
|
|
+ throws IOException {
|
|
|
this();
|
|
|
- setStorageDirectories(fsDirs);
|
|
|
+ setStorageDirectories(fsDirs, fsEditsDirs);
|
|
|
}
|
|
|
|
|
|
public FSImage(StorageInfo storageInfo) {
|
|
@@ -126,57 +153,77 @@ public class FSImage extends Storage {
|
|
|
public FSImage(File imageDir) throws IOException {
|
|
|
this();
|
|
|
ArrayList<File> dirs = new ArrayList<File>(1);
|
|
|
+ ArrayList<File> editsDirs = new ArrayList<File>(1);
|
|
|
dirs.add(imageDir);
|
|
|
- setStorageDirectories(dirs);
|
|
|
+ editsDirs.add(imageDir);
|
|
|
+ setStorageDirectories(dirs, editsDirs);
|
|
|
}
|
|
|
|
|
|
- void setStorageDirectories(Collection<File> fsDirs) throws IOException {
|
|
|
- this.storageDirs = new ArrayList<StorageDirectory>(fsDirs.size());
|
|
|
- for(Iterator<File> it = fsDirs.iterator(); it.hasNext();)
|
|
|
- this.addStorageDir(new StorageDirectory(it.next()));
|
|
|
+ void setStorageDirectories(Collection<File> fsNameDirs,
|
|
|
+ Collection<File> fsEditsDirs
|
|
|
+ ) throws IOException {
|
|
|
+ this.storageDirs = new ArrayList<StorageDirectory>();
|
|
|
+ // Add all name dirs with appropriate NameNodeDirType
|
|
|
+ for (File dirName : fsNameDirs) {
|
|
|
+ boolean isAlsoEdits = false;
|
|
|
+ for (File editsDirName : fsEditsDirs) {
|
|
|
+ if (editsDirName.compareTo(dirName) == 0) {
|
|
|
+ isAlsoEdits = true;
|
|
|
+ fsEditsDirs.remove(editsDirName);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ NameNodeDirType dirType = (isAlsoEdits) ?
|
|
|
+ NameNodeDirType.IMAGE_AND_EDITS :
|
|
|
+ NameNodeDirType.IMAGE;
|
|
|
+ this.addStorageDir(new StorageDirectory(dirName, dirType));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add edits dirs if they are different from name dirs
|
|
|
+ for (File dirName : fsEditsDirs) {
|
|
|
+ this.addStorageDir(new StorageDirectory(dirName,
|
|
|
+ NameNodeDirType.EDITS));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- void setCheckpointDirectories(Collection<File> dirs) {
|
|
|
+ void setCheckpointDirectories(Collection<File> dirs,
|
|
|
+ Collection<File> editsDirs) {
|
|
|
checkpointDirs = dirs;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- */
|
|
|
- File getImageFile(int imageDirIdx, NameNodeFile type) {
|
|
|
- return getImageFile(getStorageDir(imageDirIdx), type);
|
|
|
+ checkpointEditsDirs = editsDirs;
|
|
|
}
|
|
|
|
|
|
static File getImageFile(StorageDirectory sd, NameNodeFile type) {
|
|
|
return new File(sd.getCurrentDir(), type.getName());
|
|
|
}
|
|
|
|
|
|
- File getEditFile(int idx) {
|
|
|
- return getImageFile(idx, NameNodeFile.EDITS);
|
|
|
+ File getEditFile(StorageDirectory sd) {
|
|
|
+ return getImageFile(sd, NameNodeFile.EDITS);
|
|
|
}
|
|
|
|
|
|
- File getEditNewFile(int idx) {
|
|
|
- return getImageFile(idx, NameNodeFile.EDITS_NEW);
|
|
|
+ File getEditNewFile(StorageDirectory sd) {
|
|
|
+ return getImageFile(sd, NameNodeFile.EDITS_NEW);
|
|
|
}
|
|
|
|
|
|
- File[] getFileNames(NameNodeFile type) {
|
|
|
- File[] list = new File[getNumStorageDirs()];
|
|
|
- int i=0;
|
|
|
- for(StorageDirectory sd : storageDirs) {
|
|
|
- list[i++] = getImageFile(sd, type);
|
|
|
+ File[] getFileNames(NameNodeFile type, NameNodeDirType dirType) {
|
|
|
+ ArrayList<File> list = new ArrayList<File>();
|
|
|
+ Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
|
|
|
+ dirIterator(dirType);
|
|
|
+ for ( ;it.hasNext(); ) {
|
|
|
+ list.add(getImageFile(it.next(), type));
|
|
|
}
|
|
|
- return list;
|
|
|
+ return list.toArray(new File[list.size()]);
|
|
|
}
|
|
|
|
|
|
File[] getImageFiles() {
|
|
|
- return getFileNames(NameNodeFile.IMAGE);
|
|
|
+ return getFileNames(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
|
|
|
}
|
|
|
|
|
|
File[] getEditsFiles() {
|
|
|
- return getFileNames(NameNodeFile.EDITS);
|
|
|
+ return getFileNames(NameNodeFile.EDITS, NameNodeDirType.EDITS);
|
|
|
}
|
|
|
|
|
|
File[] getTimeFiles() {
|
|
|
- return getFileNames(NameNodeFile.TIME);
|
|
|
+ return getFileNames(NameNodeFile.TIME, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -191,25 +238,36 @@ public class FSImage extends Storage {
|
|
|
* @return true if the image needs to be saved or false otherwise
|
|
|
*/
|
|
|
boolean recoverTransitionRead(Collection<File> dataDirs,
|
|
|
- StartupOption startOpt
|
|
|
- ) throws IOException {
|
|
|
+ Collection<File> editsDirs,
|
|
|
+ StartupOption startOpt
|
|
|
+ ) throws IOException {
|
|
|
assert startOpt != StartupOption.FORMAT :
|
|
|
"NameNode formatting should be performed before reading the image";
|
|
|
-
|
|
|
+
|
|
|
+    // none of the data dirs or none of the edits dirs exist
|
|
|
+ if (dataDirs.size() == 0 || editsDirs.size() == 0)
|
|
|
+ throw new IOException(
|
|
|
+ "All specified directories are not accessible or do not exist.");
|
|
|
+
|
|
|
if(startOpt == StartupOption.IMPORT
|
|
|
&& (checkpointDirs == null || checkpointDirs.isEmpty()))
|
|
|
throw new IOException("Cannot import image from a checkpoint. "
|
|
|
+ "\"fs.checkpoint.dir\" is not set." );
|
|
|
|
|
|
+ if(startOpt == StartupOption.IMPORT
|
|
|
+ && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
|
|
|
+ throw new IOException("Cannot import image from a checkpoint. "
|
|
|
+ + "\"fs.checkpoint.edits.dir\" is not set." );
|
|
|
+
|
|
|
+ setStorageDirectories(dataDirs, editsDirs);
|
|
|
// 1. For each data directory calculate its state and
|
|
|
// check whether all is consistent before transitioning.
|
|
|
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
|
|
|
- AbstractList<StorageState> dataDirStates =
|
|
|
- new ArrayList<StorageState>(dataDirs.size());
|
|
|
+ Map<StorageDirectory, StorageState> dataDirStates =
|
|
|
+ new HashMap<StorageDirectory, StorageState>();
|
|
|
boolean isFormatted = false;
|
|
|
- for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
|
|
|
- File dataDir = it.next();
|
|
|
- StorageDirectory sd = new StorageDirectory(dataDir);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
StorageState curState;
|
|
|
try {
|
|
|
curState = sd.analyzeStorage(startOpt);
|
|
@@ -217,7 +275,7 @@ public class FSImage extends Storage {
|
|
|
switch(curState) {
|
|
|
case NON_EXISTENT:
|
|
|
// name-node fails if any of the configured storage dirs are missing
|
|
|
- throw new InconsistentFSStateException(sd.root,
|
|
|
+ throw new InconsistentFSStateException(sd.getRoot(),
|
|
|
"storage directory does not exist or is not accessible.");
|
|
|
case NOT_FORMATTED:
|
|
|
break;
|
|
@@ -234,19 +292,14 @@ public class FSImage extends Storage {
|
|
|
if (startOpt == StartupOption.IMPORT && isFormatted)
|
|
|
// import of a checkpoint is allowed only into empty image directories
|
|
|
throw new IOException("Cannot import image from a checkpoint. "
|
|
|
- + " NameNode already contains an image in " + sd.root);
|
|
|
+ + " NameNode already contains an image in " + sd.getRoot());
|
|
|
} catch (IOException ioe) {
|
|
|
sd.unlock();
|
|
|
throw ioe;
|
|
|
}
|
|
|
- // add to the storage list
|
|
|
- addStorageDir(sd);
|
|
|
- dataDirStates.add(curState);
|
|
|
+ dataDirStates.put(sd,curState);
|
|
|
}
|
|
|
-
|
|
|
- if (dataDirs.size() == 0) // none of the data dirs exist
|
|
|
- throw new IOException(
|
|
|
- "All specified directories are not accessible or do not exist.");
|
|
|
+
|
|
|
if (!isFormatted && startOpt != StartupOption.ROLLBACK
|
|
|
&& startOpt != StartupOption.IMPORT)
|
|
|
throw new IOException("NameNode is not formatted.");
|
|
@@ -265,14 +318,15 @@ public class FSImage extends Storage {
|
|
|
|
|
|
// 2. Format unformatted dirs.
|
|
|
this.checkpointTime = 0L;
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
- StorageState curState = dataDirStates.get(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
+ StorageState curState = dataDirStates.get(sd);
|
|
|
switch(curState) {
|
|
|
case NON_EXISTENT:
|
|
|
assert false : StorageState.NON_EXISTENT + " state cannot be here";
|
|
|
case NOT_FORMATTED:
|
|
|
- LOG.info("Storage directory " + sd.root + " is not formatted.");
|
|
|
+ LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
|
|
|
LOG.info("Formatting ...");
|
|
|
sd.clearDirectory(); // create empty currrent dir
|
|
|
break;
|
|
@@ -308,10 +362,11 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
// Upgrade is allowed only if there are
|
|
|
// no previous fs states in any of the directories
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
if (sd.getPreviousDir().exists())
|
|
|
- throw new InconsistentFSStateException(sd.root,
|
|
|
+ throw new InconsistentFSStateException(sd.getRoot(),
|
|
|
"previous fs state should not exist during upgrade. "
|
|
|
+ "Finalize or rollback first.");
|
|
|
}
|
|
@@ -325,9 +380,10 @@ public class FSImage extends Storage {
|
|
|
int oldLV = this.getLayoutVersion();
|
|
|
this.layoutVersion = FSConstants.LAYOUT_VERSION;
|
|
|
this.checkpointTime = FSNamesystem.now();
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
- LOG.info("Upgrading image directory " + sd.root
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
+ LOG.info("Upgrading image directory " + sd.getRoot()
|
|
|
+ ".\n old LV = " + oldLV
|
|
|
+ "; old CTime = " + oldCTime
|
|
|
+ ".\n new LV = " + this.getLayoutVersion()
|
|
@@ -350,7 +406,7 @@ public class FSImage extends Storage {
|
|
|
// rename tmp to previous
|
|
|
rename(tmpDir, prevDir);
|
|
|
isUpgradeFinalized = false;
|
|
|
- LOG.info("Upgrade of " + sd.root + " is complete.");
|
|
|
+ LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
|
|
|
}
|
|
|
initializeDistributedUpgrade();
|
|
|
editLog.open();
|
|
@@ -363,16 +419,17 @@ public class FSImage extends Storage {
|
|
|
boolean canRollback = false;
|
|
|
FSImage prevState = new FSImage();
|
|
|
prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
File prevDir = sd.getPreviousDir();
|
|
|
if (!prevDir.exists()) { // use current directory then
|
|
|
- LOG.info("Storage directory " + sd.root
|
|
|
+ LOG.info("Storage directory " + sd.getRoot()
|
|
|
+ " does not contain previous fs state.");
|
|
|
sd.read(); // read and verify consistency with other directories
|
|
|
continue;
|
|
|
}
|
|
|
- StorageDirectory sdPrev = prevState.new StorageDirectory(sd.root);
|
|
|
+ StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
|
|
|
sdPrev.read(sdPrev.getPreviousVersionFile()); // read and verify consistency of the prev dir
|
|
|
canRollback = true;
|
|
|
}
|
|
@@ -382,13 +439,14 @@ public class FSImage extends Storage {
|
|
|
|
|
|
// Now that we know all directories are going to be consistent
|
|
|
// Do rollback for each directory containing previous state
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
File prevDir = sd.getPreviousDir();
|
|
|
if (!prevDir.exists())
|
|
|
continue;
|
|
|
|
|
|
- LOG.info("Rolling back storage directory " + sd.root
|
|
|
+ LOG.info("Rolling back storage directory " + sd.getRoot()
|
|
|
+ ".\n new LV = " + prevState.getLayoutVersion()
|
|
|
+ "; new CTime = " + prevState.getCTime());
|
|
|
File tmpDir = sd.getRemovedTmp();
|
|
@@ -402,7 +460,7 @@ public class FSImage extends Storage {
|
|
|
|
|
|
// delete tmp dir
|
|
|
deleteDir(tmpDir);
|
|
|
- LOG.info("Rollback of " + sd.root + " is complete.");
|
|
|
+ LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
|
|
|
}
|
|
|
isUpgradeFinalized = true;
|
|
|
// check whether name-node can start in regular mode
|
|
@@ -413,11 +471,11 @@ public class FSImage extends Storage {
|
|
|
File prevDir = sd.getPreviousDir();
|
|
|
if (!prevDir.exists()) { // already discarded
|
|
|
LOG.info("Directory " + prevDir + " does not exist.");
|
|
|
- LOG.info("Finalize upgrade for " + sd.root + " is not required.");
|
|
|
+ LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
|
|
|
return;
|
|
|
}
|
|
|
LOG.info("Finalizing upgrade for storage directory "
|
|
|
- + sd.root + "."
|
|
|
+ + sd.getRoot() + "."
|
|
|
+ (getLayoutVersion()==0 ? "" :
|
|
|
"\n cur LV = " + this.getLayoutVersion()
|
|
|
+ "; cur CTime = " + this.getCTime()));
|
|
@@ -427,7 +485,7 @@ public class FSImage extends Storage {
|
|
|
rename(prevDir, tmpDir);
|
|
|
deleteDir(tmpDir);
|
|
|
isUpgradeFinalized = true;
|
|
|
- LOG.info("Finalize upgrade for " + sd.root + " is complete.");
|
|
|
+ LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -443,7 +501,8 @@ public class FSImage extends Storage {
|
|
|
fsNamesys.dir.fsImage = ckptImage;
|
|
|
// load from the checkpoint dirs
|
|
|
try {
|
|
|
- ckptImage.recoverTransitionRead(checkpointDirs, StartupOption.REGULAR);
|
|
|
+ ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
|
|
|
+ StartupOption.REGULAR);
|
|
|
} finally {
|
|
|
ckptImage.close();
|
|
|
}
|
|
@@ -455,8 +514,10 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
|
|
|
void finalizeUpgrade() throws IOException {
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++)
|
|
|
- doFinalize(getStorageDir(idx));
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ doFinalize(it.next());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
boolean isUpgradeFinalized() {
|
|
@@ -469,7 +530,7 @@ public class FSImage extends Storage {
|
|
|
super.getFields(props, sd);
|
|
|
if (layoutVersion == 0)
|
|
|
throw new IOException("NameNode directory "
|
|
|
- + sd.root + " is not formatted.");
|
|
|
+ + sd.getRoot() + " is not formatted.");
|
|
|
String sDUS, sDUV;
|
|
|
sDUS = props.getProperty("distributedUpgradeState");
|
|
|
sDUV = props.getProperty("distributedUpgradeVersion");
|
|
@@ -539,30 +600,38 @@ public class FSImage extends Storage {
|
|
|
/**
|
|
|
* Record new checkpoint time in order to
|
|
|
* distinguish healthy directories from the removed ones.
|
|
|
- *
|
|
|
- * @return -1 if successful, or the index of the failed storage directory.
|
|
|
+ * If there is an error writing new checkpoint time, the corresponding
|
|
|
+ * storage directory is removed from the list.
|
|
|
*/
|
|
|
- int incrementCheckpointTime() {
|
|
|
+ void incrementCheckpointTime() {
|
|
|
this.checkpointTime++;
|
|
|
- // Write new checkpoint time.
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
+
|
|
|
+ // Write new checkpoint time in all storage directories
|
|
|
+ for(Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
try {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
writeCheckpointTime(sd);
|
|
|
- } catch(IOException e) {
|
|
|
- return idx;
|
|
|
+ } catch(IOException e) {
|
|
|
+ // Close any edits stream associated with this dir and remove directory
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
|
|
|
+ editLog.processIOError(sd);
|
|
|
+ it.remove();
|
|
|
}
|
|
|
}
|
|
|
- return -1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * If there is an IO Error on any log operations, remove that
|
|
|
- * directory from the list of directories.
|
|
|
+   * Remove the storage directory whose root path is the parent of the given path.
|
|
|
*/
|
|
|
- void processIOError(int index) {
|
|
|
- assert(index >= 0 && index < getNumStorageDirs());
|
|
|
- storageDirs.remove(index);
|
|
|
+
|
|
|
+ void processIOError(File dirName) {
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
+ if (sd.getRoot().getPath().equals(dirName.getParent()))
|
|
|
+ it.remove();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public FSEditLog getEditLog() {
|
|
@@ -570,10 +639,10 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
|
|
|
public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
|
|
|
- File oldImageDir = new File(sd.root, "image");
|
|
|
+ File oldImageDir = new File(sd.getRoot(), "image");
|
|
|
if (!oldImageDir.exists()) {
|
|
|
if(sd.getVersionFile().exists())
|
|
|
- throw new InconsistentFSStateException(sd.root,
|
|
|
+ throw new InconsistentFSStateException(sd.getRoot(),
|
|
|
oldImageDir + " does not exist.");
|
|
|
return false;
|
|
|
}
|
|
@@ -594,15 +663,19 @@ public class FSImage extends Storage {
|
|
|
//
|
|
|
// Atomic move sequence, to recover from interrupted checkpoint
|
|
|
//
|
|
|
- void recoverInterruptedCheckpoint(StorageDirectory sd) throws IOException {
|
|
|
- File curFile = getImageFile(sd, NameNodeFile.IMAGE);
|
|
|
- File ckptFile = getImageFile(sd, NameNodeFile.IMAGE_NEW);
|
|
|
+ boolean recoverInterruptedCheckpoint(StorageDirectory nameSD,
|
|
|
+ StorageDirectory editsSD)
|
|
|
+ throws IOException {
|
|
|
+ boolean needToSave = false;
|
|
|
+ File curFile = getImageFile(nameSD, NameNodeFile.IMAGE);
|
|
|
+ File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW);
|
|
|
|
|
|
//
|
|
|
// If we were in the midst of a checkpoint
|
|
|
//
|
|
|
if (ckptFile.exists()) {
|
|
|
- if (getImageFile(sd, NameNodeFile.EDITS_NEW).exists()) {
|
|
|
+ needToSave = true;
|
|
|
+ if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
|
|
|
//
|
|
|
// checkpointing migth have uploaded a new
|
|
|
// merged image, but we discard it here because we are
|
|
@@ -622,7 +695,8 @@ public class FSImage extends Storage {
|
|
|
// if the destination file already exists.
|
|
|
//
|
|
|
if (!ckptFile.renameTo(curFile)) {
|
|
|
- curFile.delete();
|
|
|
+ if (!curFile.delete())
|
|
|
+ LOG.warn("Unable to delete dir " + curFile + " before rename");
|
|
|
if (!ckptFile.renameTo(curFile)) {
|
|
|
throw new IOException("Unable to rename " + ckptFile +
|
|
|
" to " + curFile);
|
|
@@ -630,6 +704,7 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ return needToSave;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -641,53 +716,74 @@ public class FSImage extends Storage {
|
|
|
*/
|
|
|
boolean loadFSImage() throws IOException {
|
|
|
// Now check all curFiles and see which is the newest
|
|
|
- long latestCheckpointTime = Long.MIN_VALUE;
|
|
|
- StorageDirectory latestSD = null;
|
|
|
+ long latestNameCheckpointTime = Long.MIN_VALUE;
|
|
|
+ long latestEditsCheckpointTime = Long.MIN_VALUE;
|
|
|
+ StorageDirectory latestNameSD = null;
|
|
|
+ StorageDirectory latestEditsSD = null;
|
|
|
boolean needToSave = false;
|
|
|
isUpgradeFinalized = true;
|
|
|
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
- recoverInterruptedCheckpoint(sd);
|
|
|
+ for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
if (!sd.getVersionFile().exists()) {
|
|
|
needToSave |= true;
|
|
|
continue; // some of them might have just been formatted
|
|
|
}
|
|
|
- assert getImageFile(sd, NameNodeFile.IMAGE).exists() :
|
|
|
- "Image file must exist.";
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE))
|
|
|
+ assert getImageFile(sd, NameNodeFile.IMAGE).exists() :
|
|
|
+ "Image file must exist.";
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
|
|
|
+ assert getImageFile(sd, NameNodeFile.EDITS).exists() :
|
|
|
+ "Edits file must exist.";
|
|
|
+
|
|
|
checkpointTime = readCheckpointTime(sd);
|
|
|
if ((checkpointTime != Long.MIN_VALUE) &&
|
|
|
- (checkpointTime != latestCheckpointTime)) {
|
|
|
+ ((checkpointTime != latestNameCheckpointTime) ||
|
|
|
+ (checkpointTime != latestEditsCheckpointTime))) {
|
|
|
// Force saving of new image if checkpoint time
|
|
|
// is not same in all of the storage directories.
|
|
|
needToSave |= true;
|
|
|
}
|
|
|
- if (latestCheckpointTime < checkpointTime) {
|
|
|
- latestCheckpointTime = checkpointTime;
|
|
|
- latestSD = sd;
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) &&
|
|
|
+ (latestNameCheckpointTime < checkpointTime)) {
|
|
|
+ latestNameCheckpointTime = checkpointTime;
|
|
|
+ latestNameSD = sd;
|
|
|
+ }
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) &&
|
|
|
+ (latestEditsCheckpointTime < checkpointTime)) {
|
|
|
+ latestEditsCheckpointTime = checkpointTime;
|
|
|
+ latestEditsSD = sd;
|
|
|
}
|
|
|
if (checkpointTime <= 0L)
|
|
|
needToSave |= true;
|
|
|
// set finalized flag
|
|
|
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
|
|
|
}
|
|
|
- assert latestSD != null : "Latest storage directory was not determined.";
|
|
|
+ assert latestNameSD != null : "Latest image storage directory was " +
|
|
|
+ "not determined.";
|
|
|
+ assert latestEditsSD != null : "Latest edits storage directory was " +
|
|
|
+ "not determined.";
|
|
|
+
|
|
|
+ // Make sure we are loading image and edits from same checkpoint
|
|
|
+ if (latestNameCheckpointTime != latestEditsCheckpointTime)
|
|
|
+ throw new IOException("Inconsitent storage detected, " +
|
|
|
+ "name and edits storage do not match");
|
|
|
+
|
|
|
+    // Recover from previous interrupted checkpoint if any
|
|
|
+ needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
|
|
|
|
|
|
long startTime = FSNamesystem.now();
|
|
|
- long imageSize = getImageFile(latestSD, NameNodeFile.IMAGE).length();
|
|
|
+ long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
|
|
|
|
|
|
//
|
|
|
// Load in bits
|
|
|
//
|
|
|
- latestSD.read();
|
|
|
- needToSave |= loadFSImage(getImageFile(latestSD, NameNodeFile.IMAGE));
|
|
|
-
|
|
|
+ latestNameSD.read();
|
|
|
+ needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
|
|
|
LOG.info("Image file of size " + imageSize + " loaded in "
|
|
|
+ (FSNamesystem.now() - startTime)/1000 + " seconds.");
|
|
|
- //
|
|
|
- // read in the editlog from the same directory from
|
|
|
- // which we read in the image
|
|
|
- //
|
|
|
- needToSave |= (loadFSEdits(latestSD) > 0);
|
|
|
+
|
|
|
+ // Load latest edits
|
|
|
+ needToSave |= (loadFSEdits(latestEditsSD) > 0);
|
|
|
|
|
|
return needToSave;
|
|
|
}
|
|
@@ -911,13 +1007,18 @@ public class FSImage extends Storage {
|
|
|
*/
|
|
|
public void saveFSImage() throws IOException {
|
|
|
editLog.createNewIfMissing();
|
|
|
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
- saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
|
|
|
- editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
|
|
|
- File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
|
|
|
- if (editsNew.exists())
|
|
|
- editLog.createEditLogFile(editsNew);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
+ NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
|
|
|
+ if (dirType.isOfType(NameNodeDirType.IMAGE))
|
|
|
+ saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
|
|
|
+ if (dirType.isOfType(NameNodeDirType.EDITS)) {
|
|
|
+ editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
|
|
|
+ File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
|
|
|
+ if (editsNew.exists())
|
|
|
+ editLog.createEditLogFile(editsNew);
|
|
|
+ }
|
|
|
}
|
|
|
ckptState = CheckpointStates.UPLOAD_DONE;
|
|
|
rollFSImage();
|
|
@@ -950,13 +1051,16 @@ public class FSImage extends Storage {
|
|
|
sd.clearDirectory(); // create currrent dir
|
|
|
sd.lock();
|
|
|
try {
|
|
|
- saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
|
|
|
- editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
|
|
|
+ NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
|
|
|
+ if (dirType.isOfType(NameNodeDirType.IMAGE))
|
|
|
+ saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
|
|
|
+ if (dirType.isOfType(NameNodeDirType.EDITS))
|
|
|
+ editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
|
|
|
sd.write();
|
|
|
} finally {
|
|
|
sd.unlock();
|
|
|
}
|
|
|
- LOG.info("Storage directory " + sd.root
|
|
|
+ LOG.info("Storage directory " + sd.getRoot()
|
|
|
+ " has been successfully formatted.");
|
|
|
}
|
|
|
|
|
@@ -965,8 +1069,9 @@ public class FSImage extends Storage {
|
|
|
this.namespaceID = newNamespaceID();
|
|
|
this.cTime = 0L;
|
|
|
this.checkpointTime = FSNamesystem.now();
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
format(sd);
|
|
|
}
|
|
|
}
|
|
@@ -1153,8 +1258,9 @@ public class FSImage extends Storage {
|
|
|
if (!editLog.existsNew()) {
|
|
|
throw new IOException("New Edits file does not exist");
|
|
|
}
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
|
|
|
if (!ckpt.exists()) {
|
|
|
throw new IOException("Checkpoint file " + ckpt +
|
|
@@ -1166,8 +1272,9 @@ public class FSImage extends Storage {
|
|
|
//
|
|
|
// Renames new image
|
|
|
//
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
|
|
|
File curFile = getImageFile(sd, NameNodeFile.IMAGE);
|
|
|
// renameTo fails on Windows if the destination file
|
|
@@ -1175,25 +1282,31 @@ public class FSImage extends Storage {
|
|
|
if (!ckpt.renameTo(curFile)) {
|
|
|
curFile.delete();
|
|
|
if (!ckpt.renameTo(curFile)) {
|
|
|
- editLog.processIOError(idx);
|
|
|
- idx--;
|
|
|
+ // Close edit stream, if this directory is also used for edits
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
|
|
|
+ editLog.processIOError(sd);
|
|
|
+ it.remove();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//
|
|
|
- // Updates the fstime file and write version file
|
|
|
+ // Updates the fstime file on all directories (fsimage and edits)
|
|
|
+ // and write version file
|
|
|
//
|
|
|
this.layoutVersion = FSConstants.LAYOUT_VERSION;
|
|
|
this.checkpointTime = FSNamesystem.now();
|
|
|
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
|
|
- StorageDirectory sd = getStorageDir(idx);
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();) {
|
|
|
+ StorageDirectory sd = it.next();
|
|
|
try {
|
|
|
sd.write();
|
|
|
} catch (IOException e) {
|
|
|
- LOG.error("Cannot write file " + sd.root, e);
|
|
|
- editLog.processIOError(idx);
|
|
|
- idx--;
|
|
|
+ LOG.error("Cannot write file " + sd.getRoot(), e);
|
|
|
+ // Close edit stream, if this directory is also used for edits
|
|
|
+ if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
|
|
|
+ editLog.processIOError(sd);
|
|
|
+ it.remove();
|
|
|
}
|
|
|
}
|
|
|
ckptState = CheckpointStates.START;
|
|
@@ -1244,7 +1357,11 @@ public class FSImage extends Storage {
|
|
|
* Return the name of the image file.
|
|
|
*/
|
|
|
File getFsImageName() {
|
|
|
- return getImageFile(0, NameNodeFile.IMAGE);
|
|
|
+ StorageDirectory sd = null;
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
|
|
|
+ sd = it.next();
|
|
|
+ return getImageFile(sd, NameNodeFile.IMAGE);
|
|
|
}
|
|
|
|
|
|
public File getFsEditName() throws IOException {
|
|
@@ -1252,7 +1369,12 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
|
|
|
File getFsTimeName() {
|
|
|
- return getImageFile(0, NameNodeFile.TIME);
|
|
|
+ StorageDirectory sd = null;
|
|
|
+    // NameNodeFile.TIME should be the same on all directories
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(); it.hasNext();)
|
|
|
+ sd = it.next();
|
|
|
+ return getImageFile(sd, NameNodeFile.TIME);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1260,11 +1382,12 @@ public class FSImage extends Storage {
|
|
|
* checkpointing.
|
|
|
*/
|
|
|
File[] getFsImageNameCheckpoint() {
|
|
|
- File[] list = new File[getNumStorageDirs()];
|
|
|
- for(int i = 0; i < getNumStorageDirs(); i++) {
|
|
|
- list[i] = getImageFile(getStorageDir(i), NameNodeFile.IMAGE_NEW);
|
|
|
+ ArrayList<File> list = new ArrayList<File>();
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
|
|
+ list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW));
|
|
|
}
|
|
|
- return list;
|
|
|
+ return list.toArray(new File[list.size()]);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1383,6 +1506,20 @@ public class FSImage extends Storage {
|
|
|
}
|
|
|
return dirs;
|
|
|
}
|
|
|
+
|
|
|
+ static Collection<File> getCheckpointEditsDirs(Configuration conf,
|
|
|
+ String defaultName) {
|
|
|
+ Collection<String> dirNames =
|
|
|
+ conf.getStringCollection("fs.checkpoint.edits.dir");
|
|
|
+ if (dirNames.size() == 0 && defaultName != null) {
|
|
|
+ dirNames.add(defaultName);
|
|
|
+ }
|
|
|
+ Collection<File> dirs = new ArrayList<File>(dirNames.size());
|
|
|
+ for(String name : dirNames) {
|
|
|
+ dirs.add(new File(name));
|
|
|
+ }
|
|
|
+ return dirs;
|
|
|
+ }
|
|
|
|
|
|
static private final UTF8 U_STR = new UTF8();
|
|
|
static String readString(DataInputStream in) throws IOException {
|