HDFS-3075. Merging r1300680 and other dependencies HDFS-2701:r1221098, HDFS-2703:r1221099, HDFS-2702:r1221100

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1300791 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas, 13 years ago
Parent commit: 17bf44c02b

+ 10 - 0
CHANGES.txt

@@ -1,5 +1,8 @@
 Hadoop Change Log
 
+    HDFS-3075. Backport HADOOP-4885: Try to restore failed name-node storage
+    directories at checkpoint time.  (Brandon Li via szetszwo)
+
 Release 1.0.2 - unreleased
 
   NEW FEATURES
@@ -7,6 +10,8 @@ Release 1.0.2 - unreleased
     HADOOP-7206. Support Snappy compression. (Issei Yoshida and
     Alejandro Abdelnur via vinodkv).
 
+    HDFS-2701. Cleanup FS* processIOError methods. (eli)
+
     HDFS-2978. The NameNode should expose name dir statuses via JMX. (atm)
 
   IMPROVEMENTS
@@ -36,6 +41,11 @@ Release 1.0.2 - unreleased
     MAPREDUCE-764. Fix TypedBytesInput.readRaw to preserve custom type codes.  
     (Klaas Bosteels via acmurthy) 
 
+    HDFS-2703. removedStorageDirs is not updated everywhere we remove
+    a storage dir. (eli)
+
+    HDFS-2702. A single failed name dir can cause the NN to exit. (eli)
+
 Release 1.0.1 - 2012.02.14
 
   NEW FEATURES
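
This backport gates the new behavior behind a NameNode configuration flag, wired up in the FSDirectory and FSImage hunks below. A minimal sketch of enabling it programmatically, assuming the key string behind DFS_NAMENODE_NAME_DIR_RESTORE_KEY is "dfs.name.dir.restore" on branch-1 (verify against DFSConfigKeys in your tree):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EnableNameDirRestore {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Ask the NameNode to try to restore failed name dirs at checkpoint time
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, true);
    System.out.println("restore enabled: " + conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, false));
  }
}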

+ 3 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -69,6 +69,9 @@ class FSDirectory implements FSConstants, Closeable {
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
         Integer.MAX_VALUE, -1);
     this.fsImage = fsImage;
+    fsImage.setRestoreRemovedDirs(conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT));
     namesystem = ns;
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);

+ 119 - 93
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -304,11 +304,13 @@ public class FSEditLog {
   }
   
   private int getNumStorageDirs() {
- int numStorageDirs = 0;
- for (Iterator<StorageDirectory> it = 
-       fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext(); it.next())
-   numStorageDirs++;
-    return numStorageDirs;
+   int numStorageDirs = 0;
+   Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+   while (it.hasNext()) {
+     numStorageDirs++;
+     it.next();
+   }
+   return numStorageDirs;
   }
   
   synchronized int getNumEditStreams() {
@@ -327,21 +329,22 @@ public class FSEditLog {
    */
   public synchronized void open() throws IOException {
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
-    if (editStreams == null)
+    if (editStreams == null) {
       editStreams = new ArrayList<EditLogOutputStream>();
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    }
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); 
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       File eFile = getEditFile(sd);
       try {
         EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
         editStreams.add(eStream);
-      } catch (IOException e) {
-        FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
-        // Remove the directory from list of storage directories
+      } catch (IOException ioe) {
+        fsimage.updateRemovedDirs(sd, ioe);
         it.remove();
       }
     }
+    exitIfNoStreams();
   }
 
   public synchronized void createEditLogFile(File name) throws IOException {
@@ -372,84 +375,94 @@ public class FSEditLog {
         eStream.setReadyToFlush();
         eStream.flush();
         eStream.close();
-      } catch (IOException e) {
-        processIOError(idx);
+      } catch (IOException ioe) {
+        removeEditsAndStorageDir(idx);
         idx--;
       }
     }
     editStreams.clear();
   }
 
+  void fatalExit(String msg) {
+    FSNamesystem.LOG.fatal(msg, new Exception(msg));
+    Runtime.getRuntime().exit(-1);
+  }
+
   /**
-   * If there is an IO Error on any log operations, remove that
-   * directory from the list of directories.
-   * If no more directories remain, then exit.
+   * Exit the NN process if the edit streams have not yet been
+   * initialized, e.g. we failed while opening.
    */
-  synchronized void processIOError(int index) {
-    if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-      "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
+  private void exitIfStreamsNotSet() {
+    if (editStreams == null) {
+      fatalExit("Edit streams not yet initialized");
     }
-    assert(index < getNumStorageDirs());
-    assert(getNumStorageDirs() == editStreams.size());
+  }
+
+  /**
+   * Exit the NN process if there are no edit streams to log to.
+   */
+  void exitIfNoStreams() {
+    if (editStreams == null || editStreams.isEmpty()) {
+      fatalExit("No edit streams are accessible");
+    }
+  }
+
+  /**
+   * @return the storage directory for the given edit stream. 
+   */
+  private File getStorageDirForStream(int idx) {
+    File editsFile =
+      ((EditLogFileOutputStream)editStreams.get(idx)).getFile();
+    // The name dir is the parent of "current", which is the parent of "edits"
+    return editsFile.getParentFile().getParentFile();
+  }
+
+  /**
+   * Remove the given edits stream and its containing storage dir.
+   */
+  synchronized void removeEditsAndStorageDir(int idx) {
+    exitIfStreamsNotSet();
+
+    assert idx < getNumStorageDirs();
+    assert getNumStorageDirs() == editStreams.size();
     
-    File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                      .get(index)).getFile()
-                                      .getParentFile().getParentFile();
-    editStreams.remove(index);
-    //
-    // Invoke the ioerror routine of the fsimage
-    //
-    fsimage.processIOError(parentStorageDir);
+    File dir = getStorageDirForStream(idx);
+    editStreams.remove(idx);
+    fsimage.removeStorageDir(dir);
   }
-  
+
   /**
-   * If there is an IO Error on any log operations on storage directory,
-   * remove any stream associated with that directory 
+   * Remove all edits streams for the given storage directory.
    */
-  synchronized void processIOError(StorageDirectory sd) {
-    // Try to remove stream only if one should exist
-    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+  synchronized void removeEditsForStorageDir(StorageDirectory sd) {
+    exitIfStreamsNotSet();
+
+    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
       return;
-    if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-          "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
     }
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                       .get(idx)).getFile()
-                                       .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+      File parentDir = getStorageDirForStream(idx);
+      if (parentDir.getName().equals(sd.getRoot().getName())) {
         editStreams.remove(idx);
- }
+      }
+    }
   }
   
   /**
-   * The specified streams have IO errors. Remove them from logging
-   * new transactions.
+   * Remove each of the given edits streams and their corresponding
+   * storage directories.
    */
-  private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
+  private void removeEditsStreamsAndStorageDirs(
+      ArrayList<EditLogOutputStream> errorStreams) {
     if (errorStreams == null) {
-      return;                       // nothing to do
+      return;
     }
-    for (int idx = 0; idx < errorStreams.size(); idx++) {
-      EditLogOutputStream eStream = errorStreams.get(idx);
-      int j = 0;
-      int numEditStreams = editStreams.size();
-      for (j = 0; j < numEditStreams; j++) {
-        if (editStreams.get(j) == eStream) {
-          break;
-        }
+    for (EditLogOutputStream errorStream : errorStreams) {
+      int idx = editStreams.indexOf(errorStream);
+      if (-1 == idx) {
+        fatalExit("Unable to find edits stream with IO error");
       }
-      if (j == numEditStreams) {
-          FSNamesystem.LOG.error("Unable to find sync log on which " +
-                                 " IO error occured. " +
-                                 "Fatal Error.");
-          Runtime.getRuntime().exit(-1);
-      }
-      processIOError(j);
+      removeEditsAndStorageDir(idx);
     }
     fsimage.incrementCheckpointTime();
   }
@@ -458,8 +471,8 @@ public class FSEditLog {
    * check if ANY edits.new log exists
    */
   boolean existsNew() throws IOException {
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    while (it.hasNext()) {
       if (getEditNewFile(it.next()).exists()) { 
         return true;
       }
@@ -903,19 +916,20 @@ public class FSEditLog {
    * store yet.
    */
   synchronized void logEdit(byte op, Writable ... writables) {
-    assert this.getNumEditStreams() > 0 : "no editlog streams";
+    if (getNumEditStreams() < 1) {
+      throw new AssertionError("No edit streams to log to");
+    }
     long start = FSNamesystem.now();
     for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
         eStream.write(op, writables);
-      } catch (IOException ie) {
-        processIOError(idx);         
-        // processIOError will remove the idx's stream 
-        // from the editStreams collection, so we need to update idx
+      } catch (IOException ioe) {
+        removeEditsAndStorageDir(idx);
         idx--; 
       }
     }
+    exitIfNoStreams();
     // get a new transactionId
     txid++;
 
@@ -983,7 +997,7 @@ public class FSEditLog {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
         eStream.flush();
-      } catch (IOException ie) {
+      } catch (IOException ioe) {
         //
         // remember the streams that encountered an error.
         //
@@ -991,14 +1005,14 @@ public class FSEditLog {
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         }
         errorStreams.add(eStream);
-        FSNamesystem.LOG.error("Unable to sync edit log. " +
-                               "Fatal Error.");
+        FSNamesystem.LOG.error("Unable to sync "+eStream.getName());
       }
     }
     long elapsed = FSNamesystem.now() - start;
 
     synchronized (this) {
-       processIOError(errorStreams);
+       removeEditsStreamsAndStorageDirs(errorStreams);
+       exitIfNoStreams();
        synctxid = syncStart;
        isSyncRunning = false;
        this.notifyAll();
@@ -1217,36 +1231,46 @@ public class FSEditLog {
     // exists in all directories.
     //
     if (existsNew()) {
-      for (Iterator<StorageDirectory> it = 
-               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      Iterator<StorageDirectory> it =
+        fsimage.dirIterator(NameNodeDirType.EDITS);
+      StringBuilder b = new StringBuilder();
+      while (it.hasNext()) {
         File editsNew = getEditNewFile(it.next());
-     if (!editsNew.exists()) { 
-          throw new IOException("Inconsistent existance of edits.new " +
-                                editsNew);
+        b.append("\n  ").append(editsNew);
+        if (!editsNew.exists()) {
+          throw new IOException(
+              "Inconsistent existence of edits.new " + editsNew);
         }
       }
-      return; // nothing to do, edits.new exists!
+      FSNamesystem.LOG.warn("Cannot roll edit log," +
+          " edits.new files already exists in all healthy directories:" + b);
+      return;
     }
 
-    close();                     // close existing edit log
+    close(); // close existing edit log
 
+    // After edit streams are closed, healthy edits files should be identical,
+    // and likewise for the fsimage files
+    fsimage.restoreStorageDirs();
+    
     //
     // Open edits.new
     //
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       try {
         EditLogFileOutputStream eStream = 
              new EditLogFileOutputStream(getEditNewFile(sd));
         eStream.create();
         editStreams.add(eStream);
-      } catch (IOException e) {
-        // remove stream and this storage directory from list
-        processIOError(sd);
-       it.remove();
+      } catch (IOException ioe) {
+        removeEditsForStorageDir(sd);
+        fsimage.updateRemovedDirs(sd, ioe);
+        it.remove();
       }
     }
+    exitIfNoStreams();
   }
 
   /**
@@ -1266,8 +1290,8 @@ public class FSEditLog {
     //
     // Delete edits and rename edits.new to edits.
     //
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
         //
@@ -1276,8 +1300,10 @@ public class FSEditLog {
         //
         getEditFile(sd).delete();
         if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
-          // Should we also remove from edits
-          it.remove(); 
+          sd.unlock();
+          removeEditsForStorageDir(sd);
+          fsimage.updateRemovedDirs(sd, null);
+          it.remove();
         }
       }
     }
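
The FSEditLog changes above converge on a single failure-handling pattern: write to every stream, drop any stream (and its storage dir) that throws, and kill the NameNode only when no streams survive. A self-contained sketch of that pattern, using a stand-in Stream type rather than the real EditLogOutputStream:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SurvivorStreamsSketch {
  /** Stand-in for EditLogOutputStream; illustrative only, not the HDFS API. */
  interface Stream {
    void write(String record) throws IOException;
  }

  private final List<Stream> streams = new ArrayList<Stream>();

  /** Write to every stream, dropping any that fails; abort only if none survive. */
  void logEdit(String record) {
    for (int idx = 0; idx < streams.size(); idx++) {
      try {
        streams.get(idx).write(record);
      } catch (IOException ioe) {
        streams.remove(idx); // analogous to removeEditsAndStorageDir(idx)
        idx--;               // removal shifts the list left; revisit this slot
      }
    }
    if (streams.isEmpty()) {
      // analogous to exitIfNoStreams(), which calls fatalExit()
      throw new IllegalStateException("No edit streams are accessible");
    }
  }
}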

+ 158 - 65
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -28,39 +28,41 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
-import java.util.Map;
-import java.util.HashMap;
-import java.lang.Math;
-import java.nio.ByteBuffer;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
@@ -116,9 +118,10 @@ public class FSImage extends Storage {
   private boolean isUpgradeFinalized = false;
   
   /**
-   * list of failed (and thus removed) storages
+   * List of failed (and thus removed) storages
    */
-  protected List<StorageDirectory> removedStorageDirs = new ArrayList<StorageDirectory>();
+  private List<StorageDirectory> removedStorageDirs 
+    = new ArrayList<StorageDirectory>();
   
   /**
    * Directories for importing an image from a checkpoint.
@@ -137,6 +140,9 @@ public class FSImage extends Storage {
   static private final FsPermission FILE_PERM = new FsPermission((short)0);
   static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
 
+  /** Flag to restore removed storage directories at checkpointing */
+  private boolean restoreRemovedDirs = DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT;
+
   /**
    */
   FSImage() {
@@ -171,9 +177,9 @@ public class FSImage extends Storage {
   void setStorageDirectories(Collection<File> fsNameDirs,
                         Collection<File> fsEditsDirs
                              ) throws IOException {
-    this.storageDirs = new ArrayList<StorageDirectory>();
-    this.removedStorageDirs = new ArrayList<StorageDirectory>();
-   // Add all name dirs with appropriate NameNodeDirType 
+    storageDirs = new ArrayList<StorageDirectory>();
+    removedStorageDirs = new ArrayList<StorageDirectory>();
+    // Add all name dirs with appropriate NameNodeDirType 
     for (File dirName : fsNameDirs) {
       boolean isAlsoEdits = false;
       for (File editsDirName : fsEditsDirs) {
@@ -186,13 +192,12 @@ public class FSImage extends Storage {
       NameNodeDirType dirType = (isAlsoEdits) ?
                           NameNodeDirType.IMAGE_AND_EDITS :
                           NameNodeDirType.IMAGE;
-      this.addStorageDir(new StorageDirectory(dirName, dirType));
+      addStorageDir(new StorageDirectory(dirName, dirType));
     }
     
     // Add edits dirs if they are different from name dirs
     for (File dirName : fsEditsDirs) {
-      this.addStorageDir(new StorageDirectory(dirName, 
-                    NameNodeDirType.EDITS));
+      addStorageDir(new StorageDirectory(dirName, NameNodeDirType.EDITS)); 
     }
   }
 
@@ -207,9 +212,14 @@ public class FSImage extends Storage {
   }
   
   List<StorageDirectory> getRemovedStorageDirs() {
-	  return this.removedStorageDirs;
+	  return removedStorageDirs;
   }
-  
+
+  void updateRemovedDirs(StorageDirectory sd, IOException ioe) {
+    LOG.warn("Removing storage dir " + sd.getRoot().getPath(), ioe);
+    removedStorageDirs.add(sd);
+  }
+
   File getEditFile(StorageDirectory sd) {
     return getImageFile(sd, NameNodeFile.EDITS);
   }
@@ -604,45 +614,37 @@ public class FSImage extends Storage {
   }
 
   /**
-   * Record new checkpoint time in order to
+   * Record new checkpoint time in each storage dir in order to
    * distinguish healthy directories from the removed ones.
    * If there is an error writing new checkpoint time, the corresponding
    * storage directory is removed from the list.
    */
   void incrementCheckpointTime() {
     this.checkpointTime++;
-    
-    // Write new checkpoint time in all storage directories
-    for(Iterator<StorageDirectory> it =
-                          dirIterator(); it.hasNext();) {
+
+    Iterator<StorageDirectory> it = dirIterator();
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       try {
         writeCheckpointTime(sd);
-      } catch(IOException e) {
-        // Close any edits stream associated with this dir and remove directory
-        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-          editLog.processIOError(sd);
-        }
-
-        //add storage to the removed list
-        removedStorageDirs.add(sd);
+      } catch (IOException ioe) {
+        editLog.removeEditsForStorageDir(sd);
+        updateRemovedDirs(sd, ioe);
         it.remove();
       }
     }
+    editLog.exitIfNoStreams();
   }
   
   /**
-   * Remove storage directory given directory
+   * Remove the given storage directory.
    */
-  
-  void processIOError(File dirName) {
-    for (Iterator<StorageDirectory> it = 
-      dirIterator(); it.hasNext();) {
+  void removeStorageDir(File dir) {
+    Iterator<StorageDirectory> it = dirIterator(); 
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
-      if (sd.getRoot().getPath().equals(dirName.getPath())) {
-        //add storage to the removed list
-        LOG.info(" removing " + dirName.getPath());
-        removedStorageDirs.add(sd);
+      if (sd.getRoot().getPath().equals(dir.getPath())) {
+        updateRemovedDirs(sd, null);
         it.remove();
       }
     }
@@ -652,6 +654,11 @@ public class FSImage extends Storage {
     return editLog;
   }
 
+  /** Testing hook */
+  public void setEditLog(FSEditLog newLog) {
+    editLog = newLog;
+  }
+
   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
     File oldImageDir = new File(sd.getRoot(), "image");
     if (!oldImageDir.exists()) {
@@ -1080,7 +1087,7 @@ public class FSImage extends Storage {
         moveCurrent(sd);
       } catch(IOException ie) {
         LOG.error("Unable to move current for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
     }
 
@@ -1092,7 +1099,7 @@ public class FSImage extends Storage {
         saveCurrent(sd);
       } catch(IOException ie) {
         LOG.error("Unable to save image for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
     }
 
@@ -1114,7 +1121,7 @@ public class FSImage extends Storage {
         saveCurrent(sd);
       } catch(IOException ie) {
         LOG.error("Unable to save edits for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
     }
     // mv lastcheckpoint.tmp -> previous.checkpoint
@@ -1124,7 +1131,7 @@ public class FSImage extends Storage {
         moveLastCheckpoint(sd);
       } catch(IOException ie) {
         LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
     }
     if(!editLog.isOpen()) editLog.open();
@@ -1211,7 +1218,99 @@ public class FSImage extends Storage {
       newID = r.nextInt(0x7FFFFFFF);  // use 31 bits only
     return newID;
   }
+  
+  void setRestoreRemovedDirs(boolean allow) {
+    this.restoreRemovedDirs = allow;
+  }  
+  
+  /** restore a metadata file */
+  private static void restoreFile(File src, File dstdir, String dstfile)
+      throws IOException {
+    File dst = new File(dstdir, dstfile);
+    IOUtils.copyBytes(new FileInputStream(src), new FileOutputStream(dst),
+        DFSConfigKeys.DFS_STREAM_BUFFER_SIZE_DEFAULT, true);
+  }
+
+  /** 
+   * Refresh storage dirs by copying files from good storage dir
+   */
+  void restoreStorageDirs() throws IOException {
+    if (!restoreRemovedDirs || getRemovedStorageDirs().isEmpty()) {
+      return;
+    }
+    
+    Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
+    if (!it.hasNext()) {
+      throw new IOException("No healthy edits directory");
+    }
+    StorageDirectory goodSd = it.next();
+    File goodEdits = getEditFile(goodSd);
 
+    it = dirIterator(NameNodeDirType.IMAGE);
+    if (!it.hasNext()) {
+      throw new IOException("No healthy fsimage directory");
+    }
+    goodSd = it.next();
+    File goodImage = getImageFile(goodSd, NameNodeFile.IMAGE);
+    File goodFstime = getImageFile(goodSd, NameNodeFile.TIME);
+    File goodVersion = goodSd.getVersionFile();
+    // old-layout image file, restored so Hadoop versions < 0.13 fail to start
+    File goodImage013 = new File(goodSd.getRoot(), "image/fsimage");
+
+    List<IOException> exceptions = new ArrayList<IOException>();
+    for (Iterator<StorageDirectory> i = removedStorageDirs.iterator();
+        i.hasNext();) {
+      StorageDirectory sd = i.next();
+      FSNamesystem.LOG.info("Try to recover removed directory " + sd.getRoot()
+          + " by reformatting");
+      try {
+        // don't create the dir if it doesn't exist, since it may need to be mounted
+        if (!sd.getRoot().exists()) {
+          throw new IOException("Directory " + sd.getRoot() + "doesn't exist"); 
+        }
+        if (!FileUtil.fullyDeleteContents(sd.getRoot())) {
+          throw new IOException("Can't fully delete content of " + sd.getRoot());
+        }
+        sd.clearDirectory(); // create empty "current" dir
+        restoreFile(goodVersion, sd.getCurrentDir(), Storage.STORAGE_FILE_VERSION);
+        restoreFile(goodFstime, sd.getCurrentDir(), NameNodeFile.TIME.getName());
+
+        // Create image directory
+        File imageDir = new File(sd.getRoot(), "image");
+        if (!imageDir.mkdir()) {
+          throw new IOException("Can't make directory 'image'.");
+        }
+        restoreFile(goodImage013, imageDir, NameNodeFile.IMAGE.getName());
+
+        if (sd.getStorageDirType().equals(NameNodeDirType.EDITS)) {
+          restoreFile(goodEdits, sd.getCurrentDir(), NameNodeFile.EDITS.getName());
+        } else if (sd.getStorageDirType().equals(NameNodeDirType.IMAGE)) {
+          restoreFile(goodImage, sd.getCurrentDir(), NameNodeFile.IMAGE.getName());
+        } else if (sd.getStorageDirType().equals(
+            NameNodeDirType.IMAGE_AND_EDITS)) {
+          restoreFile(goodEdits, sd.getCurrentDir(), NameNodeFile.EDITS.getName());
+          restoreFile(goodImage, sd.getCurrentDir(), NameNodeFile.IMAGE.getName());
+        } else {
+          throw new IOException("Invalid NameNodeDirType: "
+              + sd.getStorageDirType());
+        }
+        
+        //remove from removedStorageDirs and add back to healthy. 
+        i.remove();
+        addStorageDir(new StorageDirectory(sd.getRoot(), sd.getStorageDirType()));
+      } catch (IOException e) {
+        FSNamesystem.LOG.warn("Failed to recover removed directory "
+            + sd.getRoot() + " with " + e);
+        exceptions.add(e);
+      }
+    }
+    
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
+  
+  
   /** Create new dfs name directory.  Caution: this destroys all files
    * in this filesystem. */
   void format(StorageDirectory sd) throws IOException {
@@ -1433,8 +1532,8 @@ public class FSImage extends Storage {
     if (!editLog.existsNew()) {
       throw new IOException("New Edits file does not exist");
     }
-    for (Iterator<StorageDirectory> it = 
-                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+    Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       if (!ckpt.exists()) {
@@ -1447,8 +1546,8 @@ public class FSImage extends Storage {
     //
     // Renames new image
     //
-    for (Iterator<StorageDirectory> it = 
-                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+    it = dirIterator(NameNodeDirType.IMAGE);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       File curFile = getImageFile(sd, NameNodeFile.IMAGE);
@@ -1457,15 +1556,13 @@ public class FSImage extends Storage {
       if (!ckpt.renameTo(curFile)) {
         curFile.delete();
         if (!ckpt.renameTo(curFile)) {
-          // Close edit stream, if this directory is also used for edits
-          if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-            editLog.processIOError(sd);
-        // add storage to the removed list
-          removedStorageDirs.add(sd);
+          editLog.removeEditsForStorageDir(sd);
+          updateRemovedDirs(sd, null);
           it.remove();
         }
       }
     }
+    editLog.exitIfNoStreams();
 
     //
     // Updates the fstime file on all directories (fsimage and edits)
@@ -1473,8 +1570,8 @@ public class FSImage extends Storage {
     //
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     this.checkpointTime = FSNamesystem.now();
-    for (Iterator<StorageDirectory> it = 
-                           dirIterator(); it.hasNext();) {
+    it = dirIterator();
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       // delete old edits if sd is the image only the directory
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
@@ -1488,13 +1585,9 @@ public class FSImage extends Storage {
       }
       try {
         sd.write();
-      } catch (IOException e) {
-        LOG.error("Cannot write file " + sd.getRoot(), e);
-        // Close edit stream, if this directory is also used for edits
-        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-          editLog.processIOError(sd);
-      //add storage to the removed list
-        removedStorageDirs.add(sd);
+      } catch (IOException ioe) {
+        editLog.removeEditsForStorageDir(sd);
+        updateRemovedDirs(sd, ioe);
         it.remove();
       }
     }
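
restoreStorageDirs() above deliberately does not stop at the first failed directory: it records each IOException, keeps trying the remaining dirs, and throws a combined exception at the end via MultipleIOException (a real Hadoop helper). A sketch of that collect-and-continue shape, with the actual layout recreation and file copying elided:

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.MultipleIOException;

public class CollectAndContinueSketch {
  /** Attempt every removed dir; defer failures so one bad dir can't block the rest. */
  static void restoreAll(List<File> removed) throws IOException {
    List<IOException> exceptions = new ArrayList<IOException>();
    for (Iterator<File> i = removed.iterator(); i.hasNext();) {
      File dir = i.next();
      try {
        if (!dir.exists()) {
          throw new IOException("Directory " + dir + " doesn't exist");
        }
        // ... recreate the storage layout and copy metadata from a healthy dir ...
        i.remove(); // success: the dir is no longer considered removed
      } catch (IOException e) {
        exceptions.add(e); // remember the failure and keep going with the other dirs
      }
    }
    if (!exceptions.isEmpty()) {
      throw MultipleIOException.createIOException(exceptions);
    }
  }
}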

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -426,7 +426,7 @@ public class SecondaryNameNode implements Runnable {
     namenode.rollFsImage();
     checkpointImage.endCheckpoint();
 
-    LOG.warn("Checkpoint done. New Image Size: " 
+    LOG.info("Checkpoint done. New Image Size: "
               + checkpointImage.getFsImageName().length());
   }
 

+ 201 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageDirectoryFailure.java

@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import static org.junit.Assert.*;
+
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+/**
+ * Test that the NN stays up as long as it has a valid storage directory and
+ * exits when there are no more valid storage directories.
+ */
+public class TestStorageDirectoryFailure {
+
+  MiniDFSCluster cluster = null;
+  FileSystem fs;
+  SecondaryNameNode secondaryNN;
+  ArrayList<String> nameDirs;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+
+    String baseDir = System.getProperty("test.build.data", "/tmp");
+    File dfsDir = new File(baseDir, "dfs");
+    nameDirs = new ArrayList<String>();
+    nameDirs.add(new File(dfsDir, "name1").getPath());
+    nameDirs.add(new File(dfsDir, "name2").getPath());
+    nameDirs.add(new File(dfsDir, "name3").getPath());
+
+    conf.set("dfs.name.dir", StringUtils.join(nameDirs, ","));
+    conf.set("dfs.data.dir", new File(dfsDir, "data").getPath());
+    conf.set("fs.checkpoint.dir", new File(dfsDir, "secondary").getPath());
+    conf.set("fs.default.name", "hdfs://localhost:0");
+    conf.set("dfs.http.address", "0.0.0.0:0");
+    conf.set("dfs.secondary.http.address", "0.0.0.0:0");
+    cluster = new MiniDFSCluster(0, conf, 1, true, false, true, null, null,
+        null, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    secondaryNN = new SecondaryNameNode(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    if (secondaryNN != null) {
+      secondaryNN.shutdown();
+    }
+  }
+
+  private List<StorageDirectory> getRemovedDirs() {
+    return cluster.getNameNode().getFSImage().getRemovedStorageDirs();
+  }
+
+  private int numRemovedDirs() {
+    return getRemovedDirs().size();
+  }
+
+  private void writeFile(String name, byte[] buff) throws IOException {
+    FSDataOutputStream writeStream = fs.create(new Path(name));
+    writeStream.write(buff, 0, buff.length);
+    writeStream.close();
+  }
+
+  private byte[] readFile(String name, int len) throws IOException {
+    FSDataInputStream readStream = fs.open(new Path(name));
+    byte[] buff = new byte[len];
+    readStream.readFully(buff);
+    readStream.close();
+    return buff;
+  }
+
+  /** Assert that we can create and read a file */
+  private void checkFileCreation(String name) throws IOException {
+    byte[] buff = "some bytes".getBytes();
+    writeFile(name, buff);
+    assertTrue(Arrays.equals(buff, readFile(name, buff.length)));
+  }
+
+  /** Assert that we can read a file we created */
+  private void checkFileContents(String name) throws IOException {
+    byte[] buff = "some bytes".getBytes();
+    assertTrue(Arrays.equals(buff, readFile(name, buff.length)));
+  }
+
+  @Test
+  /** Remove storage dirs and checkpoint to trigger detection */
+  public void testCheckpointAfterFailingFirstNamedir() throws IOException {
+    assertEquals(0, numRemovedDirs());
+
+    checkFileCreation("file0");
+
+    // Remove the 1st storage dir
+    FileUtil.fullyDelete(new File(nameDirs.get(0)));
+    secondaryNN.doCheckpoint();
+    assertEquals(1, numRemovedDirs());
+    assertEquals(nameDirs.get(0), getRemovedDirs().get(0).getRoot().getPath());
+
+    checkFileCreation("file1");
+
+    // Remove the 2nd
+    FileUtil.fullyDelete(new File(nameDirs.get(1)));
+    secondaryNN.doCheckpoint();
+    assertEquals(2, numRemovedDirs());
+    assertEquals(nameDirs.get(1), getRemovedDirs().get(1).getRoot().getPath());
+
+    checkFileCreation("file2");
+
+    // Remove the last one. Prevent the NN from exiting the process when
+    // it notices this via the checkpoint.
+    FSEditLog spyLog = spy(cluster.getNameNode().getFSImage().getEditLog());
+    doNothing().when(spyLog).fatalExit(anyString());
+    cluster.getNameNode().getFSImage().setEditLog(spyLog);
+
+    // After the checkpoint, we should be dead. Verify fatalExit was
+    // called and that eg a checkpoint fails.
+    FileUtil.fullyDelete(new File(nameDirs.get(2)));
+    try {
+      secondaryNN.doCheckpoint();
+      fail("There's no storage to retrieve an image from");
+    } catch (FileNotFoundException fnf) {
+      // Expected
+    }
+    verify(spyLog, atLeastOnce()).fatalExit(anyString());
+
+    // Check that we can't mutate state without any edit streams
+    try {
+      checkFileCreation("file3");
+      fail("Created a file w/o edit streams");
+    } catch (IOException ioe) {
+      // Expected
+      assertTrue(ioe.getMessage().contains(
+          "java.lang.AssertionError: No edit streams to log to"));
+    }
+  }
+
+  @Test
+  /** Test that we can restart OK after removing a failed dir */
+  public void testRestartAfterFailingStorageDir() throws IOException {
+    assertEquals(0, numRemovedDirs());
+
+    checkFileCreation("file0");
+
+    FileUtil.fullyDelete(new File(nameDirs.get(0)));
+    secondaryNN.doCheckpoint();
+    assertEquals(1, numRemovedDirs());
+    assertEquals(nameDirs.get(0), getRemovedDirs().get(0).getRoot().getPath());
+    
+    checkFileCreation("file1");
+
+    new File(nameDirs.get(0)).mkdir();
+    cluster.restartNameNode();
+
+    // The dir was restored, is no longer considered removed
+    assertEquals(0, numRemovedDirs());
+    checkFileContents("file0");
+    checkFileContents("file1");
+  }
+}

+ 301 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+
+/**
+ * Startup and checkpoint tests
+ * 
+ */
+public class TestStorageRestore extends TestCase {
+  public static final String NAME_NODE_HOST = "localhost:";
+  public static final String NAME_NODE_HTTP_HOST = "0.0.0.0:";
+  private static final Log LOG = LogFactory.getLog(TestStorageRestore.class
+      .getName());
+  private Configuration config;
+  private File hdfsDir = null;
+  static final long seed = 0xAAAAEEFL;
+  static final int blockSize = 4096;
+  static final int fileSize = 8192;
+  private File path1, path2, path3;
+  private MiniDFSCluster cluster;
+
+  private void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+        .getInt("io.file.buffer.size", 4096), (short) repl, (long) blockSize);
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    stm.close();
+  }
+
+  protected void setUp() throws Exception {
+    config = new Configuration();
+    String baseDir = System.getProperty("test.build.data", "/tmp");
+
+    hdfsDir = new File(baseDir, "dfs");
+    if (hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir)) {
+      throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
+    }
+
+    hdfsDir.mkdir();
+    path1 = new File(hdfsDir, "name1");
+    path2 = new File(hdfsDir, "name2");
+    path3 = new File(hdfsDir, "name3");
+
+    path1.mkdir();
+    path2.mkdir();
+    path3.mkdir();
+    if (!path2.exists() || !path3.exists() || !path1.exists()) {
+      throw new IOException("Couldn't create dfs.name dirs");
+    }
+
+    String dfs_name_dir = new String(path1.getPath() + "," + path2.getPath());
+    System.out.println("configuring hdfsdir is " + hdfsDir.getAbsolutePath()
+        + "; dfs_name_dir = " + dfs_name_dir + ";dfs_name_edits_dir(only)="
+        + path3.getPath());
+
+    config.set("dfs.name.dir", dfs_name_dir);
+    config.set("dfs.name.edits.dir", dfs_name_dir + "," + path3.getPath());
+
+    config.set("fs.checkpoint.dir", new File(hdfsDir, "secondary").getPath());
+
+    FileSystem.setDefaultUri(config, "hdfs://" + NAME_NODE_HOST + "0");
+
+    config.set("dfs.secondary.http.address", "0.0.0.0:0");
+
+    // set the restore feature on
+    config.setBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, true);
+ }
+
+  /**
+   * clean up
+   */
+  public void tearDown() throws Exception {
+    if (hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir)) {
+      throw new IOException("Could not delete hdfs directory in tearDown '"
+          + hdfsDir + "'");
+    }
+  }
+
+  /**
+   * Invalidate storage dirs name2 and name3 by dropping them from the storage list
+   */
+  public void invalidateStorage(FSImage fi) throws IOException {
+    for (Iterator<StorageDirectory> it = fi.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      
+      if (sd.getRoot().equals(path2) || sd.getRoot().equals(path3)) {
+        fi.getEditLog().removeEditsForStorageDir(sd);
+        fi.updateRemovedDirs(sd, null);
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Log the current storages and their image/edits file sizes.
+   */
+  public void printStorages(FSImage fs) {
+    LOG.info("current storages and corresoponding sizes:");
+    for (Iterator<StorageDirectory> it = fs.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+
+      if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
+        File imf = FSImage.getImageFile(sd, NameNodeFile.IMAGE);
+        LOG.info("  image file " + imf.getAbsolutePath() + "; len = "
+            + imf.length());
+      }
+      if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+        File edf = FSImage.getImageFile(sd, NameNodeFile.EDITS);
+        LOG.info("  edits file " + edf.getAbsolutePath() + "; len = "
+            + edf.length());
+      }
+    }
+  }
+  
+  /**
+   * This function returns a md5 hash of a file.
+   *
+   * @param file input file
+   * @return The md5 string
+   */
+  public String getFileMD5(File file) throws Exception {
+    String res = new String();
+    MessageDigest mD = MessageDigest.getInstance("MD5");
+    DataInputStream dis = new DataInputStream(new FileInputStream(file));
+
+    try {
+      while(true) {
+        mD.update(dis.readByte());
+      }
+    } catch (EOFException eof) {}
+
+    BigInteger bigInt = new BigInteger(1, mD.digest());
+    res = bigInt.toString(16);
+    dis.close();
+
+    return res;
+  }
+
+
+  /**
+   * check if files exist/not exist
+   */
+  public void checkFiles(boolean valid) {
+    // look at the valid storage
+    File fsImg1 = new File(path1, Storage.STORAGE_DIR_CURRENT + "/"
+        + NameNodeFile.IMAGE.getName());
+    File fsImg2 = new File(path2, Storage.STORAGE_DIR_CURRENT + "/"
+        + NameNodeFile.IMAGE.getName());
+    File fsImg3 = new File(path3, Storage.STORAGE_DIR_CURRENT + "/"
+        + NameNodeFile.IMAGE.getName());
+
+    File fsEdits1 = new File(path1, Storage.STORAGE_DIR_CURRENT + "/"
+        + NameNodeFile.EDITS.getName());
+    File fsEdits2 = new File(path2, Storage.STORAGE_DIR_CURRENT + "/"
+        + NameNodeFile.EDITS.getName());
+    File fsEdits3 = new File(path3, Storage.STORAGE_DIR_CURRENT + "/"
+        + NameNodeFile.EDITS.getName());
+    
+    this.printStorages(cluster.getNameNode().getFSImage());
+    
+    String md5_1 = null, md5_2 = null, md5_3 = null;
+    try {
+      md5_1 = getFileMD5(fsEdits1);
+      md5_2 = getFileMD5(fsEdits2);
+      md5_3 = getFileMD5(fsEdits3);
+    } catch (Exception e) {
+      System.err.println("md 5 calculation failed:" + e.getLocalizedMessage());
+    }
+    
+    LOG.info("++++ image files = " + fsImg1.getAbsolutePath() + ","
+        + fsImg2.getAbsolutePath() + "," + fsImg3.getAbsolutePath());
+    LOG.info("++++ edits files = " + fsEdits1.getAbsolutePath() + ","
+        + fsEdits2.getAbsolutePath() + "," + fsEdits3.getAbsolutePath());
+    LOG.info("checkFiles compares lengths: img1=" + fsImg1.length() + ",img2="
+        + fsImg2.length() + ",img3=" + fsImg3.length());
+    LOG.info("checkFiles compares lengths: edits1=" + fsEdits1.length()
+        + ",edits2=" + fsEdits2.length() + ",edits3=" + fsEdits3.length());
+
+    if (valid) {
+      // should be the same
+      assertTrue(fsImg1.length() == fsImg2.length());
+      assertTrue(0 == fsImg3.length()); // shouldn't be created
+      assertTrue(fsEdits1.length() == fsEdits2.length());
+      assertTrue(md5_1.equals(md5_2));
+      assertTrue(md5_1.equals(md5_3));
+    } else {
+      // should be different
+      assertFalse(md5_1.equals(md5_2));
+      assertFalse(md5_1.equals(md5_3));
+    }
+  }
+
+  /**
+   * test 
+   * 1. create DFS cluster with 3 storage directories - 2 EDITS_IMAGE, 1 EDITS 
+   * 2. create a cluster and write a file 
+   * 3. corrupt/disable one storage (or two) by removing 
+   * 4. run doCheckpoint - it will fail on removed dirs (which will invalidate the storages)
+   * 5. write another file 
+   * 6. check that edits and fsimage differ 
+   * 7. run doCheckpoint 
+   * 8. verify that all the image and edits files are the same.
+   */
+  public void testStorageRestore() throws Exception {
+    int numDatanodes = 2;
+    cluster = new MiniDFSCluster(0, config, numDatanodes, true, false, true,
+        null, null, null, null);
+    cluster.waitActive();
+
+    SecondaryNameNode secondary = new SecondaryNameNode(config);
+    System.out.println("****testStorageRestore: Cluster and SNN started");
+    printStorages(cluster.getNameNode().getFSImage());
+
+    FileSystem fs = cluster.getFileSystem();
+    Path path = new Path("/", "test");
+    writeFile(fs, path, 2);
+
+    System.out
+        .println("****testStorageRestore: file test written, invalidating storage...");
+
+    invalidateStorage(cluster.getNameNode().getFSImage());
+    printStorages(cluster.getNameNode().getFSImage());
+    System.out
+        .println("****testStorageRestore: storage invalidated + doCheckpoint");
+
+    path = new Path("/", "test1");
+    writeFile(fs, path, 2);
+    System.out.println("****testStorageRestore: file test1 written");
+
+    checkFiles(false); // SHOULD BE FALSE
+
+    System.out.println("****testStorageRestore: checkfiles(false) run");
+
+    secondary.doCheckpoint(); // should restore the invalidated storage dirs
+
+    checkFiles(true);
+    
+    // use the recovered dir to restart the NN
+    if (!FileUtil.fullyDelete(path1)) {
+      throw new Exception("Can't fully delete " + path1);
+    }
+    if (!FileUtil.fullyDelete(path3)) {
+      throw new Exception("Can't fully delete " + path3);
+    }
+    cluster.restartDataNodes();
+    cluster.waitActive();
+    Path checkPath = new Path("/test");
+    assertTrue(fs.exists(checkPath));
+    checkPath = new Path("/test1");
+    assertTrue(fs.exists(checkPath));
+    
+    System.out
+        .println("****testStorageRestore: second Checkpoint done and checkFiles(true) run");
+    secondary.shutdown();
+    cluster.shutdown();
+  }
+}