HDFS-1735. Merging r1066305 from trunk into federation. This requires merging the federation changes into the FSImage+NNStorage refactoring in trunk.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1079607 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 14 years ago
parent commit 21df4f8d3a
32 changed files with 1575 additions and 868 deletions
  1. + 5 - 0      CHANGES.txt
  2. + 9 - 3      src/java/org/apache/hadoop/hdfs/server/common/Storage.java
  3. + 52 - 40    src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
  4. + 10 - 10    src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
  5. + 17 - 14    src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
  6. + 13 - 12    src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
  7. + 2 - 2      src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  8. + 122 - 82   src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  9. + 116 - 543  src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
 10. + 1 - 1      src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
 11. + 29 - 26    src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 12. + 7 - 5      src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
 13. + 1033 - 0   src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
 14. + 6 - 6      src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 15. + 4 - 3      src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
 16. + 37 - 28    src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
 17. + 3 - 3      src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java
 18. + 1 - 1      src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
 19. + 2 - 2      src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
 20. + 6 - 5      src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
 21. + 2 - 3      src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
 22. + 1 - 1      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
 23. + 5 - 3      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
 24. + 25 - 24    src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
 25. + 2 - 2      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
 26. + 4 - 4      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
 27. + 4 - 4      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
 28. + 4 - 2      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
 29. + 27 - 13    src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
 30. + 4 - 4      src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
 31. + 11 - 11    src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
 32. + 11 - 11    src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
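
Most of the hunks below apply one mechanical pattern: storage state that used
to live on FSImage moves into the new NNStorage class, and call sites switch
from fsImage.getX() to fsImage.getStorage().getX(). A minimal, self-contained
sketch of that delegation (illustrative stand-ins, not the real HDFS classes):

class NNStorageSketch {
  private int namespaceID;
  private long checkpointTime;

  int getNamespaceID() { return namespaceID; }
  long getCheckpointTime() { return checkpointTime; }
  void setCheckpointTime(long time) { checkpointTime = time; }
}

class FSImageSketch {
  // FSImage now owns an NNStorage instead of being the storage itself.
  private final NNStorageSketch storage = new NNStorageSketch();

  NNStorageSketch getStorage() { return storage; }
}

class CallSiteSketch {
  static long readCheckpointTime(FSImageSketch fsImage) {
    // before: fsImage.getCheckpointTime()
    // after:  fsImage.getStorage().getCheckpointTime()
    return fsImage.getStorage().getCheckpointTime();
  }
}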

+ 5 - 0
CHANGES.txt

@@ -222,6 +222,9 @@ Trunk (unreleased changes)
     HDFS-1448. Add a new tool Offline Edits Viewer (oev).  (Erik Steffl
     via szetszwo)
 
+    HDFS-1735. Federation: merge FSImage change in federation to
+    FSImage+NNStorage refactoring in trunk (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -255,6 +258,8 @@ Trunk (unreleased changes)
 
     HDFS-1335. HDFS side change of HADDOP-6904: RPC compatibility. (hairong)
 
+    HDFS-1557. Separate Storage from FSImage. (Ivan Kelly via jitendra)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

+ 9 - 3
src/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -65,7 +65,7 @@ public abstract class Storage extends StorageInfo {
   // Constants
   
   // last layout version that did not support upgrades
-  protected static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
+  public static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
   
   // this corresponds to Hadoop-0.14.
   public static final int LAST_UPGRADABLE_LAYOUT_VERSION = -7;
@@ -729,7 +729,7 @@ public abstract class Storage extends StorageInfo {
    * 
    * @param oldVersion
    */
-  protected static void checkVersionUpgradable(int oldVersion) 
+  public static void checkVersionUpgradable(int oldVersion) 
                                      throws IOException {
     if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) {
       String msg = "*********** Upgrade is not supported from this " +
@@ -791,7 +791,13 @@ public abstract class Storage extends StorageInfo {
                             + from.getCanonicalPath() + " to " + to.getCanonicalPath());
   }
 
-  protected static void deleteDir(File dir) throws IOException {
+  /**
+   * Recursively delete all the content of the directory first and then 
+   * the directory itself from the local filesystem.
+   * @param dir The directory to delete
+   * @throws IOException
+   */
+  public static void deleteDir(File dir) throws IOException {
     if (!FileUtil.fullyDelete(dir))
       throw new IOException("Failed to delete " + dir.getCanonicalPath());
   }
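
For reference, a hedged sketch of the contract the new deleteDir javadoc
describes: recursively delete the directory's contents, then the directory
itself, failing with an IOException. The real method delegates to
FileUtil.fullyDelete; this stand-alone version only illustrates the behavior.

import java.io.File;
import java.io.IOException;

class DeleteDirSketch {
  static void deleteDir(File dir) throws IOException {
    File[] children = dir.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteDir(child);          // delete contents before the directory
      }
    }
    if (!dir.delete()) {
      throw new IOException("Failed to delete " + dir.getCanonicalPath());
    }
  }
}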

+ 52 - 40
src/java/org/apache/hadoop/hdfs/server/namenode/BackupStorage.java → src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -26,20 +26,29 @@ import java.util.Iterator;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.LongWritable;
 
+/**
+ * Extension of FSImage for the backup node.
+ * This class handles the setup of the journaling 
+ * spool on the backup namenode.
+ */
 @InterfaceAudience.Private
-public class BackupStorage extends FSImage {
+public class BackupImage extends FSImage {
   // Names of the journal spool directory and the spool file
   private static final String STORAGE_JSPOOL_DIR = "jspool";
-  private static final String STORAGE_JSPOOL_FILE = 
-                                              NameNodeFile.EDITS_NEW.getName();
+  private static final String STORAGE_JSPOOL_FILE =
+    NNStorage.NameNodeFile.EDITS_NEW.getName();
 
   /** Backup input stream for loading edits into memory */
   private EditLogBackupInputStream backupInputStream;
@@ -55,31 +64,27 @@ public class BackupStorage extends FSImage {
 
   /**
    */
-  BackupStorage() {
+  BackupImage() {
     super();
+    storage.setDisablePreUpgradableLayoutCheck(true);
     jsState = JSpoolState.OFF;
   }
 
-  @Override
-  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
-    return false;
-  }
-
   /**
    * Analyze backup storage directories for consistency.<br>
    * Recover from incomplete checkpoints if required.<br>
    * Read VERSION and fstime files if exist.<br>
    * Do not load image or edits.
-   * 
+   *
    * @param imageDirs list of image directories as URI.
    * @param editsDirs list of edits directories URI.
    * @throws IOException if the node should shutdown.
    */
   void recoverCreateRead(Collection<URI> imageDirs,
                          Collection<URI> editsDirs) throws IOException {
-    setStorageDirectories(imageDirs, editsDirs);
-    this.checkpointTime = 0L;
-    for(Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+    storage.setStorageDirectories(imageDirs, editsDirs);
+    storage.setCheckpointTime(0L);
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       StorageState curState;
       try {
@@ -87,7 +92,7 @@ public class BackupStorage extends FSImage {
         // sd is locked but not opened
         switch(curState) {
         case NON_EXISTENT:
-          // fail if any of the configured storage dirs are inaccessible 
+          // fail if any of the configured storage dirs are inaccessible
           throw new InconsistentFSStateException(sd.getRoot(),
                 "checkpoint directory does not exist or is not accessible.");
         case NOT_FORMATTED:
@@ -125,13 +130,15 @@ public class BackupStorage extends FSImage {
     fsDir.reset();
 
     // unlock, close and rename storage directories
-    unlockAll();
+    storage.unlockAll();
     // recover from unsuccessful checkpoint if necessary
-    recoverCreateRead(getImageDirectories(), getEditsDirectories());
+    recoverCreateRead(storage.getImageDirectories(),
+                      storage.getEditsDirectories());
     // rename and recreate
-    for(StorageDirectory sd : storageDirs) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
       // rename current to lastcheckpoint.tmp
-      moveCurrent(sd);
+      storage.moveCurrent(sd);
     }
   }
 
@@ -148,15 +155,17 @@ public class BackupStorage extends FSImage {
 
     FSDirectory fsDir = getFSNamesystem().dir;
     if(fsDir.isEmpty()) {
-      Iterator<StorageDirectory> itImage = dirIterator(NameNodeDirType.IMAGE);
-      Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
+      Iterator<StorageDirectory> itImage
+        = storage.dirIterator(NameNodeDirType.IMAGE);
+      Iterator<StorageDirectory> itEdits
+        = storage.dirIterator(NameNodeDirType.EDITS);
       if(!itImage.hasNext() || ! itEdits.hasNext())
         throw new IOException("Could not locate checkpoint directories");
       StorageDirectory sdName = itImage.next();
       StorageDirectory sdEdits = itEdits.next();
       getFSDirectoryRootLock().writeLock();
       try { // load image under rootDir lock
-        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+        loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
       } finally {
         getFSDirectoryRootLock().writeUnlock();
       }
@@ -164,9 +173,9 @@ public class BackupStorage extends FSImage {
     }
 
     // set storage fields
-    setStorageInfo(sig);
-    imageDigest = sig.imageDigest;
-    checkpointTime = sig.checkpointTime;
+    storage.setStorageInfo(sig);
+    storage.setImageDigest(sig.imageDigest);
+    storage.setCheckpointTime(sig.checkpointTime);
   }
 
   /**
@@ -193,9 +202,9 @@ public class BackupStorage extends FSImage {
    * Journal writer journals new meta-data state.
    * <ol>
    * <li> If Journal Spool state is OFF then journal records (edits)
-   * are applied directly to meta-data state in memory and are written 
+   * are applied directly to meta-data state in memory and are written
    * to the edits file(s).</li>
-   * <li> If Journal Spool state is INPROGRESS then records are only 
+   * <li> If Journal Spool state is INPROGRESS then records are only
    * written to edits.new file, which is called Spooling.</li>
    * <li> Journal Spool state WAIT blocks journaling until the
    * Journal Spool reader finalizes merging of the spooled data and
@@ -217,7 +226,7 @@ public class BackupStorage extends FSImage {
           // update NameSpace in memory
           backupInputStream.setBytes(data);
           FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-          logLoader.loadEditRecords(getLayoutVersion(),
+          logLoader.loadEditRecords(storage.getLayoutVersion(),
                     backupInputStream.getDataInputStream(), true);
           getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
           break;
@@ -245,7 +254,7 @@ public class BackupStorage extends FSImage {
   /**
    * Start journal spool.
    * Switch to writing into edits.new instead of edits.
-   * 
+   *
    * edits.new for spooling is in separate directory "spool" rather than in
    * "current" because the two directories should be independent.
    * While spooling a checkpoint can happen and current will first
@@ -264,8 +273,8 @@ public class BackupStorage extends FSImage {
     }
 
     // create journal spool directories
-    for(Iterator<StorageDirectory> it = 
-                          dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    for (Iterator<StorageDirectory> it
+           = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       StorageDirectory sd = it.next();
       File jsDir = getJSpoolDir(sd);
       if (!jsDir.exists() && !jsDir.mkdirs()) {
@@ -273,7 +282,7 @@ public class BackupStorage extends FSImage {
                               + jsDir.getCanonicalPath());
       }
       // create edit file if missing
-      File eFile = getEditFile(sd);
+      File eFile = storage.getEditFile(sd);
       if(!eFile.exists()) {
         editLog.createEditLogFile(eFile);
       }
@@ -304,7 +313,7 @@ public class BackupStorage extends FSImage {
       assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
       LongWritable lw = new LongWritable();
       lw.readFields(in);
-      setCheckpointTime(lw.get());
+      storage.setCheckpointTimeInStorage(lw.get());
     } finally {
       backupInputStream.clear();
     }
@@ -319,14 +328,15 @@ public class BackupStorage extends FSImage {
    * <ul>
    * <li> reads remaining journal records if any,</li>
    * <li> renames edits.new to edits,</li>
-   * <li> sets {@link JSpoolState} to OFF,</li> 
+   * <li> sets {@link JSpoolState} to OFF,</li>
    * <li> and notifies the journaling thread.</li>
    * </ul>
    * Journaling resumes with applying new journal records to the memory state,
    * and writing them into edits file(s).
    */
   void convergeJournalSpool() throws IOException {
-    Iterator<StorageDirectory> itEdits = dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> itEdits
+      = storage.dirIterator(NameNodeDirType.EDITS);
     if(! itEdits.hasNext())
       throw new IOException("Could not locate checkpoint directories");
     StorageDirectory sdEdits = itEdits.next();
@@ -342,13 +352,14 @@ public class BackupStorage extends FSImage {
 
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      numEdits += logLoader.loadEditRecords(getLayoutVersion(), in, true);
+      numEdits += logLoader.loadEditRecords(storage.getLayoutVersion(),
+                                            in, true);
       getFSNamesystem().dir.updateCountForINodeWithQuota();
       edits.close();
     }
 
-    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath() 
-        + " of size " + jSpoolFile.length() + " edits # " + numEdits 
+    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
+        + " of size " + jSpoolFile.length() + " edits # " + numEdits
         + " loaded in " + (now()-startTime)/1000 + " seconds.");
 
     // rename spool edits.new to edits making it in sync with the active node
@@ -356,7 +367,7 @@ public class BackupStorage extends FSImage {
     editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
 
     // write version file
-    resetVersion(false, imageDigest);
+    resetVersion(false, storage.getImageDigest());
 
     // wake up journal writer
     synchronized(this) {
@@ -365,8 +376,9 @@ public class BackupStorage extends FSImage {
     }
 
     // Rename lastcheckpoint.tmp to previous.checkpoint
-    for(StorageDirectory sd : storageDirs) {
-      moveLastCheckpoint(sd);
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      storage.moveLastCheckpoint(sd);
     }
   }
 }
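
The javadoc in this file describes three journal-spool states: OFF (edits are
applied to memory and written to the edits files), INPROGRESS (records are
only spooled to edits.new), and WAIT (journaling blocks until the spool reader
finishes converging). A hypothetical sketch of that state machine, with the
actual edit handling elided:

enum JSpoolStateSketch { OFF, INPROGRESS, WAIT }

class JournalSpoolSketch {
  private JSpoolStateSketch state = JSpoolStateSketch.OFF;

  // Called by the journaling thread for each batch of edit records.
  synchronized void journal(byte[] records) throws InterruptedException {
    while (state == JSpoolStateSketch.WAIT) {
      wait();                      // block until the spool reader converges
    }
    if (state == JSpoolStateSketch.OFF) {
      // apply records to the in-memory namespace and the edits file(s)
    } else {
      // INPROGRESS: only append the records to edits.new (spooling)
    }
  }

  // Spool reader reached the end of edits.new; block further journaling.
  synchronized void startConverge() { state = JSpoolStateSketch.WAIT; }

  // Convergence finished: resume normal journaling and wake blocked writers.
  synchronized void convergeDone() {
    state = JSpoolStateSketch.OFF;
    notifyAll();
  }
}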

+ 10 - 10
src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -128,7 +128,7 @@ public class BackupNode extends NameNode {
 
   @Override // NameNode
   protected void loadNamesystem(Configuration conf) throws IOException {
-    BackupStorage bnImage = new BackupStorage();
+    BackupImage bnImage = new BackupImage();
     this.namesystem = new FSNamesystem(conf, bnImage);
     bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
                               FSNamesystem.getNamespaceEditsDirs(conf));
@@ -224,7 +224,7 @@ public class BackupNode extends NameNode {
     if(!nnRpcAddress.equals(nnReg.getAddress()))
       throw new IOException("Journal request from unexpected name-node: "
           + nnReg.getAddress() + " expecting " + nnRpcAddress);
-    BackupStorage bnImage = (BackupStorage)getFSImage();
+    BackupImage bnImage = (BackupImage)getFSImage();
     switch(jAction) {
       case (int)JA_IS_ALIVE:
         return;
@@ -246,8 +246,8 @@ public class BackupNode extends NameNode {
   boolean shouldCheckpointAtStartup() {
     FSImage fsImage = getFSImage();
     if(isRole(NamenodeRole.CHECKPOINT)) {
-      assert fsImage.getNumStorageDirs() > 0;
-      return ! fsImage.getStorageDir(0).getVersionFile().exists();
+      assert fsImage.getStorage().getNumStorageDirs() > 0;
+      return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
     }
     if(namesystem == null || namesystem.dir == null || getFSImage() == null)
       return true;
@@ -309,14 +309,14 @@ public class BackupNode extends NameNode {
    * @throws IOException
    */
   private void registerWith(NamespaceInfo nsInfo) throws IOException {
-    BackupStorage bnImage = (BackupStorage)getFSImage();
+    BackupImage bnImage = (BackupImage)getFSImage();
     // verify namespaceID
-    if(bnImage.getNamespaceID() == 0) // new backup storage
-      bnImage.setStorageInfo(nsInfo);
-    else if(bnImage.getNamespaceID() != nsInfo.getNamespaceID())
+    if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
+      bnImage.getStorage().setStorageInfo(nsInfo);
+    else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
       throw new IOException("Incompatible namespaceIDs"
           + ": active node namespaceID = " + nsInfo.getNamespaceID() 
-          + "; backup node namespaceID = " + bnImage.getNamespaceID());
+          + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
 
     setRegistration();
     NamenodeRegistration nnReg = null;
@@ -351,7 +351,7 @@ public class BackupNode extends NameNode {
    * @throws IOException
    */
   void resetNamespace() throws IOException {
-    ((BackupStorage)getFSImage()).reset();
+    ((BackupImage)getFSImage()).reset();
   }
 
   /**

+ 17 - 14
src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java

@@ -43,12 +43,12 @@ public class CheckpointSignature extends StorageInfo
   public CheckpointSignature() {}
 
   CheckpointSignature(FSImage fsImage) {
-    super(fsImage);
+    super(fsImage.getStorage());
     blockpoolID = fsImage.getBlockPoolID();
     editsTime = fsImage.getEditLog().getFsEditTime();
-    checkpointTime = fsImage.getCheckpointTime();
-    imageDigest = fsImage.imageDigest;
-    checkpointTime = fsImage.getCheckpointTime();
+    checkpointTime = fsImage.getStorage().getCheckpointTime();
+    imageDigest = fsImage.getStorage().getImageDigest();
+    checkpointTime = fsImage.getStorage().getCheckpointTime();
   }
 
   CheckpointSignature(String str) {
@@ -109,12 +109,13 @@ public class CheckpointSignature extends StorageInfo
   }
 
   void validateStorageInfo(FSImage si) throws IOException {
-    if(layoutVersion != si.layoutVersion
-        || namespaceID != si.namespaceID || cTime != si.cTime
-        || checkpointTime != si.checkpointTime 
-        || !imageDigest.equals(si.imageDigest)
-        || !clusterID.equals(si.clusterID)
-        || !blockpoolID.equals(si.blockpoolID)) {
+    if(layoutVersion != si.getLayoutVersion()
+        || namespaceID != si.getNamespaceID() 
+        || cTime != si.getStorage().cTime
+        || checkpointTime != si.getStorage().getCheckpointTime() 
+        || !imageDigest.equals(si.getStorage().imageDigest)
+        || !clusterID.equals(si.getClusterID())
+        || !blockpoolID.equals(si.getBlockPoolID())) {
       // checkpointTime can change when the image is saved - do not compare
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
@@ -123,10 +124,12 @@ public class CheckpointSignature extends StorageInfo
           + " ; clusterId = " + clusterID
           + " ; blockpoolId = " + blockpoolID
           + ".\nExpecting respectively: "
-          + si.layoutVersion + "; " + si.namespaceID + "; " + si.cTime
-          + "; " + si.checkpointTime + "; " + si.imageDigest
-          + "; " + si.clusterID + "; " + si.blockpoolID + "."
-          );
+          + si.getLayoutVersion() + "; " 
+          + si.getNamespaceID() + "; " + si.getStorage().cTime
+          + "; " + si.getStorage().getCheckpointTime() + "; " 
+          + si.getStorage().imageDigest
+          + "; " + si.getClusterID() + "; " 
+          + si.getBlockPoolID() + ".");
     }
   }
 

+ 13 - 12
src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java

@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -63,8 +63,8 @@ class Checkpointer extends Daemon {
 
   private String infoBindAddress;
 
-  private BackupStorage getFSImage() {
-    return (BackupStorage)backupNode.getFSImage();
+  private BackupImage getFSImage() {
+    return (BackupImage)backupNode.getFSImage();
   }
 
   private NamenodeProtocol getNamenode(){
@@ -182,8 +182,8 @@ class Checkpointer extends Daemon {
   private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
     // Retrieve image file
     String fileid = "getimage=1";
-    Collection<File> list = getFSImage().getFiles(NameNodeFile.IMAGE,
-        NameNodeDirType.IMAGE);
+    Collection<File> list = getFSImage()
+      .getStorage().getFiles(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
     File[] files = list.toArray(new File[list.size()]);
     assert files.length > 0 : "No checkpoint targets.";
     String nnHttpAddr = backupNode.nnHttpAddress;
@@ -193,7 +193,8 @@ class Checkpointer extends Daemon {
 
     // Retrieve edits file
     fileid = "getedit=1";
-    list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
+    list = getFSImage()
+      .getStorage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
     files = list.toArray(new File[list.size()]);
     assert files.length > 0 : "No checkpoint targets.";
     TransferFsImage.getFileClient(nnHttpAddr, fileid, files, false);
@@ -211,7 +212,7 @@ class Checkpointer extends Daemon {
     String fileid = "putimage=1&port=" + httpPort +
       "&machine=" + infoBindAddress +
       "&token=" + sig.toString() +
-      "&newChecksum=" + getFSImage().imageDigest.toString();
+      "&newChecksum=" + getFSImage().getStorage().getImageDigest().toString();
     LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
     TransferFsImage.getFileClient(backupNode.nnHttpAddress, 
         fileid, (File[])null, false);
@@ -250,9 +251,9 @@ class Checkpointer extends Daemon {
       downloadCheckpoint(sig);
     }
 
-    BackupStorage bnImage = getFSImage();
-    bnImage.blockpoolID = backupNode.getBlockPoolId();
-    bnImage.clusterID = backupNode.getClusterId();
+    BackupImage bnImage = getFSImage();
+    bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId());
+    bnImage.getStorage().setClusterID(backupNode.getClusterId());
     bnImage.loadCheckpoint(sig);
     sig.validateStorageInfo(bnImage);
     bnImage.saveCheckpoint();
@@ -268,6 +269,6 @@ class Checkpointer extends Daemon {
         getFSImage().getEditLog().close();
     LOG.info("Checkpoint completed in "
         + (now() - startTime)/1000 + " seconds."
-        + " New Image Size: " + bnImage.getFsImageName().length());
+        + " New Image Size: " + bnImage.getStorage().getFsImageName().length());
   }
 }
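
The checkpoint upload in putFSImage above is parameterized entirely through
the request string. The query keys (putimage, port, machine, token,
newChecksum) appear verbatim in the hunk; the helper below is a hypothetical
illustration of how that fileid is assembled:

class PutImageUrlSketch {
  static String buildFileId(int httpPort, String bindAddress,
                            String signature, String imageDigest) {
    return "putimage=1&port=" + httpPort +
           "&machine=" + bindAddress +
           "&token=" + signature +
           "&newChecksum=" + imageDigest;
  }

  public static void main(String[] args) {
    // Example values only; real callers pass the checkpoint signature and
    // the MD5 digest of the newly written image.
    System.out.println(buildFileId(50105, "backup.example.com",
                                   "sig-string", "md5-digest"));
  }
}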

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -141,8 +141,8 @@ class FSDirectory implements Closeable {
       throws IOException {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
-      fsImage.setStorageDirectories(dataDirs, editsDirs);
-      fsImage.format(fsImage.determineClusterId()); // reuse current id
+      fsImage.getStorage().setStorageDirectories(dataDirs, editsDirs);
+      fsImage.getStorage().format(fsImage.getStorage().determineClusterId()); // reuse current id
       startOpt = StartupOption.REGULAR;
     }
     try {

+ 122 - 82
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -36,11 +36,11 @@ import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
 import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -56,7 +56,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSEditLog {
+public class FSEditLog implements NNStorageListener {
 
   static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
       " File system changes are not persistent. No journal streams.";
@@ -66,7 +66,6 @@ public class FSEditLog {
   private volatile int sizeOutputFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
-  private FSImage fsimage = null;
 
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
@@ -89,6 +88,8 @@ public class FSEditLog {
   private long totalTimeTransactions;  // total time for all transactions
   private NameNodeMetrics metrics;
 
+  private NNStorage storage;
+
   private static class TransactionId {
     public long txid;
 
@@ -104,23 +105,24 @@ public class FSEditLog {
     }
   };
 
-  FSEditLog(FSImage image) {
-    fsimage = image;
+  FSEditLog(NNStorage storage) {
     isSyncRunning = false;
+    this.storage = storage;
+    this.storage.registerListener(this);
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
   }
   
   private File getEditFile(StorageDirectory sd) {
-    return fsimage.getEditFile(sd);
+    return storage.getEditFile(sd);
   }
   
   private File getEditNewFile(StorageDirectory sd) {
-    return fsimage.getEditNewFile(sd);
+    return storage.getEditNewFile(sd);
   }
   
   private int getNumEditsDirs() {
-   return fsimage.getNumStorageDirs(NameNodeDirType.EDITS);
+   return storage.getNumStorageDirs(NameNodeDirType.EDITS);
   }
 
   synchronized int getNumEditStreams() {
@@ -151,8 +153,8 @@ public class FSEditLog {
       editStreams = new ArrayList<EditLogOutputStream>();
     
     ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    for (Iterator<StorageDirectory> it 
+         = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       StorageDirectory sd = it.next();
       File eFile = getEditFile(sd);
       try {
@@ -166,7 +168,7 @@ public class FSEditLog {
       }
     }
     
-    if(al != null) fsimage.processIOError(al, false);
+    if(al != null) storage.reportErrorsOnDirectories(al);
   }
   
   
@@ -210,7 +212,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
     editStreams.clear();
   }
 
@@ -228,64 +230,26 @@ public class FSEditLog {
 
   /**
    * The specified streams have IO errors. Close and remove them.
-   * If propagate is true - close related StorageDirectories.
-   * (is called with propagate value true from everywhere
-   *  except fsimage.processIOError)
    */
-  synchronized void processIOError(
-      List<EditLogOutputStream> errorStreams,
-      boolean propagate) {
-    
+  synchronized
+  void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
     if (errorStreams == null || errorStreams.size() == 0) {
       return;                       // nothing to do
     }
-
-    String lsd = fsimage.listStorageDirectories();
-    LOG.info("current list of storage dirs:" + lsd);
-
-    ArrayList<StorageDirectory> al = null;
-    for (EditLogOutputStream eStream : errorStreams) {
-      LOG.error("Unable to log edits to " + eStream.getName()
-          + "; removing it");     
-
-      StorageDirectory storageDir;
-      if(propagate && eStream.getType() == JournalType.FILE && //find SD
-          (storageDir = getStorage(eStream)) != null) {
-        LOG.info("about to remove corresponding storage:" 
-            + storageDir.getRoot().getAbsolutePath());
-        // remove corresponding storage dir
-        if(al == null) al = new ArrayList<StorageDirectory>(1);
-        al.add(storageDir);
+    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
+    for (EditLogOutputStream e : errorStreams) {
+      if (e.getType() == JournalType.FILE) {
+        errorDirs.add(getStorageDirectoryForStream(e));
+      } else {
+        disableStream(e);
       }
-      Iterator<EditLogOutputStream> ies = editStreams.iterator();
-      while (ies.hasNext()) {
-        EditLogOutputStream es = ies.next();
-        if (es == eStream) {  
-          try { eStream.close(); } catch (IOException e) {
-            // nothing to do.
-            LOG.warn("Failed to close eStream " + eStream.getName()
-                + " before removing it (might be ok)");
-          }
-          ies.remove();
-          break;
-        }
-      } 
-    }
-    
-    if (editStreams == null || editStreams.size() <= 0) {
-      String msg = "Fatal Error: All storage directories are inaccessible.";
-      LOG.fatal(msg, new IOException(msg)); 
-      Runtime.getRuntime().exit(-1);
     }
 
-    // removed failed SDs
-    if(propagate && al != null) fsimage.processIOError(al, false);
-    
-    //for the rest of the streams
-    if(propagate) incrementCheckpointTime();
-    
-    lsd = fsimage.listStorageDirectories();
-    LOG.info("at the end current list of storage dirs:" + lsd);
+    try {
+      storage.reportErrorsOnDirectories(errorDirs);
+    } catch (IOException ioe) {
+      LOG.error("Problem erroring streams " + ioe);
+    }
   }
 
 
@@ -298,7 +262,7 @@ public class FSEditLog {
     String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
     .getParentFile().getParentFile().getAbsolutePath();
 
-    Iterator<StorageDirectory> it = fsimage.dirIterator(); 
+    Iterator<StorageDirectory> it = storage.dirIterator(); 
     while (it.hasNext()) {
       StorageDirectory sd = it.next();
       LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath()); 
@@ -355,7 +319,7 @@ public class FSEditLog {
           errorStreams.add(eStream);
         }
       }
-      processIOError(errorStreams, true);
+      disableAndReportErrorOnStreams(errorStreams);
       recordTransaction(start);
       
       // check if it is time to schedule an automatic sync
@@ -547,7 +511,7 @@ public class FSEditLog {
         }
       }
       long elapsed = now() - start;
-      processIOError(errorStreams, true);
+      disableAndReportErrorOnStreams(errorStreams);
   
       if (metrics != null) // Metrics non-null only when used inside name node
         metrics.syncs.inc(elapsed);
@@ -814,7 +778,7 @@ public class FSEditLog {
         al.add(es);
       }
     }
-    if(al!=null) processIOError(al, true);
+    if(al!=null) disableAndReportErrorOnStreams(al);
     return size;
   }
   
@@ -823,7 +787,7 @@ public class FSEditLog {
    */
   synchronized void rollEditLog() throws IOException {
     waitForSyncToFinish();
-    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(!it.hasNext()) 
       return;
     //
@@ -841,7 +805,7 @@ public class FSEditLog {
       return; // nothing to do, edits.new exists!
 
     // check if any of failed storage is now available and put it back
-    fsimage.attemptRestoreRemovedStorage(false);
+    storage.attemptRestoreRemovedStorage(false);
 
     divertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
@@ -863,7 +827,7 @@ public class FSEditLog {
     EditStreamIterator itE = 
       (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
     Iterator<StorageDirectory> itD = 
-      fsimage.dirIterator(NameNodeDirType.EDITS);
+      storage.dirIterator(NameNodeDirType.EDITS);
     while(itE.hasNext() && itD.hasNext()) {
       EditLogOutputStream eStream = itE.next();
       StorageDirectory sd = itD.next();
@@ -885,7 +849,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   /**
@@ -929,7 +893,7 @@ public class FSEditLog {
     EditStreamIterator itE = 
       (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
     Iterator<StorageDirectory> itD = 
-      fsimage.dirIterator(NameNodeDirType.EDITS);
+      storage.dirIterator(NameNodeDirType.EDITS);
     while(itE.hasNext() && itD.hasNext()) {
       EditLogOutputStream eStream = itE.next();
       StorageDirectory sd = itD.next();
@@ -964,7 +928,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   /**
@@ -973,7 +937,7 @@ public class FSEditLog {
   synchronized File getFsEditName() {
     StorageDirectory sd = null;   
     for (Iterator<StorageDirectory> it = 
-      fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       sd = it.next();   
       if(sd.getRoot().canRead())
         return getEditFile(sd);
@@ -985,7 +949,7 @@ public class FSEditLog {
    * Returns the timestamp of the edit log
    */
   synchronized long getFsEditTime() {
-    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(it.hasNext())
       return getEditFile(it.next()).lastModified();
     return 0;
@@ -1052,7 +1016,7 @@ public class FSEditLog {
         errorStreams.add(eStream);
       }
     }
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
     recordTransaction(start);
   }
 
@@ -1126,8 +1090,8 @@ public class FSEditLog {
   }
 
   void incrementCheckpointTime() {
-    fsimage.incrementCheckpointTime();
-    Writable[] args = {new LongWritable(fsimage.getCheckpointTime())};
+    storage.incrementCheckpointTime();
+    Writable[] args = {new LongWritable(storage.getCheckpointTime())};
     logEdit(OP_CHECKPOINT_TIME, args);
   }
 
@@ -1148,7 +1112,7 @@ public class FSEditLog {
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   synchronized boolean checkBackupRegistration(
@@ -1175,7 +1139,7 @@ public class FSEditLog {
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    processIOError(errorStreams, true);
+    disableAndReportErrorOnStreams(errorStreams);
     return regAllowed;
   }
   
@@ -1186,4 +1150,80 @@ public class FSEditLog {
     }
     return new BytesWritable(bytes);
   }
+
+  /**
+   * Get the StorageDirectory for a stream
+   * @param es Stream whose StorageDirectory we wish to know
+   * @return the matching StorageDirectory
+   */
+  StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
+    String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
+
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      FSNamesystem.LOG.info("comparing: " + parentStorageDir 
+                            + " and " + sd.getRoot().getAbsolutePath()); 
+      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+        return sd;
+    }
+    return null;
+  }
+
+  private synchronized void disableStream(EditLogOutputStream stream) {
+    try { stream.close(); } catch (IOException e) {
+      // nothing to do.
+      LOG.warn("Failed to close eStream " + stream.getName()
+               + " before removing it (might be ok)");
+    }
+    editStreams.remove(stream);
+
+    if (editStreams.size() <= 0) {
+      String msg = "Fatal Error: All storage directories are inaccessible.";
+      LOG.fatal(msg, new IOException(msg));
+      Runtime.getRuntime().exit(-1);
+    }
+  }
+
+  /**
+   * Error Handling on a storageDirectory
+   *
+   */
+  // NNStorageListener Interface
+  @Override // NNStorageListener
+  public synchronized void errorOccurred(StorageDirectory sd)
+      throws IOException {
+    ArrayList<EditLogOutputStream> errorStreams
+      = new ArrayList<EditLogOutputStream>();
+
+    for (EditLogOutputStream eStream : editStreams) {
+      LOG.error("Unable to log edits to " + eStream.getName()
+                + "; removing it");
+
+      StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
+      if (sd == streamStorageDir) {
+        errorStreams.add(eStream);
+      }
+    }
+
+    for (EditLogOutputStream eStream : errorStreams) {
+      disableStream(eStream);
+    }
+  }
+
+  @Override // NNStorageListener
+  public synchronized void formatOccurred(StorageDirectory sd)
+      throws IOException {
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      createEditLogFile(NNStorage.getStorageFile(sd, NameNodeFile.EDITS));
+    }
+  };
+
+  @Override // NNStorageListener
+  public synchronized void directoryAvailable(StorageDirectory sd)
+      throws IOException {
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      File eFile = getEditFile(sd);
+      addNewEditLogStream(eFile);
+    }
+  }
 }
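
The key structural change in this file is that FSEditLog no longer reaches
into FSImage for storage state; it holds an NNStorage, registers itself as an
NNStorageListener in the constructor, and reacts to per-directory events. A
simplified, self-contained sketch of that observer wiring (illustrative types,
not the HDFS classes):

interface StorageListenerSketch {
  void errorOccurred(String dir);
  void formatOccurred(String dir);
  void directoryAvailable(String dir);
}

class StorageSketch {
  private final java.util.List<StorageListenerSketch> listeners =
      new java.util.concurrent.CopyOnWriteArrayList<StorageListenerSketch>();

  void registerListener(StorageListenerSketch listener) {
    listeners.add(listener);
  }

  // Called when a storage directory fails; fans out to every listener.
  void reportError(String dir) {
    for (StorageListenerSketch listener : listeners) {
      listener.errorOccurred(dir);
    }
  }
}

class EditLogSketch implements StorageListenerSketch {
  EditLogSketch(StorageSketch storage) {
    storage.registerListener(this);   // mirrors FSEditLog(NNStorage storage)
  }
  public void errorOccurred(String dir)      { /* close streams under dir */ }
  public void formatOccurred(String dir)     { /* recreate the edits file */ }
  public void directoryAvailable(String dir) { /* reopen an edit stream */ }
}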

File diff suppressed because it is too large
+ 116 - 543
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java


+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -472,7 +472,7 @@ class FSImageFormat {
       DataOutputStream out = new DataOutputStream(fos);
       try {
         out.writeInt(FSConstants.LAYOUT_VERSION);
-        out.writeInt(sourceNamesystem.getFSImage().getNamespaceID()); // TODO bad dependency
+        out.writeInt(sourceNamesystem.getFSImage().getStorage().getNamespaceID()); // TODO bad dependency
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
 

+ 29 - 26
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -447,11 +447,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * Should do everything that would be done for the NameNode,
    * except for loading the image.
    * 
-   * @param bnImage {@link BackupStorage}
+   * @param bnImage {@link BackupImage}
    * @param conf configuration
    * @throws IOException
    */
-  FSNamesystem(Configuration conf, BackupStorage bnImage) throws IOException {
+  FSNamesystem(Configuration conf, BackupImage bnImage) throws IOException {
     try {
       initialize(conf, bnImage);
     } catch(IOException e) {
@@ -538,12 +538,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   }
   
   NamespaceInfo getNamespaceInfo() {
-    NamespaceInfo nsinfo = new NamespaceInfo(dir.fsImage.getNamespaceID(),
+    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
                              getClusterId(),
                              getBlockPoolId(),
-                             dir.fsImage.getCTime(),
+                             dir.fsImage.getStorage().getCTime(),
                              getDistributedUpgradeVersion());
-    return nsinfo;
   }
 
   /**
@@ -2691,7 +2690,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @return registration ID
    */
   public String getRegistrationID() {
-    return Storage.getRegistrationID(dir.fsImage);
+    return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
     
   /**
@@ -3481,10 +3480,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     
     // if it is disabled - enable it and vice versa.
     if(arg.equals("check"))
-      return getFSImage().getRestoreFailedStorage();
+      return getFSImage().getStorage().getRestoreFailedStorage();
     
     boolean val = arg.equals("true");  // false if not
-    getFSImage().setRestoreFailedStorage(val);
+    getFSImage().getStorage().setRestoreFailedStorage(val);
     
     return val;
     } finally {
@@ -4870,18 +4869,20 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @throws IOException
    */
   void registerBackupNode(NamenodeRegistration registration)
-  throws IOException {
+    throws IOException {
     writeLock();
     try {
-    if(getFSImage().getNamespaceID() != registration.getNamespaceID())
-      throw new IOException("Incompatible namespaceIDs: " 
-          + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
-          + "; " + registration.getRole() +
-              " node namespaceID = " + registration.getNamespaceID());
-    boolean regAllowed = getEditLog().checkBackupRegistration(registration);
-    if(!regAllowed)
-      throw new IOException("Registration is not allowed. " +
-          "Another node is registered as a backup.");
+      if(getFSImage().getStorage().getNamespaceID() 
+         != registration.getNamespaceID())
+        throw new IOException("Incompatible namespaceIDs: "
+            + " Namenode namespaceID = "
+            + getFSImage().getStorage().getNamespaceID() + "; "
+            + registration.getRole() +
+            " node namespaceID = " + registration.getNamespaceID());
+      boolean regAllowed = getEditLog().checkBackupRegistration(registration);
+      if(!regAllowed)
+        throw new IOException("Registration is not allowed. " +
+                              "Another node is registered as a backup.");
     } finally {
       writeUnlock();
     }
@@ -4895,15 +4896,17 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @throws IOException
    */
   void releaseBackupNode(NamenodeRegistration registration)
-  throws IOException {
+    throws IOException {
     writeLock();
     try {
-    if(getFSImage().getNamespaceID() != registration.getNamespaceID())
-      throw new IOException("Incompatible namespaceIDs: " 
-          + " Namenode namespaceID = " + getFSImage().getNamespaceID() 
-          + "; " + registration.getRole() +
-              " node namespaceID = " + registration.getNamespaceID());
-    getEditLog().releaseBackupStream(registration);
+      if(getFSImage().getStorage().getNamespaceID()
+         != registration.getNamespaceID())
+        throw new IOException("Incompatible namespaceIDs: "
+            + " Namenode namespaceID = "
+            + getFSImage().getStorage().getNamespaceID() + "; "
+            + registration.getRole() +
+            " node namespaceID = " + registration.getNamespaceID());
+      getEditLog().releaseBackupStream(registration);
     } finally {
       writeUnlock();
     }
@@ -5418,7 +5421,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
 
   @Override  // NameNodeMXBean
   public String getClusterId() {
-    return dir.fsImage.getClusterID();
+    return dir.fsImage.getStorage().getClusterID();
   }
   
   @Override  // NameNodeMXBean

+ 7 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java

@@ -76,17 +76,19 @@ public class GetImageServlet extends HttpServlet {
         public Void run() throws Exception {
           if (ff.getImage()) {
             response.setHeader(TransferFsImage.CONTENT_LENGTH,
-                String.valueOf(nnImage.getFsImageName().length()));
+                               String.valueOf(nnImage.getStorage()
+                                              .getFsImageName().length()));
             // send fsImage
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsImageName(),
+                                          nnImage.getStorage().getFsImageName(),
                 getThrottler(conf)); 
           } else if (ff.getEdit()) {
             response.setHeader(TransferFsImage.CONTENT_LENGTH,
-                String.valueOf(nnImage.getFsEditName().length()));
+                               String.valueOf(nnImage.getStorage()
+                                              .getFsEditName().length()));
             // send edits
             TransferFsImage.getFileServer(response.getOutputStream(),
-                nnImage.getFsEditName(),
+                                          nnImage.getStorage().getFsEditName(),
                 getThrottler(conf));
           } else if (ff.putImage()) {
             // issue a HTTP get request to download the new fsimage 
@@ -98,7 +100,7 @@ public class GetImageServlet extends HttpServlet {
                 public MD5Hash run() throws Exception {
                   return TransferFsImage.getFileClient(
                       ff.getInfoServer(), "getimage=1", 
-                      nnImage.getFsImageNameCheckpoint(), true);
+                      nnImage.getStorage().getFsImageNameCheckpoint(), true);
                 }
             });
             if (!nnImage.newImageDigest.equals(downloadImageDigest)) {

+ 1033 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -0,0 +1,1033 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.Closeable;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Properties;
+import java.util.UUID;
+import java.io.RandomAccessFile;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+
+import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * NNStorage is responsible for management of the StorageDirectories used by
+ * the NameNode.
+ */
+@InterfaceAudience.Private
+public class NNStorage extends Storage implements Closeable {
+  private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
+
+  static final String MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
+
+  //
+  // The filenames used for storing the images
+  //
+  enum NameNodeFile {
+    IMAGE     ("fsimage"),
+    TIME      ("fstime"),
+    EDITS     ("edits"),
+    IMAGE_NEW ("fsimage.ckpt"),
+    EDITS_NEW ("edits.new");
+
+    private String fileName = null;
+    private NameNodeFile(String name) { this.fileName = name; }
+    String getName() { return fileName; }
+  }
+
+  /**
+   * Implementation of StorageDirType specific to namenode storage
+   * A Storage directory could be of type IMAGE which stores only fsimage,
+   * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which
+   * stores both fsimage and edits.
+   */
+  static enum NameNodeDirType implements StorageDirType {
+    UNDEFINED,
+    IMAGE,
+    EDITS,
+    IMAGE_AND_EDITS;
+
+    public StorageDirType getStorageDirType() {
+      return this;
+    }
+
+    public boolean isOfType(StorageDirType type) {
+      if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
+        return true;
+      return this == type;
+    }
+  }
+
+  /**
+   * Interface to be implemented by classes which make use of storage
+   * directories. They are notified when a StorageDirectory is causing errors,
+   * becoming available or being formatted.
+   *
+   * This allows the implementors of the interface to take their own specific
+   * action on the StorageDirectory when this occurs.
+   */
+  interface NNStorageListener {
+    /**
+     * An error has occurred with a StorageDirectory.
+     * @param sd The storage directory causing the error.
+     * @throws IOException
+     */
+    void errorOccurred(StorageDirectory sd) throws IOException;
+
+    /**
+     * A storage directory has been formatted.
+     * @param sd The storage directory being formatted.
+     * @throws IOException
+     */
+    void formatOccurred(StorageDirectory sd) throws IOException;
+
+    /**
+     * A storage directory is now available for use.
+     * @param sd The storage directory which has become available.
+     * @throws IOException
+     */
+    void directoryAvailable(StorageDirectory sd) throws IOException;
+  }
+
+  final private List<NNStorageListener> listeners;
+  private UpgradeManager upgradeManager = null;
+  protected MD5Hash imageDigest = null;
+  protected String blockpoolID = ""; // id of the block pool
+
+  /**
+   * flag that controls if we try to restore failed storages
+   */
+  private boolean restoreFailedStorage = false;
+  private Object restorationLock = new Object();
+  private boolean disablePreUpgradableLayoutCheck = false;
+
+  private long checkpointTime = -1L;  // The age of the image
+
+  /**
+   * list of failed (and thus removed) storages
+   */
+  final protected List<StorageDirectory> removedStorageDirs
+    = new CopyOnWriteArrayList<StorageDirectory>();
+
+  /**
+   * Construct the NNStorage.
+   * @param conf Namenode configuration.
+   */
+  public NNStorage(Configuration conf) {
+    super(NodeType.NAME_NODE);
+
+    storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
+    this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
+  }
+
+  /**
+   * Construct the NNStorage.
+   * @param storageInfo storage information
+   * @param bpid block pool Id
+   */
+  public NNStorage(StorageInfo storageInfo, String bpid) {
+    super(NodeType.NAME_NODE, storageInfo);
+
+    storageDirs = new CopyOnWriteArrayList<StorageDirectory>();
+    this.listeners = new CopyOnWriteArrayList<NNStorageListener>();
+    this.blockpoolID = bpid;
+  }
+
+  @Override // Storage
+  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
+    if (disablePreUpgradableLayoutCheck) {
+      return false;
+    }
+
+    File oldImageDir = new File(sd.getRoot(), "image");
+    if (!oldImageDir.exists()) {
+      return false;
+    }
+    // check the layout version inside the image file
+    File oldF = new File(oldImageDir, "fsimage");
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    try {
+      oldFile.seek(0);
+      int oldVersion = oldFile.readInt();
+      if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+        return false;
+    } finally {
+      oldFile.close();
+    }
+    return true;
+  }
+
+  @Override // Closeable
+  public void close() throws IOException {
+    listeners.clear();
+    unlockAll();
+    storageDirs.clear();
+  }
+
+  /**
+   * Set the flag that controls whether an attempt should be made to restore
+   * failed storage directories at the next available opportunity.
+   *
+   * @param val Whether restoration attempt should be made.
+   */
+  void setRestoreFailedStorage(boolean val) {
+    LOG.warn("set restore failed storage to " + val);
+    restoreFailedStorage=val;
+  }
+
+  /**
+   * @return Whether failed storage directories are to be restored.
+   */
+  boolean getRestoreFailedStorage() {
+    return restoreFailedStorage;
+  }
+
+  /**
+   * See if any of removed storages is "writable" again, and can be returned
+   * into service. If saveNamespace is set, then this method is being
+   * called from saveNamespace.
+   *
+   * @param saveNamespace Whether method is being called from saveNamespace()
+   */
+  void attemptRestoreRemovedStorage(boolean saveNamespace) {
+    // if directory is "alive" - copy the images there...
+    if(!restoreFailedStorage || removedStorageDirs.size() == 0)
+      return; //nothing to restore
+
+    /* We don't want more than one thread trying to restore at a time */
+    synchronized (this.restorationLock) {
+      LOG.info("NNStorage.attemptRestoreRemovedStorage: check removed(failed) "+
+               "storarge. removedStorages size = " + removedStorageDirs.size());
+      for(Iterator<StorageDirectory> it
+            = this.removedStorageDirs.iterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        File root = sd.getRoot();
+        LOG.info("currently disabled dir " + root.getAbsolutePath() +
+                 "; type="+sd.getStorageDirType() 
+                 + ";canwrite="+root.canWrite());
+        try {
+          
+          if(root.exists() && root.canWrite()) {
+            /* If this call is being made from the savenamespace command, no
+             * need to format; the savenamespace command will format and write
+             * the new image to this directory anyway.
+             */
+            if (saveNamespace) {
+              sd.clearDirectory();
+            } else {
+              format(sd);
+            }
+            
+            LOG.info("restoring dir " + sd.getRoot().getAbsolutePath());
+            for (NNStorageListener listener : listeners) {
+              listener.directoryAvailable(sd);
+            }
+            
+            this.addStorageDir(sd); // restore
+            this.removedStorageDirs.remove(sd);
+          }
+        } catch(IOException e) {
+          LOG.warn("failed to restore " + sd.getRoot().getAbsolutePath(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return A list of storage directories which are in the errored state.
+   */
+  List<StorageDirectory> getRemovedStorageDirs() {
+    return this.removedStorageDirs;
+  }
+
+  /**
+   * Set the storage directories which will be used. NNStorage.close() should
+   * be called before this to ensure any previous storage directories have been
+   * freed.
+   *
+   * Synchronized due to initialization of storageDirs and removedStorageDirs.
+   *
+   * @param fsNameDirs Locations to store images.
+   * @param fsEditsDirs Locations to store edit logs.
+   * @throws IOException
+   */
+  synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
+                                          Collection<URI> fsEditsDirs)
+      throws IOException {
+    this.storageDirs.clear();
+    this.removedStorageDirs.clear();
+
+   // Add all name dirs with appropriate NameNodeDirType
+    for (URI dirName : fsNameDirs) {
+      checkSchemeConsistency(dirName);
+      boolean isAlsoEdits = false;
+      for (URI editsDirName : fsEditsDirs) {
+        if (editsDirName.compareTo(dirName) == 0) {
+          isAlsoEdits = true;
+          fsEditsDirs.remove(editsDirName);
+          break;
+        }
+      }
+      NameNodeDirType dirType = (isAlsoEdits) ?
+                          NameNodeDirType.IMAGE_AND_EDITS :
+                          NameNodeDirType.IMAGE;
+      // Add to the list of storage directories, only if the
+      // URI is of type file://
+      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
+          == 0){
+        this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
+            dirType));
+      }
+    }
+
+    // Add edits dirs if they are different from name dirs
+    for (URI dirName : fsEditsDirs) {
+      checkSchemeConsistency(dirName);
+      // Add to the list of storage directories, only if the
+      // URI is of type file://
+      if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
+          == 0)
+        this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
+                    NameNodeDirType.EDITS));
+    }
+  }
+
+  /**
+   * Checks the consistency of a URI, in particular if the scheme
+   * is specified and is supported by a concrete implementation
+   * @param u URI whose consistency is being checked.
+   */
+  private static void checkSchemeConsistency(URI u) throws IOException {
+    String scheme = u.getScheme();
+    // the URI should have a proper scheme
+    if(scheme == null)
+      throw new IOException("Undefined scheme for " + u);
+    else {
+      try {
+        // the scheme should be enumerated as JournalType
+        JournalType.valueOf(scheme.toUpperCase());
+      } catch (IllegalArgumentException iae){
+        throw new IOException("Unknown scheme " + scheme +
+            ". It should correspond to a JournalType enumeration value");
+      }
+    }
+  }
+
+  /**
+   * Retrieve current directories of type IMAGE
+   * @return Collection of URI representing image directories
+   * @throws IOException in case of URI processing error
+   */
+  Collection<URI> getImageDirectories() throws IOException {
+    return getDirectories(NameNodeDirType.IMAGE);
+  }
+
+  /**
+   * Retrieve current directories of type EDITS
+   * @return Collection of URI representing edits directories
+   * @throws IOException in case of URI processing error
+   */
+  Collection<URI> getEditsDirectories() throws IOException {
+    return getDirectories(NameNodeDirType.EDITS);
+  }
+
+  /**
+   * Return number of storage directories of the given type.
+   * @param dirType directory type
+   * @return number of storage directories of type dirType
+   */
+  int getNumStorageDirs(NameNodeDirType dirType) {
+    if(dirType == null)
+      return getNumStorageDirs();
+    Iterator<StorageDirectory> it = dirIterator(dirType);
+    int numDirs = 0;
+    for(; it.hasNext(); it.next())
+      numDirs++;
+    return numDirs;
+  }
+
+  /**
+   * Return the list of locations being used for a specific purpose,
+   * i.e. image or edit log storage.
+   *
+   * @param dirType Purpose of locations requested.
+   * @throws IOException
+   */
+  Collection<URI> getDirectories(NameNodeDirType dirType)
+      throws IOException {
+    ArrayList<URI> list = new ArrayList<URI>();
+    Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
+                                    dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        list.add(Util.fileAsURI(sd.getRoot()));
+      } catch (IOException e) {
+        throw new IOException("Exception while processing " +
+            "StorageDirectory " + sd.getRoot(), e);
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Determine the checkpoint time of the specified StorageDirectory
+   *
+   * @param sd StorageDirectory to check
+   * @return If file exists and can be read, last checkpoint time. If not, 0L.
+   * @throws IOException On errors processing file pointed to by sd
+   */
+  long readCheckpointTime(StorageDirectory sd) throws IOException {
+    File timeFile = getStorageFile(sd, NameNodeFile.TIME);
+    long timeStamp = 0L;
+    if (timeFile.exists() && timeFile.canRead()) {
+      DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
+      try {
+        timeStamp = in.readLong();
+      } finally {
+        in.close();
+      }
+    }
+    return timeStamp;
+  }
+
+  /**
+   * Write last checkpoint time into a separate file.
+   *
+   * @param sd storage directory to write the checkpoint time into
+   * @throws IOException
+   */
+  public void writeCheckpointTime(StorageDirectory sd) throws IOException {
+    if (checkpointTime < 0L)
+      return; // do not write negative time
+    File timeFile = getStorageFile(sd, NameNodeFile.TIME);
+    if (timeFile.exists() && ! timeFile.delete()) {
+        LOG.error("Cannot delete chekpoint time file: "
+                  + timeFile.getCanonicalPath());
+    }
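+    // flush() only empties the stream's buffer into the OS; getChannel().force(true)
+    // below additionally syncs the file to disk, so the new checkpoint time
+    // survives a crash immediately after this method returns.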
+    FileOutputStream fos = new FileOutputStream(timeFile);
+    DataOutputStream out = new DataOutputStream(fos);
+    try {
+      out.writeLong(checkpointTime);
+      out.flush();
+      fos.getChannel().force(true);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Record new checkpoint time in order to
+   * distinguish healthy directories from the removed ones.
+   * If there is an error writing new checkpoint time, the corresponding
+   * storage directory is removed from the list.
+   */
+  public void incrementCheckpointTime() {
+    setCheckpointTimeInStorage(checkpointTime + 1);
+  }
+
+  /**
+   * The age of the namespace state.<p>
+   * Reflects the latest time the image was saved.
+   * Modified with every save or checkpoint.
+   * Persisted in VERSION file.
+   *
+   * @return the current checkpoint time.
+   */
+  public long getCheckpointTime() {
+    return checkpointTime;
+  }
+
+  /**
+   * Set the checkpoint time.
+   *
+   * This method does not persist the checkpoint time to storage immediately.
+   * 
+   * @see #setCheckpointTimeInStorage
+   * @param newCpT the new checkpoint time.
+   */
+  public void setCheckpointTime(long newCpT) {
+    checkpointTime = newCpT;
+  }
+
+  /**
+   * Set the current checkpoint time. Writes the new checkpoint
+   * time to all available storage directories.
+   * @param newCpT The new checkpoint time.
+   */
+  public void setCheckpointTimeInStorage(long newCpT) {
+    checkpointTime = newCpT;
+    // Write new checkpoint time in all storage directories
+    for(Iterator<StorageDirectory> it =
+                          dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      try {
+        writeCheckpointTime(sd);
+      } catch(IOException e) {
+        // Log the failure; this method does not itself remove the directory.
+        LOG.warn("writeCheckpointTime failed on "
+                 + sd.getRoot().getPath() + ";type="+sd.getStorageDirType());
+      }
+    }
+  }
+
+  /**
+   * Return the image files that a periodically uploaded checkpoint is
+   * written into (the IMAGE_NEW file in each image directory).
+   *
+   * @return List of files to save the checkpoint into.
+   */
+  public File[] getFsImageNameCheckpoint() {
+    ArrayList<File> list = new ArrayList<File>();
+    for (Iterator<StorageDirectory> it =
+                 dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      list.add(getStorageFile(it.next(), NameNodeFile.IMAGE_NEW));
+    }
+    return list.toArray(new File[list.size()]);
+  }
+
+  /**
+   * Return the image file from the first readable image directory.
+   * @return The image file, or null if no image directory is readable.
+   */
+  public File getFsImageName() {
+    StorageDirectory sd = null;
+    for (Iterator<StorageDirectory> it =
+      dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      sd = it.next();
+      if(sd.getRoot().canRead())
+        return getStorageFile(sd, NameNodeFile.IMAGE);
+    }
+    return null;
+  }
+
+  /**
+   * @return The edits file from the first readable edits directory, or null.
+   */
+  public File getFsEditName() throws IOException {
+    for (Iterator<StorageDirectory> it
+           = dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      if(sd.getRoot().canRead())
+        return getEditFile(sd);
+    }
+    return null;
+  }
+
+  /**
+   * @return A checkpoint time file from one of the storage directories.
+   */
+  public File getFsTimeName() {
+    StorageDirectory sd = null;
+    // NameNodeFile.TIME should be the same in all directories
+    for (Iterator<StorageDirectory> it =
+             dirIterator(); it.hasNext();)
+      sd = it.next();
+    return getStorageFile(sd, NameNodeFile.TIME);
+  }
+
+  /** Create new dfs name directory.  Caution: this destroys all files
+   * in this filesystem. */
+  private void format(StorageDirectory sd) throws IOException {
+    sd.clearDirectory(); // create current dir
+    for (NNStorageListener listener : listeners) {
+      listener.formatOccurred(sd);
+    }
+    sd.write();
+
+    LOG.info("Storage directory " + sd.getRoot()
+             + " has been successfully formatted.");
+  }
+
+  /**
+   * Format all available storage directories.
+   */
+  public void format(String clusterId) throws IOException {
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.namespaceID = newNamespaceID();
+    this.clusterID = clusterId;
+    this.blockpoolID = newBlockPoolID();
+    this.cTime = 0L;
+    this.setCheckpointTime(now());
+    for (Iterator<StorageDirectory> it =
+                           dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      format(sd);
+    }
+  }
+
+  /**
+   * Generate new namespaceID.
+   *
+   * namespaceID is a persistent attribute of the namespace.
+   * It is generated when the namenode is formatted and remains the same
+   * during the life cycle of the namenode.
+   * When datanodes register, they receive it as the registrationID,
+   * which is checked every time the datanode communicates with the
+   * namenode. Datanodes that do not 'know' the namespaceID are rejected.
+   *
+   * @return new namespaceID
+   */
+  private int newNamespaceID() {
+    Random r = new Random();
+    r.setSeed(now());
+    int newID = 0;
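+    // 0 reads as "no namespace ID assigned", so draw until nonzero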
+    while(newID == 0)
+      newID = r.nextInt(0x7FFFFFFF);  // use 31 bits only
+    return newID;
+  }
+
+
+  /**
+   * Move {@code current} to {@code lastcheckpoint.tmp} and
+   * recreate empty {@code current}.
+   * {@code current} is moved only if it is well formatted,
+   * that is, it contains a VERSION file.
+   *
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
+   */
+  protected void moveCurrent(StorageDirectory sd)
+    throws IOException {
+    File curDir = sd.getCurrentDir();
+    File tmpCkptDir = sd.getLastCheckpointTmp();
+    // mv current -> lastcheckpoint.tmp
+    // only if current is formatted - has VERSION file
+    if(sd.getVersionFile().exists()) {
+      assert curDir.exists() : curDir + " directory must exist.";
+      assert !tmpCkptDir.exists() : tmpCkptDir + " directory must not exist.";
+      rename(curDir, tmpCkptDir);
+    }
+    // recreate current
+    if(!curDir.exists() && !curDir.mkdir())
+      throw new IOException("Cannot create directory " + curDir);
+  }
+
+  /**
+   * Move {@code lastcheckpoint.tmp} to {@code previous.checkpoint}
+   *
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getPreviousCheckpoint()
+   * @see org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory#getLastCheckpointTmp()
+   */
+  protected void moveLastCheckpoint(StorageDirectory sd)
+    throws IOException {
+    File tmpCkptDir = sd.getLastCheckpointTmp();
+    File prevCkptDir = sd.getPreviousCheckpoint();
+    // remove previous.checkpoint
+    if (prevCkptDir.exists())
+      deleteDir(prevCkptDir);
+    // mv lastcheckpoint.tmp -> previous.checkpoint
+    if(tmpCkptDir.exists())
+      rename(tmpCkptDir, prevCkptDir);
+  }
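+
+  // Checkpoint directory lifecycle handled by moveCurrent/moveLastCheckpoint:
+  //
+  //   current/             -- active image and edits
+  //        | moveCurrent()      (only if current/ contains a VERSION file)
+  //        v
+  //   lastcheckpoint.tmp/  -- snapshot taken while a checkpoint is in progress
+  //        | moveLastCheckpoint()
+  //        v
+  //   previous.checkpoint/ -- last successfully completed checkpoint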
+
+  @Override // Storage
+  protected void getFields(Properties props,
+                           StorageDirectory sd
+                           ) throws IOException {
+    super.getFields(props, sd);
+    if (layoutVersion == 0) {
+      throw new IOException("NameNode directory "
+                            + sd.getRoot() + " is not formatted.");
+    }
+
+    // Block pool ID is stored only in layouts newer than LAST_PRE_FEDERATION_LAYOUT_VERSION
+    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      String sbpid = props.getProperty("blockpoolID");
+      setBlockPoolID(sd.getRoot(), sbpid);
+    }
+    
+    String sDUS, sDUV;
+    sDUS = props.getProperty("distributedUpgradeState");
+    sDUV = props.getProperty("distributedUpgradeVersion");
+    setDistributedUpgradeState(
+        sDUS == null? false : Boolean.parseBoolean(sDUS),
+        sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
+
+    String sMd5 = props.getProperty(MESSAGE_DIGEST_PROPERTY);
+    if (layoutVersion <= -26) {
+      if (sMd5 == null) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "file " + STORAGE_FILE_VERSION
+            + " does not have MD5 image digest.");
+      }
+      this.imageDigest = new MD5Hash(sMd5);
+    } else if (sMd5 != null) {
+      throw new InconsistentFSStateException(sd.getRoot(),
+          "file " + STORAGE_FILE_VERSION +
+          " has image MD5 digest when version is " + layoutVersion);
+    }
+
+    this.setCheckpointTime(readCheckpointTime(sd));
+  }
+
+  /**
+   * Write last checkpoint time and version file into the storage directory.
+   *
+   * The version file should always be written last.
+   * Missing or corrupted version file indicates that
+   * the checkpoint is not valid.
+   *
+   * @param sd storage directory
+   * @throws IOException
+   */
+  @Override // Storage
+  protected void setFields(Properties props,
+                           StorageDirectory sd
+                           ) throws IOException {
+    super.setFields(props, sd);
+    // Write blockpoolID only for layouts newer than LAST_PRE_FEDERATION_LAYOUT_VERSION
+    if (layoutVersion < LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      props.setProperty("blockpoolID", blockpoolID);
+    }
+    boolean uState = getDistributedUpgradeState();
+    int uVersion = getDistributedUpgradeVersion();
+    if(uState && uVersion != getLayoutVersion()) {
+      props.setProperty("distributedUpgradeState", Boolean.toString(uState));
+      props.setProperty("distributedUpgradeVersion",
+                        Integer.toString(uVersion));
+    }
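+    // If no digest has been recorded for the current image, compute it
+    // from the image file on disk before persisting it.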
+    if (imageDigest == null) {
+      imageDigest = MD5Hash.digest(
+          new FileInputStream(getStorageFile(sd, NameNodeFile.IMAGE)));
+    }
+
+    props.setProperty(MESSAGE_DIGEST_PROPERTY, imageDigest.toString());
+
+    writeCheckpointTime(sd);
+  }
+
+  /**
+   * @return A File of 'type' in storage directory 'sd'.
+   */
+  static File getStorageFile(StorageDirectory sd, NameNodeFile type) {
+    return new File(sd.getCurrentDir(), type.getName());
+  }
+
+  /**
+   * @return An edit log File in storage directory 'sd'.
+   */
+  File getEditFile(StorageDirectory sd) {
+    return getStorageFile(sd, NameNodeFile.EDITS);
+  }
+
+  /**
+   * @return A temporary edit log File in storage directory 'sd'.
+   */
+  File getEditNewFile(StorageDirectory sd) {
+    return getStorageFile(sd, NameNodeFile.EDITS_NEW);
+  }
+
+  /**
+   * @return A list of all Files of 'type' in available storage directories.
+   */
+  Collection<File> getFiles(NameNodeFile type, NameNodeDirType dirType) {
+    ArrayList<File> list = new ArrayList<File>();
+    Iterator<StorageDirectory> it =
+      (dirType == null) ? dirIterator() : dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      list.add(getStorageFile(it.next(), type));
+    }
+    return list;
+  }
+
+  /**
+   * Set the upgrade manager for use in a distributed upgrade.
+   * @param um The upgrade manager
+   */
+  void setUpgradeManager(UpgradeManager um) {
+    upgradeManager = um;
+  }
+
+  /**
+   * @return The current distributed upgrade state.
+   */
+  boolean getDistributedUpgradeState() {
+    return upgradeManager == null ? false : upgradeManager.getUpgradeState();
+  }
+
+  /**
+   * @return The current upgrade version.
+   */
+  int getDistributedUpgradeVersion() {
+    return upgradeManager == null ? 0 : upgradeManager.getUpgradeVersion();
+  }
+
+  /**
+   * Set the upgrade state and version.
+   * @param uState the new state.
+   * @param uVersion the new version.
+   */
+  private void setDistributedUpgradeState(boolean uState, int uVersion) {
+    upgradeManager.setUpgradeState(uState, uVersion);
+  }
+
+  /**
+   * Verify that the distributed upgrade state is valid.
+   * @param startOpt the option the namenode was started with.
+   */
+  void verifyDistributedUpgradeProgress(StartupOption startOpt
+                                        ) throws IOException {
+    if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
+      return;
+
+    assert upgradeManager != null : "FSNameSystem.upgradeManager is null.";
+    if(startOpt != StartupOption.UPGRADE) {
+      if(upgradeManager.getUpgradeState())
+        throw new IOException(
+                    "\n   Previous distributed upgrade was not completed. "
+                  + "\n   Please restart NameNode with -upgrade option.");
+      if(upgradeManager.getDistributedUpgrades() != null)
+        throw new IOException("\n   Distributed upgrade for NameNode version "
+                              + upgradeManager.getUpgradeVersion()
+                              + " to current LV " + FSConstants.LAYOUT_VERSION
+                              + " is required.\n   Please restart NameNode"
+                              + " with -upgrade option.");
+    }
+  }
+
+  /**
+   * Initialize a distributed upgrade.
+   */
+  void initializeDistributedUpgrade() throws IOException {
+    if(! upgradeManager.initializeUpgrade())
+      return;
+    // write new upgrade state into disk
+    writeAll();
+    LOG.info("\n   Distributed upgrade for NameNode version "
+             + upgradeManager.getUpgradeVersion() + " to current LV "
+             + FSConstants.LAYOUT_VERSION + " is initialized.");
+  }
+
+  /**
+   * Set the digest for the latest image stored by NNStorage.
+   * @param digest The digest for the image.
+   */
+  void setImageDigest(MD5Hash digest) {
+    this.imageDigest = digest;
+  }
+
+  /**
+   * Get the digest for the latest image stored by NNStorage.
+   * @return The digest for the latest image.
+   */
+  MD5Hash getImageDigest() {
+    return imageDigest;
+  }
+
+  /**
+   * Register a listener. The listener will be notified of changes to the list
+   * of available storage directories.
+   *
+   * @see NNStorageListener
+   * @param sel A storage listener.
+   */
+  void registerListener(NNStorageListener sel) {
+    listeners.add(sel);
+  }
+
+  /**
+   * Disable the check for pre-upgradable layouts. Needed for BackupImage.
+   * @param val Whether to disable the pre-upgradable layout check.
+   */
+  void setDisablePreUpgradableLayoutCheck(boolean val) {
+    disablePreUpgradableLayoutCheck = val;
+  }
+
+  /**
+   * Marks a list of directories as having experienced an error.
+   *
+   * @param sds A list of storage directories to mark as errored.
+   * @throws IOException
+   */
+  void reportErrorsOnDirectories(List<StorageDirectory> sds) throws IOException {
+    for (StorageDirectory sd : sds) {
+      reportErrorsOnDirectory(sd);
+    }
+  }
+
+  /**
+   * Reports that a directory has experienced an error.
+   * Notifies listeners that the directory is no longer
+   * available.
+   *
+   * @param sd A storage directory to mark as errored.
+   * @throws IOException
+   */
+  void reportErrorsOnDirectory(StorageDirectory sd)
+      throws IOException {
+    LOG.warn("Error reported on storage directory " + sd);
+
+    String lsd = listStorageDirectories();
+    LOG.debug("current list of storage dirs:" + lsd);
+
+    for (NNStorageListener listener : listeners) {
+      listener.errorOccurred(sd);
+    }
+
+    LOG.info("About to remove corresponding storage: "
+             + sd.getRoot().getAbsolutePath());
+    try {
+      sd.unlock();
+    } catch (Exception e) {
+      LOG.info("Unable to unlock bad storage directory: "
+               +  sd.getRoot().getPath(), e);
+    }
+
+    if (this.storageDirs.remove(sd)) {
+      this.removedStorageDirs.add(sd);
+    }
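+    // Advance the checkpoint time in the remaining healthy directories so
+    // the just-removed directory can later be recognized as out of date
+    // (see incrementCheckpointTime()).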
+    incrementCheckpointTime();
+
+    lsd = listStorageDirectories();
+    LOG.debug("at the end current list of storage dirs:" + lsd);
+  }
+  
+  /**
+   * Generate new clusterID.
+   * 
+   * clusterID is a persistent attribute of the cluster.
+   * It is generated when the cluster is created and remains the same
+   * during the life cycle of the cluster. When a new namenode is
+   * formatted, if this is a new cluster, a new clusterID is generated and
+   * stored. Subsequent namenodes must be given the same clusterID during
+   * their format to be in the same cluster.
+   * When a datanode registers, it receives the clusterID and sticks with it.
+   * If at any point a namenode or datanode tries to join another cluster,
+   * it will be rejected.
+   * 
+   * @return new clusterID
+   */ 
+  public static String newClusterID() {
+    return "CID-" + UUID.randomUUID().toString();
+  }
+
+  void setClusterID(String cid) {
+    clusterID = cid;
+  }
+
+  /**
+   * Try to find the current cluster id in the VERSION files;
+   * returns the first cluster id found in any VERSION file,
+   * or null in case none is found.
+   * @return clusterId, or null if no cluster id was found
+   */
+  public String determineClusterId() {
+    String cid = null;
+    Iterator<StorageDirectory> sdit = dirIterator(NameNodeDirType.IMAGE);
+    while(sdit.hasNext()) {
+      StorageDirectory sd = sdit.next();
+      try {
+        Properties props = sd.readFrom(sd.getVersionFile());
+        cid = props.getProperty("clusterID");
+        LOG.info("current cluster id for sd="+sd.getCurrentDir() + 
+            ";lv=" + layoutVersion + ";cid=" + cid);
+        
+        if(cid != null && !cid.equals(""))
+          return cid;
+      } catch (Exception e) {
+        LOG.warn("this sd not available: " + e.getLocalizedMessage());
+      } //ignore
+    }
+    LOG.warn("couldn't find any VERSION file containing valid ClusterId");
+    return null;
+  }
+
+  /**
+   * Generate new blockpoolID.
+   * 
+   * @return new blockpoolID
+   */ 
+  String newBlockPoolID() throws UnknownHostException{
+    String ip = "unknownIP";
+    try {
+      ip = DNS.getDefaultIP("default");
+    } catch (UnknownHostException e) {
+      LOG.warn("Could not find ip address of \"default\" inteface.");
+      throw e;
+    }
+    
+    int rand = 0;
+    try {
+      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
+    } catch (NoSuchAlgorithmException e) {
+      final Random R = new Random();
+      LOG.warn("Could not use SecureRandom");
+      rand = R.nextInt(Integer.MAX_VALUE);
+    }
+    String bpid = "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis();
+    return bpid;
+  }
+
+  /** Set block pool ID without validation */
+  void setBlockPoolID(String bpid) {
+    blockpoolID = bpid;
+  }
+
+  /** Validate and set block pool ID */
+  private void setBlockPoolID(File storage, String bpid)
+      throws InconsistentFSStateException {
+    if (bpid == null || bpid.equals("")) {
+      throw new InconsistentFSStateException(storage, "file "
+          + Storage.STORAGE_FILE_VERSION + " has no block pool Id.");
+    }
+    
+    if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
+      throw new InconsistentFSStateException(storage,
+          "Unexepcted blockpoolID " + bpid + " . Expected " + blockpoolID);
+    }
+    blockpoolID = bpid;
+  }
+  
+  public String getBlockPoolID() {
+    return blockpoolID;
+  }
+}
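
Aside: the ID formats produced by newClusterID() and newBlockPoolID() above
can be reproduced with a minimal, JDK-only sketch. This is illustrative only:
IdSketch is a hypothetical class, and InetAddress.getLocalHost() stands in
for Hadoop's DNS.getDefaultIP("default").

import java.net.InetAddress;
import java.security.SecureRandom;
import java.util.UUID;

public class IdSketch {
  // Mirrors newClusterID(): "CID-" followed by a random type-4 UUID.
  static String newClusterID() {
    return "CID-" + UUID.randomUUID().toString();
  }

  // Mirrors newBlockPoolID(): "BP-<random int>-<ip>-<millis>". Hadoop
  // resolves the ip via DNS.getDefaultIP("default"); the local host
  // address is used here only to keep the sketch self-contained.
  static String newBlockPoolID() throws Exception {
    String ip = InetAddress.getLocalHost().getHostAddress();
    int rand = new SecureRandom().nextInt(Integer.MAX_VALUE);
    return "BP-" + rand + "-" + ip + "-" + System.currentTimeMillis();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(newClusterID());   // e.g. CID-2c7a6d2e-... (random)
    System.out.println(newBlockPoolID()); // e.g. BP-1073741-10.0.0.5-1299...
  }
}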

+ 6 - 6
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -357,7 +357,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
     nodeRegistration = new NamenodeRegistration(
         getHostPortString(rpcAddress),
         getHostPortString(httpAddress),
-        getFSImage(), getRole(), getFSImage().getCheckpointTime());
+        getFSImage().getStorage(), getRole(), getFSImage().getStorage().getCheckpointTime());
     return nodeRegistration;
   }
 
@@ -1361,7 +1361,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
    * Returns the name of the fsImage file
    */
   public File getFsImageName() throws IOException {
-    return getFSImage().getFsImageName();
+    return getFSImage().getStorage().getFsImageName();
   }
     
   public FSImage getFSImage() {
@@ -1373,7 +1373,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
    * checkpointing
    */
   public File[] getFsImageNameCheckpoint() throws IOException {
-    return getFSImage().getFsImageNameCheckpoint();
+    return getFSImage().getStorage().getFsImageNameCheckpoint();
   }
 
   /**
@@ -1453,7 +1453,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
     String clusterId = StartupOption.FORMAT.getClusterId();
     if(clusterId == null || clusterId.equals("")) {
       // try to get one from the existing storage
-      clusterId = fsImage.determineClusterId();
+      clusterId = fsImage.getStorage().determineClusterId();
       if (clusterId == null || clusterId.equals("")) {
         throw new IllegalArgumentException("Format must be provided with clusterid");
       }
@@ -1465,7 +1465,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
         while(System.in.read() != '\n'); // discard the enter-key
       }
     }
-    nsys.dir.fsImage.format(clusterId);
+    nsys.dir.fsImage.getStorage().format(clusterId);
     return false;
   }
 
@@ -1600,7 +1600,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
         return null; // avoid javac warning
       case GENCLUSTERID:
         System.err.println("Generating new cluster id:");
-        System.out.println(FSImage.newClusterID());
+        System.out.println(NNStorage.newClusterID());
         System.exit(0);
         return null;
       case FINALIZE:

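Aside: the hunks in this and the following files are mechanical call-site
updates; state that previously lived on FSImage is now reached through its
NNStorage. A toy, self-contained model of that delegation (the accessor
names mirror the hunks, but the stand-in classes below are not Hadoop code):

import java.io.File;

// Stand-in for NNStorage: owns the storage-level state.
class ToyStorage {
  private long checkpointTime;
  long getCheckpointTime() { return checkpointTime; }
  void incrementCheckpointTime() { checkpointTime++; }
  File getFsImageName() { return new File("current/fsimage"); }
}

// Stand-in for FSImage: reaches storage state through an accessor
// instead of holding the fields itself.
class ToyImage {
  private final ToyStorage storage = new ToyStorage();
  ToyStorage getStorage() { return storage; }
}

public class DelegationSketch {
  public static void main(String[] args) {
    ToyImage image = new ToyImage();
    // before this change: image.getFsImageName(), image.getCheckpointTime()
    // after this change, every call goes through getStorage():
    System.out.println(image.getStorage().getFsImageName());
    image.getStorage().incrementCheckpointTime();
    System.out.println(image.getStorage().getCheckpointTime());
  }
}
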
+ 4 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java

@@ -180,8 +180,8 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       FSNamesystem fsn = nn.getNamesystem();
       FSImage fsImage = fsn.getFSImage();
-      List<Storage.StorageDirectory> removedStorageDirs = fsImage
-          .getRemovedStorageDirs();
+      List<Storage.StorageDirectory> removedStorageDirs 
+        = fsImage.getStorage().getRemovedStorageDirs();
 
       // FS Image storage configuration
       out.print("<h3> " + nn.getRole() + " Storage: </h3>");
@@ -189,7 +189,8 @@ class NamenodeJspHelper {
               + "<thead><tr><td><b>Storage Directory</b></td><td><b>Type</b></td><td><b>State</b></td></tr></thead>");
 
       StorageDirectory st = null;
-      for (Iterator<StorageDirectory> it = fsImage.dirIterator(); it.hasNext();) {
+      for (Iterator<StorageDirectory> it 
+             = fsImage.getStorage().dirIterator(); it.hasNext();) {
         st = it.next();
         String dir = "" + st.getRoot();
         String type = "" + st.getStorageDirType();

+ 37 - 28
src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -40,8 +40,10 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
@@ -313,7 +315,7 @@ public class SecondaryNameNode implements Runnable {
         LOG.error("Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
         e.printStackTrace();
-        checkpointImage.imageDigest = null;
+        checkpointImage.getStorage().imageDigest = null;
       } catch (Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint: ");
         LOG.error(StringUtils.stringifyException(e));
@@ -337,32 +339,34 @@ public class SecondaryNameNode implements Runnable {
   
           @Override
           public Boolean run() throws Exception {
-            checkpointImage.cTime = sig.cTime;
-            checkpointImage.checkpointTime = sig.checkpointTime;
-                    
+            checkpointImage.getStorage().cTime = sig.cTime;
+            checkpointImage.getStorage().setCheckpointTime(sig.checkpointTime);
+
             // get fsimage
             String fileid;
             Collection<File> list;
             File[] srcNames;
             boolean downloadImage = true;
-            if (sig.imageDigest.equals(checkpointImage.imageDigest)) {
+            if (sig.imageDigest.equals(
+                    checkpointImage.getStorage().imageDigest)) {
               downloadImage = false;
               LOG.info("Image has not changed. Will not download image.");
             } else {
               fileid = "getimage=1";
-              list = checkpointImage.getFiles(NameNodeFile.IMAGE,
-                  NameNodeDirType.IMAGE);
+              list = checkpointImage.getStorage().getFiles(
+                  NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
               srcNames = list.toArray(new File[list.size()]);
               assert srcNames.length > 0 : "No checkpoint targets.";
               TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
-              checkpointImage.imageDigest = sig.imageDigest;
+              checkpointImage.getStorage().imageDigest = sig.imageDigest;
               LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
                   srcNames[0].length() + " bytes.");
             }
         
             // get edits file
             fileid = "getedit=1";
-            list = getFSImage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
+            list = getFSImage().getStorage().getFiles(
+                NameNodeFile.EDITS, NameNodeDirType.EDITS);
             srcNames = list.toArray(new File[list.size()]);;
             assert srcNames.length > 0 : "No checkpoint targets.";
             TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
@@ -390,7 +394,7 @@ public class SecondaryNameNode implements Runnable {
     String fileid = "putimage=1&port=" + imagePort +
       "&machine=" + infoBindAddress + 
       "&token=" + sig.toString() +
-      "&newChecksum=" + checkpointImage.imageDigest;
+      "&newChecksum=" + checkpointImage.getStorage().getImageDigest();
     LOG.info("Posted URL " + fsName + fileid);
     TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
   }
@@ -409,7 +413,7 @@ public class SecondaryNameNode implements Runnable {
     if (sockAddr.getAddress().isAnyLocalAddress()) {
       if(UserGroupInformation.isSecurityEnabled()) {
         throw new IOException("Cannot use a wildcard address with security. " +
-        		"Must explicitly set bind address for Kerberos");
+                              "Must explicitly set bind address for Kerberos");
       }
       return fsName.getHost() + ":" + sockAddr.getPort();
     } else {
@@ -458,13 +462,13 @@ public class SecondaryNameNode implements Runnable {
     checkpointImage.endCheckpoint();
 
     LOG.warn("Checkpoint done. New Image Size: " 
-              + checkpointImage.getFsImageName().length());
+             + checkpointImage.getStorage().getFsImageName().length());
     
     return loadImage;
   }
 
   private void startCheckpoint() throws IOException {
-    checkpointImage.unlockAll();
+    checkpointImage.getStorage().unlockAll();
     checkpointImage.getEditLog().close();
     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
     checkpointImage.startCheckpoint();
@@ -622,10 +626,10 @@ public class SecondaryNameNode implements Runnable {
                        Collection<URI> editsDirs) throws IOException {
       Collection<URI> tempDataDirs = new ArrayList<URI>(dataDirs);
       Collection<URI> tempEditsDirs = new ArrayList<URI>(editsDirs);
-      this.storageDirs = new ArrayList<StorageDirectory>();
-      setStorageDirectories(tempDataDirs, tempEditsDirs);
+      storage.close();
+      storage.setStorageDirectories(tempDataDirs, tempEditsDirs);
       for (Iterator<StorageDirectory> it = 
-                   dirIterator(); it.hasNext();) {
+                   storage.dirIterator(); it.hasNext();) {
         StorageDirectory sd = it.next();
         boolean isAccessible = true;
         try { // create directories if don't exist yet
@@ -669,14 +673,18 @@ public class SecondaryNameNode implements Runnable {
      * @throws IOException
      */
     void startCheckpoint() throws IOException {
-      for(StorageDirectory sd : storageDirs) {
-        moveCurrent(sd);
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        storage.moveCurrent(sd);
       }
     }
 
     void endCheckpoint() throws IOException {
-      for(StorageDirectory sd : storageDirs) {
-        moveLastCheckpoint(sd);
+      for (Iterator<StorageDirectory> it
+             = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        storage.moveLastCheckpoint(sd);
       }
     }
 
@@ -690,25 +698,26 @@ public class SecondaryNameNode implements Runnable {
       StorageDirectory sdEdits = null;
       Iterator<StorageDirectory> it = null;
       if (loadImage) {
-        it = dirIterator(NameNodeDirType.IMAGE);
+        it = getStorage().dirIterator(NameNodeDirType.IMAGE);
         if (it.hasNext())
           sdName = it.next();
         if (sdName == null) {
           throw new IOException("Could not locate checkpoint fsimage");
         }
       }
-      it = dirIterator(NameNodeDirType.EDITS);
+      it = getStorage().dirIterator(NameNodeDirType.EDITS);
       if (it.hasNext())
         sdEdits = it.next();
       if (sdEdits == null)
         throw new IOException("Could not locate checkpoint edits");
       if (loadImage) {
-        this.layoutVersion = -1; // to avoid assert in loadFSImage()
-        loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
+        // to avoid assert in loadFSImage()
+        this.getStorage().layoutVersion = -1;
+        loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
       }
       loadFSEdits(sdEdits);
-      clusterID = sig.getClusterID();
-      blockpoolID = sig.getBlockpoolID();
+      storage.setClusterID(sig.getClusterID());
+      storage.setBlockPoolID(sig.getBlockpoolID());
       sig.validateStorageInfo(this);
       saveNamespace(false);
     }

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/UpgradeManagerNamenode.java

@@ -60,7 +60,7 @@ class UpgradeManagerNamenode extends UpgradeManager {
       initializeUpgrade();
       if(!upgradeState) return false;
       // write new upgrade state into disk
-      namesystem.getFSImage().writeAll();
+      namesystem.getFSImage().getStorage().writeAll();
     }
     assert currentUpgrades != null : "currentUpgrades is null";
     this.broadcastCommand = currentUpgrades.first().startUpgrade();
@@ -111,7 +111,7 @@ class UpgradeManagerNamenode extends UpgradeManager {
   public synchronized void completeUpgrade() throws IOException {
     // set and write new upgrade state into disk
     setUpgradeState(false, FSConstants.LAYOUT_VERSION);
-    namesystem.getFSImage().writeAll();
+    namesystem.getFSImage().getStorage().writeAll();
     currentUpgrades = null;
     broadcastCommand = null;
     namesystem.leaveSafeMode(false);
@@ -125,7 +125,7 @@ class UpgradeManagerNamenode extends UpgradeManager {
       isFinalized = fsimage.isUpgradeFinalized();
       if(isFinalized) // upgrade is finalized
         return null;  // nothing to report
-      return new UpgradeStatusReport(fsimage.getLayoutVersion(), 
+      return new UpgradeStatusReport(fsimage.getStorage().getLayoutVersion(),
                                      (short)101, isFinalized);
     }
     UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java

@@ -267,7 +267,7 @@ public class TestDFSRollback extends TestCase {
           UpgradeUtilities.getCurrentClusterID(null),
           UpgradeUtilities.getCurrentFsscTime(null));
       
-      UpgradeUtilities.createNameNodeVersionFile(baseDirs,
+      UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs,
           storageInfo, UpgradeUtilities.getCurrentBlockPoolID(cluster));
       startNameNodeShouldFail(StartupOption.UPGRADE);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java

@@ -255,7 +255,7 @@ public class TestDFSUpgrade extends TestCase {
           UpgradeUtilities.getCurrentClusterID(null),
           UpgradeUtilities.getCurrentFsscTime(null));
       
-      UpgradeUtilities.createNameNodeVersionFile(baseDirs, storageInfo,
+      UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs, storageInfo,
           UpgradeUtilities.getCurrentBlockPoolID(cluster));
       
       startNameNodeShouldFail(StartupOption.UPGRADE);
@@ -268,7 +268,7 @@ public class TestDFSUpgrade extends TestCase {
           UpgradeUtilities.getCurrentClusterID(null),
           UpgradeUtilities.getCurrentFsscTime(null));
       
-      UpgradeUtilities.createNameNodeVersionFile(baseDirs, storageInfo,
+      UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs, storageInfo,
           UpgradeUtilities.getCurrentBlockPoolID(cluster));
       
       startNameNodeShouldFail(StartupOption.UPGRADE);

+ 6 - 5
src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -372,14 +372,15 @@ public class UpgradeUtilities {
    *
    * @return the created version file
    */
-  public static File[] createNameNodeVersionFile(File[] parent,
-      StorageInfo version, String bpid) throws IOException {
-    FSImage storage = null;
+  public static File[] createNameNodeVersionFile(Configuration conf,
+      File[] parent, StorageInfo version, String bpid) throws IOException {
+    Storage storage = null;
     File[] versionFiles = new File[parent.length];
     for (int i = 0; i < parent.length; i++) {
       File versionFile = new File(parent[i], "VERSION");
       FileUtil.fullyDelete(versionFile);
-      storage = new FSImage(version, bpid);
+      storage = new NNStorage(conf);
+      storage.setStorageInfo(version);
       StorageDirectory sd = storage.new StorageDirectory(parent[i].getParentFile());
       sd.write(versionFile);
       versionFiles[i] = versionFile;

+ 2 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -72,8 +71,8 @@ public class TestDataNodeMultipleRegistrations {
       String cid2 = nn2.getFSImage().getClusterID();
       int lv1 = nn1.getFSImage().getLayoutVersion();
       int lv2 = nn2.getFSImage().getLayoutVersion();
-      int ns1 = nn1.getFSImage().namespaceID;
-      int ns2 = nn2.getFSImage().namespaceID;
+      int ns1 = nn1.getFSImage().getNamespaceID();
+      int ns2 = nn2.getFSImage().getNamespaceID();
       assertNotSame("namespace ids should be different", ns1, ns2);
       LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1 + ";uri="
           + nn1.getNameNodeAddress());

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java

@@ -193,7 +193,7 @@ public class CreateEditsLog {
     FileNameGenerator nameGenerator = new FileNameGenerator(BASE_PATH, 100);
 
     FSEditLog editLog = fsImage.getEditLog();
-    editLog.createEditLogFile(fsImage.getFsEditName());
+    editLog.createEditLogFile(fsImage.getStorage().getFsEditName());
     editLog.open();
     addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId,
              nameGenerator);

+ 5 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java

@@ -33,7 +33,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -83,9 +84,10 @@ public class OfflineEditsViewerHelper {
   private String getEditsFilename() throws IOException {
     FSImage image = cluster.getNameNode().getFSImage();
     // it was set up to only have ONE StorageDirectory
-    Iterator<StorageDirectory> it = image.dirIterator(NameNodeDirType.EDITS);
+    Iterator<StorageDirectory> it
+      = image.getStorage().dirIterator(NameNodeDirType.EDITS);
     StorageDirectory sd = it.next();
-    return image.getEditFile(sd).getAbsolutePath();
+    return image.getStorage().getEditFile(sd).getAbsolutePath();
   }
 
   /**

+ 25 - 24
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -33,11 +33,11 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -188,15 +188,15 @@ public class TestCheckpoint extends TestCase {
     // and that temporary checkpoint files are gone.
     FSImage image = cluster.getNameNode().getFSImage();
     for (Iterator<StorageDirectory> it = 
-             image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+           image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
       StorageDirectory sd = it.next();
-      assertFalse(FSImage.getImageFile(sd, NameNodeFile.IMAGE_NEW).exists());
+      assertFalse(image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE_NEW).exists());
     }
     for (Iterator<StorageDirectory> it = 
-            image.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+           image.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
       StorageDirectory sd = it.next();
-      assertFalse(image.getEditNewFile(sd).exists());
-      File edits = image.getEditFile(sd);
+      assertFalse(image.getStorage().getEditNewFile(sd).exists());
+      File edits = image.getStorage().getEditFile(sd);
       assertTrue(edits.exists()); // edits should exist and be empty
       long editsLen = edits.length();
       assertTrue(editsLen == Integer.SIZE/Byte.SIZE);
@@ -364,10 +364,10 @@ public class TestCheckpoint extends TestCase {
       assertTrue(!fileSys.exists(file1));
       StorageDirectory sd = null;
       for (Iterator<StorageDirectory> it = 
-                image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
+                image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
          sd = it.next();
       assertTrue(sd != null);
-      long fsimageLength = FSImage.getImageFile(sd, NameNodeFile.IMAGE).length();
+      long fsimageLength = image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE).length();
       //
       // Make the checkpoint
       //
@@ -385,8 +385,8 @@ public class TestCheckpoint extends TestCase {
 
       // Verify that image file sizes did not change.
       for (Iterator<StorageDirectory> it = 
-              image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue(FSImage.getImageFile(it.next(), 
+              image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue(image.getStorage().getStorageFile(it.next(), 
                                 NameNodeFile.IMAGE).length() == fsimageLength);
       }
 
@@ -479,7 +479,7 @@ public class TestCheckpoint extends TestCase {
     SecondaryNameNode secondary = null;
     try {
       secondary = startSecondaryNameNode(conf);
-      assertFalse(secondary.getFSImage().isLockSupported(0));
+      assertFalse(secondary.getFSImage().getStorage().isLockSupported(0));
       secondary.shutdown();
     } catch (IOException e) { // expected to fail
       assertTrue(secondary == null);
@@ -504,7 +504,7 @@ public class TestCheckpoint extends TestCase {
     try {
       nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
                           StartupOption.REGULAR);
-      assertFalse(nn.getFSImage().isLockSupported(0));
+      assertFalse(nn.getFSImage().getStorage().isLockSupported(0));
       nn.stop(); nn = null;
     } catch (IOException e) { // expected to fail
       assertTrue(nn == null);
@@ -518,7 +518,7 @@ public class TestCheckpoint extends TestCase {
     SecondaryNameNode secondary2 = null;
     try {
       secondary2 = startSecondaryNameNode(conf);
-      assertFalse(secondary2.getFSImage().isLockSupported(0));
+      assertFalse(secondary2.getFSImage().getStorage().isLockSupported(0));
       secondary2.shutdown();
     } catch (IOException e) { // expected to fail
       assertTrue(secondary2 == null);
@@ -566,8 +566,8 @@ public class TestCheckpoint extends TestCase {
     // Verify that image file sizes did not change.
     FSImage image = nn.getFSImage();
     for (Iterator<StorageDirectory> it = 
-            image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-      assertTrue(FSImage.getImageFile(it.next(), 
+            image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+      assertTrue(image.getStorage().getStorageFile(it.next(), 
                           NameNodeFile.IMAGE).length() == fsimageLength);
     }
     nn.stop();
@@ -894,15 +894,15 @@ public class TestCheckpoint extends TestCase {
       // Make the checkpoint
       //
       SecondaryNameNode secondary = startSecondaryNameNode(conf);
-      long fsimageLength = FSImage.getImageFile(
-          image.dirIterator(NameNodeDirType.IMAGE).next(),
-          NameNodeFile.IMAGE).length();
+      long fsimageLength = image.getStorage()
+        .getStorageFile(image.getStorage().dirIterator(NameNodeDirType.IMAGE).next(),
+                        NameNodeFile.IMAGE).length();
       assertFalse("Image is downloaded", secondary.doCheckpoint());
 
       // Verify that image file sizes did not change.
       for (Iterator<StorageDirectory> it = 
-              image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue("Image size does not change", FSImage.getImageFile(it.next(), 
+             image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue("Image size does not change", image.getStorage().getStorageFile(it.next(), 
                                 NameNodeFile.IMAGE).length() == fsimageLength);
       }
 
@@ -911,9 +911,10 @@ public class TestCheckpoint extends TestCase {
       assertTrue("Image is not downloaded", secondary.doCheckpoint());
 
       for (Iterator<StorageDirectory> it = 
-        image.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-        assertTrue("Image size increased", FSImage.getImageFile(it.next(), 
-                          NameNodeFile.IMAGE).length() > fsimageLength);
+             image.getStorage().dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+        assertTrue("Image size increased", 
+                   image.getStorage().getStorageFile(it.next(), 
+                                                     NameNodeFile.IMAGE).length() > fsimageLength);
      }
 
       secondary.shutdown();

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -92,7 +91,8 @@ public class TestClusterId {
     Collection<URI> editsToFormat = new ArrayList<URI>(0);
     FSImage fsImage = new FSImage(dirsToFormat, editsToFormat);
     
-    Iterator<StorageDirectory> sdit = fsImage.dirIterator(NameNodeDirType.IMAGE);
+    Iterator<StorageDirectory> sdit = 
+      fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);
     StorageDirectory sd = sdit.next();
     Properties props = sd.readFrom(sd.getVersionFile());
     String cid = props.getProperty("clusterID");

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 
 /**
  * This class tests the creation and validation of a checkpoint.
@@ -140,8 +140,8 @@ public class TestEditLog extends TestCase {
       //
       FSEditLogLoader loader = new FSEditLogLoader(namesystem);
       for (Iterator<StorageDirectory> it = 
-              fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+              fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+        File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
         int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java

@@ -38,8 +38,8 @@ import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.junit.Assert.*;
@@ -219,8 +219,8 @@ public class TestEditLogRace {
     // If there were any corruptions, it is likely that the reading in
     // of these transactions will throw an exception.
     for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+           fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
       System.out.println("Verifying file: " + editFile);
       int numEdits = new FSEditLogLoader(namesystem).loadFSEdits(
         new EditLogFileInputStream(editFile));

+ 4 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 
 import java.util.Iterator;
 import java.util.List;
@@ -108,11 +110,11 @@ public class TestParallelImageWrite extends TestCase {
   
   private void checkImages(FSNamesystem fsn) throws Exception {
     Iterator<StorageDirectory> iter = fsn.
-            getFSImage().dirIterator(FSImage.NameNodeDirType.IMAGE);
+            getFSImage().getStorage().dirIterator(NameNodeDirType.IMAGE);
     List<Long> checksums = new ArrayList<Long>();
     while (iter.hasNext()) {
       StorageDirectory sd = iter.next();
-      File fsImage = FSImage.getImageFile(sd, FSImage.NameNodeFile.IMAGE);
+      File fsImage = fsn.getFSImage().getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
       PureJavaCrc32 crc = new PureJavaCrc32();
       FileInputStream in = new FileInputStream(fsImage);
       byte[] buff = new byte[4096];

+ 27 - 13
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

@@ -104,11 +104,17 @@ public class TestSaveNamespace {
 
     // Replace the FSImage with a spy
     FSImage originalImage = fsn.dir.fsImage;
+    NNStorage storage = originalImage.getStorage();
+    storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
+
+    NNStorage spyStorage = spy(storage);
+    originalImage.storage = spyStorage;
+
     FSImage spyImage = spy(originalImage);
-    spyImage.setStorageDirectories(
-        FSNamesystem.getNamespaceDirs(conf), 
-        FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
+    
+    spyImage.getStorage().setStorageDirectories(FSNamesystem.getNamespaceDirs(conf), 
+                                                FSNamesystem.getNamespaceEditsDirs(conf));
 
     // inject fault
     switch(fault) {
@@ -120,12 +126,12 @@ public class TestSaveNamespace {
     case MOVE_CURRENT:
       // The spy throws a RuntimeException when calling moveCurrent()
       doThrow(new RuntimeException("Injected fault: moveCurrent")).
-        when(spyImage).moveCurrent((StorageDirectory)anyObject());
+        when(spyStorage).moveCurrent((StorageDirectory)anyObject());
       break;
     case MOVE_LAST_CHECKPOINT:
       // The spy throws a RuntimeException when calling moveLastCheckpoint()
       doThrow(new RuntimeException("Injected fault: moveLastCheckpoint")).
-        when(spyImage).moveLastCheckpoint((StorageDirectory)anyObject());
+        when(spyStorage).moveLastCheckpoint((StorageDirectory)anyObject());
       break;
     }
 
@@ -175,12 +181,18 @@ public class TestSaveNamespace {
 
     // Replace the FSImage with a spy
     FSImage originalImage = fsn.dir.fsImage;
+    NNStorage storage = originalImage.getStorage();
+    storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
+
+    NNStorage spyStorage = spy(storage);
+    originalImage.storage = spyStorage;
+
     FSImage spyImage = spy(originalImage);
-    spyImage.setStorageDirectories(
-        FSNamesystem.getNamespaceDirs(conf), 
-        FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;
 
+    spyImage.getStorage().setStorageDirectories(FSNamesystem.getNamespaceDirs(conf), 
+                                                FSNamesystem.getNamespaceEditsDirs(conf));
+
     // inject fault
     // The spy throws an IOException when writing to the second directory
     doAnswer(new FaultySaveImage(false)).
@@ -196,9 +208,9 @@ public class TestSaveNamespace {
       fsn.saveNamespace();
       LOG.warn("First savenamespace sucessful.");
       assertTrue("Savenamespace should have marked one directory as bad." +
-                 " But found " + spyImage.getRemovedStorageDirs().size() +
+                 " But found " + spyStorage.getRemovedStorageDirs().size() +
                  " bad directories.", 
-                   spyImage.getRemovedStorageDirs().size() == 1);
+                   spyStorage.getRemovedStorageDirs().size() == 1);
 
       // The next call to savenamespace should try inserting the
       // erroneous directory back to fs.name.dir. This command should
@@ -208,9 +220,9 @@ public class TestSaveNamespace {
       LOG.warn("Second savenamespace sucessful.");
       assertTrue("Savenamespace should have been successful in removing " +
                  " bad directories from Image."  +
-                 " But found " + originalImage.getRemovedStorageDirs().size() +
+                 " But found " + storage.getRemovedStorageDirs().size() +
                  " bad directories.", 
-                 originalImage.getRemovedStorageDirs().size() == 0);
+                 storage.getRemovedStorageDirs().size() == 0);
 
       // Now shut down and restart the namesystem
       LOG.info("Shutting down fsimage.");
@@ -258,8 +270,10 @@ public class TestSaveNamespace {
 
     // Replace the FSImage with a spy
     final FSImage originalImage = fsn.dir.fsImage;
+    originalImage.getStorage().close();
+
     FSImage spyImage = spy(originalImage);
-    spyImage.setStorageDirectories(
+    spyImage.getStorage().setStorageDirectories(
         FSNamesystem.getNamespaceDirs(conf), 
         FSNamesystem.getNamespaceEditsDirs(conf));
     fsn.dir.fsImage = spyImage;

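All three TestSaveNamespace hunks share one re-wiring recipe: unlock the real NNStorage, wrap it in a Mockito spy, attach the spy to the FSImage (the package-private storage field makes the direct assignment legal from the test), and aim the fault injection at the methods that moved onto NNStorage. A condensed sketch (illustrative; Mockito 1.x matchers, as in the diff):

    import static org.mockito.Mockito.*;

    import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
    import org.apache.hadoop.hdfs.server.namenode.FSImage;
    import org.apache.hadoop.hdfs.server.namenode.NNStorage;

    class SpyWiring {
      static NNStorage injectMoveCurrentFault(FSImage originalImage) throws Exception {
        NNStorage storage = originalImage.getStorage();
        storage.close();                     // release directory locks before re-wiring
        NNStorage spyStorage = spy(storage); // the spy delegates to the real storage
        originalImage.storage = spyStorage;  // package-private field, as in the test
        // moveCurrent() now lives on NNStorage, so the fault targets the spy:
        doThrow(new RuntimeException("Injected fault: moveCurrent")).
            when(spyStorage).moveCurrent((StorageDirectory) anyObject());
        return spyStorage;
      }
    }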
+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java

@@ -29,8 +29,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -138,8 +138,8 @@ public class TestSecurityTokenEditLog extends TestCase {
       namesystem.getDelegationTokenSecretManager().stopThreads();
       int numKeys = namesystem.getDelegationTokenSecretManager().getNumberOfKeys();
       for (Iterator<StorageDirectory> it = 
-              fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-        File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+             fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+        File editFile = fsimage.getStorage().getStorageFile(it.next(), NameNodeFile.EDITS);
         System.out.println("Verifying file: " + editFile);
         int numEdits = loader.loadFSEdits(
                                   new EditLogFileInputStream(editFile));

+ 11 - 11
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java

@@ -48,8 +48,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.StringUtils;
@@ -226,15 +226,15 @@ public class TestStartup extends TestCase {
    */
   private void verifyDifferentDirs(FSImage img, long expectedImgSize, long expectedEditsSize) {
     StorageDirectory sd =null;
-    for (Iterator<StorageDirectory> it = img.dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = img.getStorage().dirIterator(); it.hasNext();) {
       sd = it.next();
 
       if(sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
+        File imf = img.getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
         LOG.info("--image file " + imf.getAbsolutePath() + "; len = " + imf.length() + "; expected = " + expectedImgSize);
         assertEquals(expectedImgSize, imf.length());	
       } else if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+        File edf = img.getStorage().getStorageFile(sd, NameNodeFile.EDITS);
         LOG.info("-- edits file " + edf.getAbsolutePath() + "; len = " + edf.length()  + "; expected = " + expectedEditsSize);
         assertEquals(expectedEditsSize, edf.length());	
       } else {
@@ -337,10 +337,10 @@ public class TestStartup extends TestCase {
 
       // now verify that image and edits are created in the different directories
       FSImage image = nn.getFSImage();
-      StorageDirectory sd = image.getStorageDir(0); //only one
+      StorageDirectory sd = image.getStorage().getStorageDir(0); //only one
       assertEquals(sd.getStorageDirType(), NameNodeDirType.IMAGE_AND_EDITS);
-      File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
-      File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+      File imf = image.getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
+      File edf = image.getStorage().getStorageFile(sd, NameNodeFile.EDITS);
       LOG.info("--image file " + imf.getAbsolutePath() + "; len = " + imf.length());
       LOG.info("--edits file " + edf.getAbsolutePath() + "; len = " + edf.length());
 
@@ -445,7 +445,7 @@ public class TestStartup extends TestCase {
     FSImage image = namenode.getFSImage();
     image.loadFSImage();
 
-    File versionFile = image.getStorageDir(0).getVersionFile();
+    File versionFile = image.getStorage().getStorageDir(0).getVersionFile();
 
     RandomAccessFile file = new RandomAccessFile(versionFile, "rws");
     FileInputStream in = null;
@@ -458,12 +458,12 @@ public class TestStartup extends TestCase {
       props.load(in);
 
       // get the MD5 property and change it
-      String sMd5 = props.getProperty(FSImage.MESSAGE_DIGEST_PROPERTY);
+      String sMd5 = props.getProperty(NNStorage.MESSAGE_DIGEST_PROPERTY);
       MD5Hash md5 = new MD5Hash(sMd5);
       byte[] bytes = md5.getDigest();
       bytes[0] += 1;
       md5 = new MD5Hash(bytes);
-      props.setProperty(FSImage.MESSAGE_DIGEST_PROPERTY, md5.toString());
+      props.setProperty(NNStorage.MESSAGE_DIGEST_PROPERTY, md5.toString());
 
       // write the properties back to version file
       file.seek(0);

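The MD5 property that guards the image file moved as well: the test now reads and rewrites NNStorage.MESSAGE_DIGEST_PROPERTY in the VERSION file. A compressed sketch of the corruption step above (illustrative; same Properties round-trip as the test):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.RandomAccessFile;
    import java.util.Properties;

    import org.apache.hadoop.hdfs.server.namenode.NNStorage;
    import org.apache.hadoop.io.MD5Hash;

    class CorruptImageDigest {
      // Flips one byte of the recorded image digest so that a later
      // loadFSImage() sees a checksum mismatch.
      static void flipDigestByte(File versionFile) throws Exception {
        RandomAccessFile file = new RandomAccessFile(versionFile, "rws");
        FileInputStream in = new FileInputStream(file.getFD());
        Properties props = new Properties();
        props.load(in);
        // the property key moved from FSImage to NNStorage in this patch
        MD5Hash md5 = new MD5Hash(props.getProperty(NNStorage.MESSAGE_DIGEST_PROPERTY));
        byte[] bytes = md5.getDigest();
        bytes[0] += 1;
        props.setProperty(NNStorage.MESSAGE_DIGEST_PROPERTY, new MD5Hash(bytes).toString());
        file.seek(0);                        // rewind and write the properties back
        props.store(new FileOutputStream(file.getFD()), null);
        file.close();
      }
    }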
+ 11 - 11
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

@@ -45,8 +45,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 
@@ -128,7 +128,7 @@ public class TestStorageRestore extends TestCase {
    */
   public void invalidateStorage(FSImage fi) throws IOException {
     ArrayList<StorageDirectory> al = new ArrayList<StorageDirectory>(2);
-    Iterator<StorageDirectory> it = fi.dirIterator();
+    Iterator<StorageDirectory> it = fi.getStorage().dirIterator();
     while(it.hasNext()) {
       StorageDirectory sd = it.next();
       if(sd.getRoot().equals(path2) || sd.getRoot().equals(path3)) {
@@ -136,7 +136,7 @@ public class TestStorageRestore extends TestCase {
       }
     }
     // simulate an error
-    fi.processIOError(al, true);
+    fi.getStorage().reportErrorsOnDirectories(al);
   }
   
   /**
@@ -144,15 +144,15 @@ public class TestStorageRestore extends TestCase {
    */
   public void printStorages(FSImage fs) {
     LOG.info("current storages and corresoponding sizes:");
-    for(Iterator<StorageDirectory> it = fs.dirIterator(); it.hasNext(); ) {
+    for(Iterator<StorageDirectory> it = fs.getStorage().dirIterator(); it.hasNext(); ) {
       StorageDirectory sd = it.next();
       
       if(sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
-        File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
+        File imf = fs.getStorage().getStorageFile(sd, NameNodeFile.IMAGE);
         LOG.info("  image file " + imf.getAbsolutePath() + "; len = " + imf.length());  
       }
       if(sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+        File edf = fs.getStorage().getStorageFile(sd, NameNodeFile.EDITS);
         LOG.info("  edits file " + edf.getAbsolutePath() + "; len = " + edf.length()); 
       }
     }
@@ -342,7 +342,7 @@ public class TestStorageRestore extends TestCase {
       FSImage fsi = cluster.getNameNode().getFSImage();
 
       // it is started with dfs.name.dir.restore set to true (in SetUp())
-      boolean restore = fsi.getRestoreFailedStorage();
+      boolean restore = fsi.getStorage().getRestoreFailedStorage();
       LOG.info("Restore is " + restore);
       assertEquals(restore, true);
 
@@ -355,19 +355,19 @@ public class TestStorageRestore extends TestCase {
           new CLITestData.TestCmd(cmd, CLITestData.TestCmd.CommandType.DFSADMIN),
           namenode);
       executor.executeCommand(cmd);
-      restore = fsi.getRestoreFailedStorage();
+      restore = fsi.getStorage().getRestoreFailedStorage();
       assertFalse("After set true call restore is " + restore, restore);
 
       // run one more time - to set it to true again
       cmd = "-fs NAMENODE -restoreFailedStorage true";
       executor.executeCommand(cmd);
-      restore = fsi.getRestoreFailedStorage();
+      restore = fsi.getStorage().getRestoreFailedStorage();
       assertTrue("After set false call restore is " + restore, restore);
       
    // run one more time - no change in value
       cmd = "-fs NAMENODE -restoreFailedStorage check";
       CommandExecutor.Result cmdResult = executor.executeCommand(cmd);
-      restore = fsi.getRestoreFailedStorage();
+      restore = fsi.getStorage().getRestoreFailedStorage();
       assertTrue("After check call restore is " + restore, restore);
       String commandOutput = cmdResult.getCommandOutput();
       commandOutput.trim();

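TestStorageRestore picks up the renamed error path: the old FSImage.processIOError(al, true) becomes NNStorage.reportErrorsOnDirectories(al), and the restoreFailedStorage flag is queried through getStorage() as well. A sketch combining the two calls (illustrative; badRoot stands in for the test's path2/path3):

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;

    import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
    import org.apache.hadoop.hdfs.server.namenode.FSImage;

    class StorageErrorReport {
      // Marks one storage directory as failed, then reads back the flag that
      // "dfsadmin -restoreFailedStorage" toggles.
      static void failDirectory(FSImage fi, File badRoot) throws IOException {
        ArrayList<StorageDirectory> al = new ArrayList<StorageDirectory>();
        for (Iterator<StorageDirectory> it = fi.getStorage().dirIterator(); it.hasNext();) {
          StorageDirectory sd = it.next();
          if (sd.getRoot().equals(badRoot)) {
            al.add(sd);                      // collect the directory to fail
          }
        }
        fi.getStorage().reportErrorsOnDirectories(al);  // was processIOError(al, true)
        boolean restore = fi.getStorage().getRestoreFailedStorage();
        System.out.println("Restore is " + restore);
      }
    }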