
svn merge -c 1165826 from trunk for HDFS-2018.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1228651 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze · 13 years ago · commit 96692e19e4
24 changed files with 1147 additions and 1013 deletions
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+3 -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (+36 -9)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (+21 -2)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (+6 -5)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (+11 -0)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (+33 -1)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (+11 -0)
  8. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (+109 -25)
  9. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (+39 -26)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (+47 -36)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java (+82 -73)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (+11 -47)
  13. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (+12 -369)
  14. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (+184 -48)
  15. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (+36 -6)
  16. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java (+1 -1)
  17. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (+3 -6)
  18. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java (+6 -2)
  19. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (+209 -8)
  20. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (+6 -6)
  21. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java (+12 -12)
  22. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java (+2 -326)
  23. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java (+262 -3)
  24. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java (+5 -2)

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -82,6 +82,9 @@ Release 0.23.1 - UNRELEASED
     for a client on the same node as the block file.  (Andrew Purtell,
     Suresh Srinivas and Jitendra Nath Pandey via szetszwo)
 
+    HDFS-2018. Move all journal stream management code into one place.
+               (Ivan Kelly via jitendra)
+
   BUG FIXES
 
     HDFS-2541. For a sufficiently large value of blocks, the DN Scanner 

+ 36 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -18,18 +18,21 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.zip.Checksum;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Extension of FSImage for the backup node.
@@ -257,11 +260,18 @@ public class BackupImage extends FSImage {
         new FSImageTransactionalStorageInspector();
       
       storage.inspectStorageDirs(inspector);
-      LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
-          target - 1);
-  
-      logLoadPlan.doRecovery();
-      loadEdits(logLoadPlan.getEditsFiles());
+
+      editLog.recoverUnclosedStreams();
+      Iterable<EditLogInputStream> editStreamsAll 
+        = editLog.selectInputStreams(lastAppliedTxId, target - 1);
+      // filter out the in-progress segment
+      List<EditLogInputStream> editStreams = Lists.newArrayList();
+      for (EditLogInputStream s : editStreamsAll) {
+        if (s.getFirstTxId() != editLog.getCurSegmentTxId()) {
+          editStreams.add(s);
+        }
+      }
+      loadEdits(editStreams);
     }
     
     // now, need to load the in-progress file
@@ -271,7 +281,24 @@ public class BackupImage extends FSImage {
         return false; // drop lock and try again to load local logs
       }
       
-      EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
+      EditLogInputStream stream = null;
+      Collection<EditLogInputStream> editStreams
+        = getEditLog().selectInputStreams(
+            getEditLog().getCurSegmentTxId(),
+            getEditLog().getCurSegmentTxId());
+      
+      for (EditLogInputStream s : editStreams) {
+        if (s.getFirstTxId() == getEditLog().getCurSegmentTxId()) {
+          stream = s;
+        }
+        break;
+      }
+      if (stream == null) {
+        LOG.warn("Unable to find stream starting with " + editLog.getCurSegmentTxId()
+                 + ". This indicates that there is an error in synchronization in BackupImage");
+        return false;
+      }
+
       try {
         long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
         
@@ -285,7 +312,7 @@ public class BackupImage extends FSImage {
           "expected to load " + remainingTxns + " but loaded " +
           numLoaded + " from " + stream;
       } finally {
-        IOUtils.closeStream(stream);
+        FSEditLog.closeAllStreams(editStreams);
       }
 
       LOG.info("Successfully synced BackupNode with NameNode at txnid " +

+ 21 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java

@@ -57,12 +57,31 @@ class BackupJournalManager implements JournalManager {
       throws IOException {
   }
 
+  @Override
+  public long getNumberOfTransactions(long fromTxnId) 
+      throws IOException, CorruptionException {
+    // This JournalManager is never used for input. Therefore it cannot
+    // return any transactions
+    return 0;
+  }
+  
+  @Override
+  public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+    // This JournalManager is never used for input. Therefore it cannot
+    // return any transactions
+    throw new IOException("Unsupported operation");
+  }
+
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+  }
+
   public boolean matchesRegistration(NamenodeRegistration bnReg) {
     return bnReg.getAddress().equals(this.bnReg.getAddress());
   }
 
   @Override
-  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
-    return null;
+  public String toString() {
+    return "BackupJournalManager";
   }
 }
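
For context, a minimal sketch of the input-side contract these new methods establish (the class below is hypothetical, not part of this patch, and assumes the package-internal JournalManager interface shown in this diff): a write-only journal reports zero transactions and refuses to hand out an input stream, which lets FSEditLog's stream selection skip it without any special-casing.

    // Hypothetical write-only journal, mirroring BackupJournalManager's
    // input-side behaviour; the output-side methods are left abstract.
    abstract class WriteOnlyJournal implements JournalManager {
      @Override
      public long getNumberOfTransactions(long fromTxnId) throws IOException {
        return 0; // never the best candidate in selectStream()
      }
      @Override
      public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
        throw new IOException("Unsupported operation");
      }
      @Override
      public void recoverUnfinalizedSegments() throws IOException {
        // nothing to recover for a write-only journal
      }
    }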

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java

@@ -275,16 +275,17 @@ class Checkpointer extends Daemon {
       FSImage dstImage) throws IOException {
     NNStorage dstStorage = dstImage.getStorage();
   
-    List<File> editsFiles = Lists.newArrayList();
+    List<EditLogInputStream> editsStreams = Lists.newArrayList();    
     for (RemoteEditLog log : manifest.getLogs()) {
       File f = dstStorage.findFinalizedEditsFile(
           log.getStartTxId(), log.getEndTxId());
       if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
-        editsFiles.add(f);
-      }
+        editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), 
+                                                    log.getEndTxId()));
+       }
     }
     LOG.info("Checkpointer about to load edits from " +
-        editsFiles.size() + " file(s).");
-    dstImage.loadEdits(editsFiles);
+        editsStreams.size() + " stream(s).");
+    dstImage.loadEdits(editsStreams);
   }
 }

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java

@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import com.google.common.base.Preconditions;
 
 /**
@@ -122,4 +123,14 @@ class EditLogBackupInputStream extends EditLogInputStream {
     reader = null;
     this.version = 0;
   }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return HdfsConstants.INVALID_TXID;
+  }
+
+  @Override
+  public long getLastTxId() throws IOException {
+    return HdfsConstants.INVALID_TXID;
+  }
 }

+ 33 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -27,6 +27,7 @@ import java.io.DataInputStream;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -37,12 +38,15 @@ import com.google.common.annotations.VisibleForTesting;
 class EditLogFileInputStream extends EditLogInputStream {
   private final File file;
   private final FileInputStream fStream;
+  final private long firstTxId;
+  final private long lastTxId;
   private final int logVersion;
   private final FSEditLogOp.Reader reader;
   private final FSEditLogLoader.PositionTrackingInputStream tracker;
   
   /**
    * Open an EditLogInputStream for the given file.
+   * The file is pre-transactional, so it has no txids
    * @param name filename to open
    * @throws LogHeaderCorruptException if the header is either missing or
    *         appears to be corrupt/truncated
@@ -51,6 +55,21 @@ class EditLogFileInputStream extends EditLogInputStream {
    */
   EditLogFileInputStream(File name)
       throws LogHeaderCorruptException, IOException {
+    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
+  }
+
+  /**
+   * Open an EditLogInputStream for the given file.
+   * @param name filename to open
+   * @param firstTxId first transaction found in file
+   * @param lastTxId last transaction id found in file
+   * @throws LogHeaderCorruptException if the header is either missing or
+   *         appears to be corrupt/truncated
+   * @throws IOException if an actual IO error occurs while reading the
+   *         header
+   */
+  EditLogFileInputStream(File name, long firstTxId, long lastTxId)
+      throws LogHeaderCorruptException, IOException {
     file = name;
     fStream = new FileInputStream(name);
 
@@ -65,6 +84,18 @@ class EditLogFileInputStream extends EditLogInputStream {
     }
 
     reader = new FSEditLogOp.Reader(in, logVersion);
+    this.firstTxId = firstTxId;
+    this.lastTxId = lastTxId;
+  }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return firstTxId;
+  }
+  
+  @Override
+  public long getLastTxId() throws IOException {
+    return lastTxId;
   }
 
   @Override // JournalStream
@@ -116,7 +147,8 @@ class EditLogFileInputStream extends EditLogInputStream {
       // If it's missing its header, this is equivalent to no transactions
       FSImage.LOG.warn("Log at " + file + " has no valid header",
           corrupt);
-      return new FSEditLogLoader.EditLogValidation(0, 0);
+      return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, 
+                                                   HdfsConstants.INVALID_TXID);
     }
     
     try {
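
A quick sketch of how the two constructors divide the work under this change (file names are illustrative, and the calls sit in a method that propagates the constructor's LogHeaderCorruptException/IOException): txid-named finalized segments carry their bounds, while pre-transactional logs fall back to the HdfsConstants.INVALID_TXID sentinel, so callers must treat INVALID_TXID as "bounds unknown".

    // Finalized segment whose txid range is known from its filename:
    EditLogInputStream bounded =
        new EditLogFileInputStream(new File("edits_42-99"), 42L, 99L);
    // bounded.getFirstTxId() == 42, bounded.getLastTxId() == 99

    // Pre-transactional log: no txids recorded, both bounds are INVALID_TXID:
    EditLogInputStream legacy = new EditLogFileInputStream(new File("edits"));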

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java

@@ -28,6 +28,17 @@ import java.io.IOException;
  * into the #{@link EditLogOutputStream}.
  */
 abstract class EditLogInputStream implements JournalStream, Closeable {
+  /** 
+   * @return the first transaction which will be found in this stream
+   */
+  public abstract long getFirstTxId() throws IOException;
+  
+  /** 
+   * @return the last transaction which will be found in this stream
+   */
+  public abstract long getLastTxId() throws IOException;
+
+
   /**
    * Close the stream.
    * @throws IOException if an error occurred while closing

+ 109 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -35,11 +36,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -1068,6 +1071,112 @@ public class FSEditLog  {
     }
   }
 
+  /**
+   * Find the best editlog input stream to read from txid. In this case
+   * best means the editlog which has the largest continuous range of 
+   * transactions starting from the transaction id, fromTxId.
+   *
+   * If a journal throws a CorruptionException while reading from a txn id,
+   * it means that it has more transactions, but can't find any from fromTxId.
+   * If this is the case and no other journal has transactions, we should throw
+   * an exception as it means more transactions exist but we cannot load them.
+   *
+   * @param fromTxId Transaction id to start from.
+   * @return an edit log input stream with transactions starting at fromTxId
+   *         or null if no more exist
+   */
+  private EditLogInputStream selectStream(long fromTxId) 
+      throws IOException {
+    JournalManager bestjm = null;
+    long bestjmNumTxns = 0;
+    CorruptionException corruption = null;
+
+    for (JournalAndStream jas : journals) {
+      JournalManager candidate = jas.getManager();
+      long candidateNumTxns = 0;
+      try {
+        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
+      } catch (CorruptionException ce) {
+        corruption = ce;
+      } catch (IOException ioe) {
+        LOG.warn("Error reading number of transactions from " + candidate);
+        continue; // error reading disk, just skip
+      }
+      
+      if (candidateNumTxns > bestjmNumTxns) {
+        bestjm = candidate;
+        bestjmNumTxns = candidateNumTxns;
+      }
+    }
+    
+    
+    if (bestjm == null) {
+      /**
+       * If all candidates either threw a CorruptionException or
+       * found 0 transactions, then a gap exists. 
+       */
+      if (corruption != null) {
+        throw new IOException("Gap exists in logs from " 
+                              + fromTxId, corruption);
+      } else {
+        return null;
+      }
+    }
+
+    return bestjm.getInputStream(fromTxId);
+  }
+
+  /**
+   * Run recovery on all journals to recover any unclosed segments
+   */
+  void recoverUnclosedStreams() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          jas.manager.recoverUnfinalizedSegments();
+        }
+      }, "recovering unclosed streams");
+  }
+
+  /**
+   * Select a list of input streams to load.
+   * @param fromTxId first transaction in the selected streams
+   * @param toAtLeastTxId the selected streams must contain this transaction
+   */
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId) 
+      throws IOException {
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    
+    boolean gapFound = false;
+    EditLogInputStream stream = selectStream(fromTxId);
+    while (stream != null) {
+      fromTxId = stream.getLastTxId() + 1;
+      streams.add(stream);
+      try {
+        stream = selectStream(fromTxId);
+      } catch (IOException ioe) {
+        gapFound = true;
+        break;
+      }
+    }
+    if (fromTxId <= toAtLeastTxId || gapFound) {
+      closeAllStreams(streams);
+      throw new IOException("No non-corrupt logs for txid " 
+                            + fromTxId);
+    }
+    return streams;
+  }
+
+  /** 
+   * Close all the streams in a collection
+   * @param streams The list of streams to close
+   */
+  static void closeAllStreams(Iterable<EditLogInputStream> streams) {
+    for (EditLogInputStream s : streams) {
+      IOUtils.closeStream(s);
+    }
+  }
+
   /**
    * Container for a JournalManager paired with its currently
    * active stream.
@@ -1137,30 +1246,5 @@ public class FSEditLog  {
     JournalManager getManager() {
       return manager;
     }
-
-    private EditLogInputStream getInProgressInputStream() throws IOException {
-      return manager.getInProgressInputStream(segmentStartsAtTxId);
-    }
-  }
-
-  /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. This is used from the BackupNode
-   * during edits synchronization.
-   * @throws IOException if no valid logs are available.
-   */
-  synchronized EditLogInputStream getInProgressFileInputStream()
-      throws IOException {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      try {
-        EditLogInputStream in = jas.getInProgressInputStream();
-        if (in != null) return in;
-      } catch (IOException ioe) {
-        LOG.warn("Unable to get the in-progress input stream from " + jas,
-            ioe);
-      }
-    }
-    throw new IOException("No in-progress stream provided edits");
   }
 }
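
This is the heart of HDFS-2018: every reader of the edit log now goes through the same three FSEditLog calls instead of planning file lists itself. A sketch of the read path as the callers above use it (variable names are illustrative):

    editLog.recoverUnclosedStreams();            // finalize any unclosed segments
    Collection<EditLogInputStream> streams =
        editLog.selectInputStreams(lastAppliedTxId + 1, mustReachTxId);
    try {
      // feed each stream to FSEditLogLoader.loadFSEdits(stream, startTxId) ...
    } finally {
      FSEditLog.closeAllStreams(streams);        // always release the files
    }

selectInputStreams() throws an IOException if the journals cannot supply a contiguous run of transactions up to mustReachTxId, so a gap is detected before any edits are applied.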

+ 39 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -446,24 +446,6 @@ public class FSEditLogLoader {
     }
   }
   
-  static EditLogValidation validateEditLog(File file) throws IOException {
-    EditLogFileInputStream in;
-    try {
-      in = new EditLogFileInputStream(file);
-    } catch (LogHeaderCorruptException corrupt) {
-      // If it's missing its header, this is equivalent to no transactions
-      FSImage.LOG.warn("Log at " + file + " has no valid header",
-          corrupt);
-      return new EditLogValidation(0, 0);
-    }
-    
-    try {
-      return validateEditLog(in);
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-
   /**
    * Return the number of valid transactions in the stream. If the stream is
    * truncated during the header, returns a value indicating that there are
@@ -473,12 +455,26 @@ public class FSEditLogLoader {
    *                     if the log does not exist)
    */
   static EditLogValidation validateEditLog(EditLogInputStream in) {
-    long numValid = 0;
     long lastPos = 0;
+    long firstTxId = HdfsConstants.INVALID_TXID;
+    long lastTxId = HdfsConstants.INVALID_TXID;
+    long numValid = 0;
     try {
+      FSEditLogOp op = null;
       while (true) {
         lastPos = in.getPosition();
-        if (in.readOp() == null) {
+        if ((op = in.readOp()) == null) {
+          break;
+        }
+        if (firstTxId == HdfsConstants.INVALID_TXID) {
+          firstTxId = op.txid;
+        }
+        if (lastTxId == HdfsConstants.INVALID_TXID
+            || op.txid == lastTxId + 1) {
+          lastTxId = op.txid;
+        } else {
+          FSImage.LOG.error("Out of order txid found. Found " + op.txid 
+                            + ", expected " + (lastTxId + 1));
           break;
         }
         numValid++;
@@ -489,16 +485,33 @@ public class FSEditLogLoader {
       FSImage.LOG.debug("Caught exception after reading " + numValid +
           " ops from " + in + " while determining its valid length.", t);
     }
-    return new EditLogValidation(lastPos, numValid);
+    return new EditLogValidation(lastPos, firstTxId, lastTxId);
   }
   
   static class EditLogValidation {
-    long validLength;
-    long numTransactions;
-    
-    EditLogValidation(long validLength, long numTransactions) {
+    private long validLength;
+    private long startTxId;
+    private long endTxId;
+     
+    EditLogValidation(long validLength, 
+                      long startTxId, long endTxId) {
       this.validLength = validLength;
-      this.numTransactions = numTransactions;
+      this.startTxId = startTxId;
+      this.endTxId = endTxId;
+    }
+    
+    long getValidLength() { return validLength; }
+    
+    long getStartTxId() { return startTxId; }
+    
+    long getEndTxId() { return endTxId; }
+    
+    long getNumTransactions() { 
+      if (endTxId == HdfsConstants.INVALID_TXID
+          || startTxId == HdfsConstants.INVALID_TXID) {
+        return 0;
+      }
+      return (endTxId - startTxId) + 1;
     }
   }
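
A worked example of the new txid-based count (a sketch; the values are made up): validation now reports a [startTxId, endTxId] range rather than a raw counter, and the INVALID_TXID sentinel maps to a count of zero.

    EditLogValidation v = new EditLogValidation(4096L, 101L, 150L);
    v.getNumTransactions();   // (150 - 101) + 1 = 50

    EditLogValidation headerless = new EditLogValidation(0L,
        HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
    headerless.getNumTransactions();   // 0, nothing usable in the log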
 

+ 47 - 36
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
+
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -584,32 +584,38 @@ public class FSImage implements Closeable {
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     
     isUpgradeFinalized = inspector.isUpgradeFinalized();
-    
+ 
+    FSImageStorageInspector.FSImageFile imageFile 
+      = inspector.getLatestImage();   
     boolean needToSave = inspector.needToSave();
-    
-    // Plan our load. This will throw if it's impossible to load from the
-    // data that's available.
-    LoadPlan loadPlan = inspector.createLoadPlan();    
-    LOG.debug("Planning to load image using following plan:\n" + loadPlan);
 
-    
-    // Recover from previous interrupted checkpoint, if any
-    needToSave |= loadPlan.doRecovery();
+    Iterable<EditLogInputStream> editStreams = null;
 
-    //
-    // Load in bits
-    //
-    StorageDirectory sdForProperties =
-      loadPlan.getStorageDirectoryForProperties();
-    storage.readProperties(sdForProperties);
-    File imageFile = loadPlan.getImageFile();
+    editLog.recoverUnclosedStreams();
 
+    if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
+                               getLayoutVersion())) {
+      editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
+                                               inspector.getMaxSeenTxId());
+    } else {
+      editStreams = FSImagePreTransactionalStorageInspector
+        .getEditLogStreams(storage);
+    }
+ 
+    LOG.debug("Planning to load image:\n" + imageFile);
+    for (EditLogInputStream l : editStreams) {
+      LOG.debug("\t Planning to load edit stream: " + l);
+    }
+    
     try {
+      StorageDirectory sdForProperties = imageFile.sd;
+      storage.readProperties(sdForProperties);
+
       if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
                                  getLayoutVersion())) {
         // For txid-based layout, we should have a .md5 file
         // next to the image file
-        loadFSImage(imageFile);
+        loadFSImage(imageFile.getFile());
       } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
                                         getLayoutVersion())) {
         // In 0.22, we have the checksum stored in the VERSION file.
@@ -621,17 +627,19 @@ public class FSImage implements Closeable {
               NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
               " not set for storage directory " + sdForProperties.getRoot());
         }
-        loadFSImage(imageFile, new MD5Hash(md5));
+        loadFSImage(imageFile.getFile(), new MD5Hash(md5));
       } else {
         // We don't have any record of the md5sum
-        loadFSImage(imageFile, null);
+        loadFSImage(imageFile.getFile(), null);
       }
     } catch (IOException ioe) {
-      throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
+      FSEditLog.closeAllStreams(editStreams);
+      throw new IOException("Failed to load image from " + imageFile, ioe);
     }
     
-    long numLoaded = loadEdits(loadPlan.getEditsFiles());
-    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded);
+    long numLoaded = loadEdits(editStreams);
+    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
+                                                    numLoaded);
     
     // update the txid for the edit log
     editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
@@ -663,22 +671,25 @@ public class FSImage implements Closeable {
    * Load the specified list of edit files into the image.
    * @return the number of transactions loaded
    */
-  protected long loadEdits(List<File> editLogs) throws IOException {
-    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editLogs));
+  protected long loadEdits(Iterable<EditLogInputStream> editStreams) throws IOException {
+    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
 
     long startingTxId = getLastAppliedTxId() + 1;
-    
-    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
     int numLoaded = 0;
-    // Load latest edits
-    for (File edits : editLogs) {
-      LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId);
-      EditLogFileInputStream editIn = new EditLogFileInputStream(edits);
-      int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
-      startingTxId += thisNumLoaded;
-      numLoaded += thisNumLoaded;
-      lastAppliedTxId += thisNumLoaded;
-      editIn.close();
+
+    try {    
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+      
+      // Load latest edits
+      for (EditLogInputStream editIn : editStreams) {
+        LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
+        int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+        startingTxId += thisNumLoaded;
+        numLoaded += thisNumLoaded;
+        lastAppliedTxId += thisNumLoaded;
+      }
+    } finally {
+      FSEditLog.closeAllStreams(editStreams);
     }
 
     // update the counts
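
To make the txid bookkeeping concrete (numbers made up): if the image checkpoint ends at txid 100 and the selected streams supply 50 and 25 transactions, loadEdits() returns 75, lastAppliedTxId advances to 175, and the edit log is told to write txid 176 next.

    long checkpointTxId = 100L;
    long numLoaded = 50L + 25L;                       // returned by loadEdits()
    long nextTxId = checkpointTxId + numLoaded + 1L;  // 176, first txid to write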

+ 82 - 73
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java

@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -55,6 +56,7 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector {
   private boolean hasOutOfDateStorageDirs = false;
   /* Flag set false if there are any "previous" directories found */
   private boolean isUpgradeFinalized = true;
+  private boolean needToSaveAfterRecovery = false;
   
   // Track the name and edits dir with the latest times
   private long latestNameCheckpointTime = Long.MIN_VALUE;
@@ -139,15 +141,15 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector {
   boolean isUpgradeFinalized() {
     return isUpgradeFinalized;
   }
-  
+    
   @Override
-  LoadPlan createLoadPlan() throws IOException {
+  FSImageFile getLatestImage() throws IOException {
     // We should have at least one image and one edits dirs
     if (latestNameSD == null)
       throw new IOException("Image file is not found in " + imageDirs);
     if (latestEditsSD == null)
       throw new IOException("Edits file is not found in " + editsDirs);
-
+    
     // Make sure we are loading image and edits from same checkpoint
     if (latestNameCheckpointTime > latestEditsCheckpointTime
         && latestNameSD != latestEditsSD
@@ -168,92 +170,70 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector {
                       "image checkpoint time = " + latestNameCheckpointTime +
                       "edits checkpoint time = " + latestEditsCheckpointTime);
     }
+
+    needToSaveAfterRecovery = doRecovery();
     
-    return new PreTransactionalLoadPlan();
+    return new FSImageFile(latestNameSD, 
+        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE),
+        HdfsConstants.INVALID_TXID);
   }
-  
+
   @Override
   boolean needToSave() {
     return hasOutOfDateStorageDirs ||
       checkpointTimes.size() != 1 ||
-      latestNameCheckpointTime > latestEditsCheckpointTime;
-
+      latestNameCheckpointTime > latestEditsCheckpointTime ||
+      needToSaveAfterRecovery;
   }
   
-  private class PreTransactionalLoadPlan extends LoadPlan {
-
-    @Override
-    boolean doRecovery() throws IOException {
-      LOG.debug(
+  boolean doRecovery() throws IOException {
+    LOG.debug(
         "Performing recovery in "+ latestNameSD + " and " + latestEditsSD);
       
-      boolean needToSave = false;
-      File curFile =
-        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
-      File ckptFile =
-        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
-
-      //
-      // If we were in the midst of a checkpoint
-      //
-      if (ckptFile.exists()) {
-        needToSave = true;
-        if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
-              .exists()) {
-          //
-          // checkpointing migth have uploaded a new
-          // merged image, but we discard it here because we are
-          // not sure whether the entire merged image was uploaded
-          // before the namenode crashed.
-          //
-          if (!ckptFile.delete()) {
-            throw new IOException("Unable to delete " + ckptFile);
-          }
-        } else {
-          //
-          // checkpointing was in progress when the namenode
-          // shutdown. The fsimage.ckpt was created and the edits.new
-          // file was moved to edits. We complete that checkpoint by
-          // moving fsimage.new to fsimage. There is no need to 
-          // update the fstime file here. renameTo fails on Windows
-          // if the destination file already exists.
-          //
+    boolean needToSave = false;
+    File curFile =
+      NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
+    File ckptFile =
+      NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
+    
+    //
+    // If we were in the midst of a checkpoint
+    //
+    if (ckptFile.exists()) {
+      needToSave = true;
+      if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
+          .exists()) {
+        //
+        // checkpointing might have uploaded a new
+        // merged image, but we discard it here because we are
+        // not sure whether the entire merged image was uploaded
+        // before the namenode crashed.
+        //
+        if (!ckptFile.delete()) {
+          throw new IOException("Unable to delete " + ckptFile);
+        }
+      } else {
+        //
+        // checkpointing was in progress when the namenode
+        // shutdown. The fsimage.ckpt was created and the edits.new
+        // file was moved to edits. We complete that checkpoint by
+        // moving fsimage.new to fsimage. There is no need to 
+        // update the fstime file here. renameTo fails on Windows
+        // if the destination file already exists.
+        //
+        if (!ckptFile.renameTo(curFile)) {
+          if (!curFile.delete())
+            LOG.warn("Unable to delete dir " + curFile + " before rename");
           if (!ckptFile.renameTo(curFile)) {
-            if (!curFile.delete())
-              LOG.warn("Unable to delete dir " + curFile + " before rename");
-            if (!ckptFile.renameTo(curFile)) {
-              throw new IOException("Unable to rename " + ckptFile +
-                                    " to " + curFile);
-            }
+            throw new IOException("Unable to rename " + ckptFile +
+                                  " to " + curFile);
           }
         }
       }
-      return needToSave;
     }
-
-    @Override
-    File getImageFile() {
-      return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
-    }
-
-    @Override
-    List<File> getEditsFiles() {
-      if (latestNameCheckpointTime > latestEditsCheckpointTime) {
-        // the image is already current, discard edits
-        LOG.debug(
-          "Name checkpoint time is newer than edits, not loading edits.");
-        return Collections.<File>emptyList();
-      }
-      
-      return getEditsInStorageDir(latestEditsSD);
-    }
-
-    @Override
-    StorageDirectory getStorageDirectoryForProperties() {
-      return latestNameSD;
-    }    
+    return needToSave;
   }
-
+  
   /**
    * @return a list with the paths to EDITS and EDITS_NEW (if it exists)
    * in a given storage directory.
@@ -269,4 +249,33 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector {
     }
     return files;
   }
+  
+  private List<File> getLatestEditsFiles() {
+    if (latestNameCheckpointTime > latestEditsCheckpointTime) {
+      // the image is already current, discard edits
+      LOG.debug(
+          "Name checkpoint time is newer than edits, not loading edits.");
+      return Collections.<File>emptyList();
+    }
+    
+    return getEditsInStorageDir(latestEditsSD);
+  }
+  
+  @Override
+  long getMaxSeenTxId() {
+    return 0L;
+  }
+
+  static Iterable<EditLogInputStream> getEditLogStreams(NNStorage storage)
+      throws IOException {
+    FSImagePreTransactionalStorageInspector inspector 
+      = new FSImagePreTransactionalStorageInspector();
+    storage.inspectStorageDirs(inspector);
+
+    List<EditLogInputStream> editStreams = new ArrayList<EditLogInputStream>();
+    for (File f : inspector.getLatestEditsFiles()) {
+      editStreams.add(new EditLogFileInputStream(f));
+    }
+    return editStreams;
+  }
 }
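
A brief usage sketch of the new static helper (as FSImage consumes it above, for pre-txid layouts): the surviving EDITS / EDITS_NEW files are wrapped as streams whose bounds are unknown.

    Iterable<EditLogInputStream> legacyStreams =
        FSImagePreTransactionalStorageInspector.getEditLogStreams(storage);
    // each stream reports HdfsConstants.INVALID_TXID from getFirstTxId()
    // and getLastTxId(), since pre-transactional logs carry no txids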

+ 11 - 47
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -43,60 +44,22 @@ abstract class FSImageStorageInspector {
   abstract boolean isUpgradeFinalized();
   
   /**
-   * Create a plan to load the image from the set of inspected storage directories.
+   * Get the image file which should be loaded into the filesystem.
    * @throws IOException if not enough files are available (eg no image found in any directory)
    */
-  abstract LoadPlan createLoadPlan() throws IOException;
-  
+  abstract FSImageFile getLatestImage() throws IOException;
+
+  /** 
+   * Get the maximum transaction id seen by the inspected storage directories.
+   */
+  abstract long getMaxSeenTxId();
+
   /**
    * @return true if the directories are in such a state that the image should be re-saved
    * following the load
    */
   abstract boolean needToSave();
 
-  /**
-   * A plan to load the namespace from disk, providing the locations from which to load
-   * the image and a set of edits files.
-   */
-  abstract static class LoadPlan {
-    /**
-     * Execute atomic move sequence in the chosen storage directories,
-     * in order to recover from an interrupted checkpoint.
-     * @return true if some recovery action was taken
-     */
-    abstract boolean doRecovery() throws IOException;
-
-    /**
-     * @return the file from which to load the image data
-     */
-    abstract File getImageFile();
-    
-    /**
-     * @return a list of flies containing edits to replay
-     */
-    abstract List<File> getEditsFiles();
-    
-    /**
-     * @return the storage directory containing the VERSION file that should be
-     * loaded.
-     */
-    abstract StorageDirectory getStorageDirectoryForProperties();
-    
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Will load image file: ").append(getImageFile()).append("\n");
-      sb.append("Will load edits files:").append("\n");
-      for (File f : getEditsFiles()) {
-        sb.append("  ").append(f).append("\n");
-      }
-      sb.append("Will load metadata from: ")
-        .append(getStorageDirectoryForProperties())
-        .append("\n");
-      return sb.toString();
-    }
-  }
-
   /**
    * Record of an image that has been located and had its filename parsed.
    */
@@ -106,7 +69,8 @@ abstract class FSImageStorageInspector {
     private final File file;
     
     FSImageFile(StorageDirectory sd, File file, long txId) {
-      assert txId >= 0 : "Invalid txid on " + file +": " + txId;
+      assert txId >= 0 || txId == HdfsConstants.INVALID_TXID 
+        : "Invalid txid on " + file +": " + txId;
       
       this.sd = sd;
       this.txId = txId;

+ 12 - 369
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -52,9 +51,7 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
   private boolean isUpgradeFinalized = true;
   
   List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
-  List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
-  SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
-  long maxSeenTxId = 0;
+  private long maxSeenTxId = 0;
   
   private static final Pattern IMAGE_REGEX = Pattern.compile(
     NameNodeFile.IMAGE.getName() + "_(\\d+)");
@@ -68,6 +65,8 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
       return;
     }
     
+    maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+
     File currentDir = sd.getCurrentDir();
     File filesInStorage[];
     try {
@@ -110,34 +109,10 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
       LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
     }
     
-    List<EditLogFile> editLogs 
-      = FileJournalManager.matchEditLogs(filesInStorage);
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      for (EditLogFile log : editLogs) {
-        addEditLog(log);
-      }
-    } else if (!editLogs.isEmpty()){
-      LOG.warn("Found the following edit log file(s) in " + sd +
-          " even though it was not configured to store edits:\n" +
-          "  " + Joiner.on("\n  ").join(editLogs));
-          
-    }
-    
     // set finalized flag
     isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
   }
 
-  private void addEditLog(EditLogFile foundEditLog) {
-    foundEditLogs.add(foundEditLog);
-    LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
-    if (group == null) {
-      group = new LogGroup(foundEditLog.getFirstTxId());
-      logGroups.put(foundEditLog.getFirstTxId(), group);
-    }
-    group.add(foundEditLog);
-  }
-
-
   @Override
   public boolean isUpgradeFinalized() {
     return isUpgradeFinalized;
@@ -148,9 +123,13 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
    * If there are multiple storage directories which contain equal images 
    * the storage directory that was inspected first will be preferred.
    * 
-   * Returns null if no images were found.
+   * @throws FileNotFoundException if no images are found.
    */
-  FSImageFile getLatestImage() {
+  FSImageFile getLatestImage() throws IOException {
+    if (foundImages.isEmpty()) {
+      throw new FileNotFoundException("No valid image files found");
+    }
+
     FSImageFile ret = null;
     for (FSImageFile img : foundImages) {
       if (ret == null || img.txId > ret.txId) {
@@ -164,349 +143,13 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
     return ImmutableList.copyOf(foundImages);
   }
   
-  public List<EditLogFile> getEditLogFiles() {
-    return ImmutableList.copyOf(foundEditLogs);
-  }
-
-  @Override
-  public LoadPlan createLoadPlan() throws IOException {
-    if (foundImages.isEmpty()) {
-      throw new FileNotFoundException("No valid image files found");
-    }
-
-    FSImageFile recoveryImage = getLatestImage();
-    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
-
-    return new TransactionalLoadPlan(recoveryImage,
-        logPlan);
-  }
-  
-  /**
-   * Plan which logs to load in order to bring the namespace up-to-date.
-   * Transactions will be considered in the range (sinceTxId, maxTxId]
-   * 
-   * @param sinceTxId the highest txid that is already loaded 
-   *                  (eg from the image checkpoint)
-   * @param maxStartTxId ignore any log files that start after this txid
-   */
-  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
-    long expectedTxId = sinceTxId + 1;
-    
-    List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();
-    
-    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
-    if (logGroups.size() > tailGroups.size()) {
-      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + 
-          " groups of logs because they start with a txid less than image " +
-          "txid " + sinceTxId);
-    }
-    
-    SortedMap<Long, LogGroup> usefulGroups;
-    if (maxStartTxId > sinceTxId) {
-      usefulGroups = tailGroups.headMap(maxStartTxId);
-    } else {
-      usefulGroups = new TreeMap<Long, LogGroup>();
-    }
-    
-    if (usefulGroups.size() > tailGroups.size()) {
-      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) + 
-        " groups of logs because they start with a txid higher than max " +
-        "txid " + sinceTxId);
-    }
-
-
-    for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
-      long logStartTxId = entry.getKey();
-      LogGroup logGroup = entry.getValue();
-      
-      logGroup.planRecovery();
-      
-      if (expectedTxId != HdfsConstants.INVALID_TXID && logStartTxId != expectedTxId) {
-        throw new IOException("Expected next log group would start at txid " +
-            expectedTxId + " but starts at txid " + logStartTxId);
-      }
-      
-      // We can pick any of the non-corrupt logs here
-      recoveryLogs.add(logGroup.getBestNonCorruptLog());
-      
-      // If this log group was finalized, we know to expect the next
-      // log group to start at the following txid (ie no gaps)
-      if (logGroup.hasKnownLastTxId()) {
-        expectedTxId = logGroup.getLastTxId() + 1;
-      } else {
-        // the log group was in-progress so we don't know what ID
-        // the next group should start from.
-        expectedTxId = HdfsConstants.INVALID_TXID;
-      }
-    }
-    
-    long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
-        0 : usefulGroups.lastKey();
-    if (maxSeenTxId > sinceTxId &&
-        maxSeenTxId > lastLogGroupStartTxId) {
-      String msg = "At least one storage directory indicated it has seen a " +
-        "log segment starting at txid " + maxSeenTxId;
-      if (usefulGroups.isEmpty()) {
-        msg += " but there are no logs to load.";
-      } else {
-        msg += " but the most recent log file found starts with txid " +
-          lastLogGroupStartTxId;
-      }
-      throw new IOException(msg);
-    }
-    
-    return new LogLoadPlan(recoveryLogs,
-        Lists.newArrayList(usefulGroups.values()));
-
-  }
-
   @Override
   public boolean needToSave() {
     return needToSave;
   }
-  
-  /**
-   * A group of logs that all start at the same txid.
-   * 
-   * Handles determining which logs are corrupt and which should be considered
-   * candidates for loading.
-   */
-  static class LogGroup {
-    long startTxId;
-    List<EditLogFile> logs = new ArrayList<EditLogFile>();;
-    private Set<Long> endTxIds = new TreeSet<Long>();
-    private boolean hasInProgress = false;
-    private boolean hasFinalized = false;
-        
-    LogGroup(long startTxId) {
-      this.startTxId = startTxId;
-    }
-    
-    EditLogFile getBestNonCorruptLog() {
-      // First look for non-corrupt finalized logs
-      for (EditLogFile log : logs) {
-        if (!log.isCorrupt() && !log.isInProgress()) {
-          return log;
-        }
-      }
-      // Then look for non-corrupt in-progress logs
-      for (EditLogFile log : logs) {
-        if (!log.isCorrupt()) {
-          return log;
-        }
-      }
 
-      // We should never get here, because we don't get to the planning stage
-      // without calling planRecovery first, and if we've called planRecovery,
-      // we would have already thrown if there were no non-corrupt logs!
-      throw new IllegalStateException(
-        "No non-corrupt logs for txid " + startTxId);
-    }
-
-    /**
-     * @return true if we can determine the last txid in this log group.
-     */
-    boolean hasKnownLastTxId() {
-      for (EditLogFile log : logs) {
-        if (!log.isInProgress()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * @return the last txid included in the logs in this group
-     * @throws IllegalStateException if it is unknown -
-     *                               {@see #hasKnownLastTxId()}
-     */
-    long getLastTxId() {
-      for (EditLogFile log : logs) {
-        if (!log.isInProgress()) {
-          return log.getLastTxId();
-        }
-      }
-      throw new IllegalStateException("LogGroup only has in-progress logs");
-    }
-
-    
-    void add(EditLogFile log) {
-      assert log.getFirstTxId() == startTxId;
-      logs.add(log);
-      
-      if (log.isInProgress()) {
-        hasInProgress = true;
-      } else {
-        hasFinalized = true;
-        endTxIds.add(log.getLastTxId());
-      }
-    }
-    
-    void planRecovery() throws IOException {
-      assert hasInProgress || hasFinalized;
-      
-      checkConsistentEndTxIds();
-        
-      if (hasFinalized && hasInProgress) {
-        planMixedLogRecovery();
-      } else if (!hasFinalized && hasInProgress) {
-        planAllInProgressRecovery();
-      } else if (hasFinalized && !hasInProgress) {
-        LOG.debug("No recovery necessary for logs starting at txid " +
-                  startTxId);
-      }
-    }
-
-    /**
-     * Recovery case for when some logs in the group were in-progress, and
-     * others were finalized. This happens when one of the storage
-     * directories fails.
-     *
-     * The in-progress logs in this case should be considered corrupt.
-     */
-    private void planMixedLogRecovery() throws IOException {
-      for (EditLogFile log : logs) {
-        if (log.isInProgress()) {
-          LOG.warn("Log at " + log.getFile() + " is in progress, but " +
-                   "other logs starting at the same txid " + startTxId +
-                   " are finalized. Moving aside.");
-          log.markCorrupt();
-        }
-      }
-    }
-    
-    /**
-     * Recovery case for when all of the logs in the group were in progress.
-     * This happens if the NN completely crashes and restarts. In this case
-     * we check the non-zero lengths of each log file, and any logs that are
-     * less than the max of these lengths are considered corrupt.
-     */
-    private void planAllInProgressRecovery() throws IOException {
-      // We only have in-progress logs. We need to figure out which logs have
-      // the latest data to reccover them
-      LOG.warn("Logs beginning at txid " + startTxId + " were are all " +
-               "in-progress (probably truncated due to a previous NameNode " +
-               "crash)");
-      if (logs.size() == 1) {
-        // Only one log, it's our only choice!
-        EditLogFile log = logs.get(0);
-        if (log.validateLog().numTransactions == 0) {
-          // If it has no transactions, we should consider it corrupt just
-          // to be conservative.
-          // See comment below for similar case
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it has no transactions in it.");
-          log.markCorrupt();          
-        }
-        return;
-      }
-
-      long maxValidTxnCount = Long.MIN_VALUE;
-      for (EditLogFile log : logs) {
-        long validTxnCount = log.validateLog().numTransactions;
-        LOG.warn("  Log " + log.getFile() +
-            " valid txns=" + validTxnCount +
-            " valid len=" + log.validateLog().validLength);
-        maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
-      }        
-
-      for (EditLogFile log : logs) {
-        long txns = log.validateLog().numTransactions;
-        if (txns < maxValidTxnCount) {
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-                   "it is has only " + txns + " valid txns whereas another " +
-                   "log has " + maxValidTxnCount);
-          log.markCorrupt();
-        } else if (txns == 0) {
-          // this can happen if the NN crashes right after rolling a log
-          // but before the START_LOG_SEGMENT txn is written. Since the log
-          // is empty, we can just move it aside to its corrupt name.
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it has no transactions in it.");
-          log.markCorrupt();
-        }
-      }
-    }
-
-    /**
-     * Check for the case when we have multiple finalized logs and they have
-     * different ending transaction IDs. This violates an invariant that all
-     * log directories should roll together. We should abort in this case.
-     */
-    private void checkConsistentEndTxIds() throws IOException {
-      if (hasFinalized && endTxIds.size() > 1) {
-        throw new IOException("More than one ending txid was found " +
-            "for logs starting at txid " + startTxId + ". " +
-            "Found: " + StringUtils.join(endTxIds, ','));
-      }
-    }
-
-    void recover() throws IOException {
-      for (EditLogFile log : logs) {
-        if (log.isCorrupt()) {
-          log.moveAsideCorruptFile();
-        } else if (log.isInProgress()) {
-          log.finalizeLog();
-        }
-      }
-    }    
-  }
-  
-  static class TransactionalLoadPlan extends LoadPlan {
-    final FSImageFile image;
-    final LogLoadPlan logPlan;
-    
-    public TransactionalLoadPlan(FSImageFile image,
-        LogLoadPlan logPlan) {
-      super();
-      this.image = image;
-      this.logPlan = logPlan;
-    }
-
-    @Override
-    boolean doRecovery() throws IOException {
-      logPlan.doRecovery();
-      return false;
-    }
-
-    @Override
-    File getImageFile() {
-      return image.getFile();
-    }
-
-    @Override
-    List<File> getEditsFiles() {
-      return logPlan.getEditsFiles();
-    }
-
-    @Override
-    StorageDirectory getStorageDirectoryForProperties() {
-      return image.sd;
-    }
-  }
-  
-  static class LogLoadPlan {
-    final List<EditLogFile> editLogs;
-    final List<LogGroup> logGroupsToRecover;
-    
-    LogLoadPlan(List<EditLogFile> editLogs,
-        List<LogGroup> logGroupsToRecover) {
-      this.editLogs = editLogs;
-      this.logGroupsToRecover = logGroupsToRecover;
-    }
-
-    public void doRecovery() throws IOException {
-      for (LogGroup g : logGroupsToRecover) {
-        g.recover();
-      }
-    }
-
-    public List<File> getEditsFiles() {
-      List<File> ret = new ArrayList<File>();
-      for (EditLogFile log : editLogs) {
-        ret.add(log.getFile());
-      }
-      return ret;
-    }
+  @Override
+  long getMaxSeenTxId() {
+    return maxSeenTxId;
   }
 }
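
The maxSeenTxId the inspector now returns (accumulated via NNStorage.readTransactionIdFile() in inspectDirectory() above) becomes the bound FSImage passes to selectInputStreams(), enabling a gap check before any edits load. A worked example with made-up numbers:

    // image checkpoint txid = 200; a storage directory has recorded seeing
    // txid 250; but the newest available segment ends at txid 230.
    editLog.selectInputStreams(201L, inspector.getMaxSeenTxId());
    // -> throws IOException("No non-corrupt logs for txid 231")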

+ 184 - 48
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -23,11 +23,14 @@ import org.apache.commons.logging.LogFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Comparator;
+import java.util.Collections;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
@@ -57,6 +60,9 @@ class FileJournalManager implements JournalManager {
   private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
     NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
 
+  private File currentInProgress = null;
+  private long maxSeenTransaction = 0L;
+
   @VisibleForTesting
   StoragePurger purger
     = new NNStorageRetentionManager.DeletionStoragePurger();
@@ -66,19 +72,20 @@ class FileJournalManager implements JournalManager {
   }
 
   @Override
-  public EditLogOutputStream startLogSegment(long txid) throws IOException {    
-    File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
-    EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
+  synchronized public EditLogOutputStream startLogSegment(long txid) 
+      throws IOException {
+    currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+    EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
         outputBufferCapacity);
     stm.create();
     return stm;
   }
 
   @Override
-  public void finalizeLogSegment(long firstTxId, long lastTxId)
+  synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
       throws IOException {
-    File inprogressFile = NNStorage.getInProgressEditsFile(
-        sd, firstTxId);
+    File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
+
     File dstFile = NNStorage.getFinalizedEditsFile(
         sd, firstTxId, lastTxId);
     LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
@@ -89,6 +96,9 @@ class FileJournalManager implements JournalManager {
     if (!inprogressFile.renameTo(dstFile)) {
       throw new IOException("Unable to finalize edits file " + inprogressFile);
     }
+    if (inprogressFile.equals(currentInProgress)) {
+      currentInProgress = null;
+    }
   }
 
   @VisibleForTesting
@@ -97,12 +107,7 @@ class FileJournalManager implements JournalManager {
   }
 
   @Override
-  public String toString() {
-    return "FileJournalManager for storage directory " + sd;
-  }
-
-  @Override
-  public void setOutputBufferCapacity(int size) {
+  synchronized public void setOutputBufferCapacity(int size) {
     this.outputBufferCapacity = size;
   }
 
@@ -120,13 +125,6 @@ class FileJournalManager implements JournalManager {
     }
   }
 
-  @Override
-  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
-      throws IOException {
-    File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
-    return new EditLogFileInputStream(f);
-  }
-  
   /**
    * Find all editlog segments starting at or above the given txid.
   * @param fromTxId the txid from which to start looking
@@ -178,17 +176,156 @@ class FileJournalManager implements JournalManager {
         try {
           long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
           ret.add(
-            new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END));
+              new EditLogFile(f, startTxId, startTxId, true));
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
           // skip
-        }          
+        }
       }
     }
     return ret;
   }
 
+  @Override
+  synchronized public EditLogInputStream getInputStream(long fromTxId) 
+      throws IOException {
+    for (EditLogFile elf : getLogFiles(fromTxId)) {
+      if (elf.getFirstTxId() == fromTxId) {
+        if (elf.isInProgress()) {
+          elf.validateLog();
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Returning edit stream reading from " + elf);
+        }
+        return new EditLogFileInputStream(elf.getFile(), 
+            elf.getFirstTxId(), elf.getLastTxId());
+      }
+    }
+
+    throw new IOException("Cannot find editlog file with " + fromTxId
+        + " as first first txid");
+  }
+
+  @Override
+  public long getNumberOfTransactions(long fromTxId) 
+      throws IOException, CorruptionException {
+    long numTxns = 0L;
+    
+    for (EditLogFile elf : getLogFiles(fromTxId)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Counting " + elf);
+      }
+      if (elf.getFirstTxId() > fromTxId) { // there must be a gap
+        LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
+            + fromTxId + " - " + (elf.getFirstTxId() - 1));
+        break;
+      } else if (fromTxId == elf.getFirstTxId()) {
+        if (elf.isInProgress()) {
+          elf.validateLog();
+        } 
+
+        if (elf.isCorrupt()) {
+          break;
+        }
+        fromTxId = elf.getLastTxId() + 1;
+        numTxns += fromTxId - elf.getFirstTxId();
+        
+        if (elf.isInProgress()) {
+          break;
+        }
+      } // else skip
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Journal " + this + " has " + numTxns 
+                + " txns from " + fromTxId);
+    }
+
+    long max = findMaxTransaction();
+    // fromTxId should be greater than max, as it points to the next 
+    // transaction we should expect to find. If it is less than or equal
+    // to max, it means that a transaction with txid == max has not been found
+    if (numTxns == 0 && fromTxId <= max) { 
+      String error = String.format("Gap in transactions, max txnid is %d"
+                                   + ", 0 txns from %d", max, fromTxId);
+      LOG.error(error);
+      throw new CorruptionException(error);
+    }
+
+    return numTxns;
+  }
+
+  @Override
+  synchronized public void recoverUnfinalizedSegments() throws IOException {
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    
+    // make sure journal is aware of max seen transaction before moving corrupt 
+    // files aside
+    findMaxTransaction();
+
+    for (EditLogFile elf : allLogFiles) {
+      if (elf.getFile().equals(currentInProgress)) {
+        continue;
+      }
+      if (elf.isInProgress()) {
+        elf.validateLog();
+
+        if (elf.isCorrupt()) {
+          elf.moveAsideCorruptFile();
+          continue;
+        }
+        finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
+      }
+    }
+  }
+
+  private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    List<EditLogFile> logFiles = Lists.newArrayList();
+    
+    for (EditLogFile elf : allLogFiles) {
+      if (fromTxId > elf.getFirstTxId()
+          && fromTxId <= elf.getLastTxId()) {
+        throw new IOException("Asked for fromTxId " + fromTxId
+            + " which is in middle of file " + elf.file);
+      }
+      if (fromTxId <= elf.getFirstTxId()) {
+        logFiles.add(elf);
+      }
+    }
+    
+    Collections.sort(logFiles, EditLogFile.COMPARE_BY_START_TXID);
+
+    return logFiles;
+  }
+
+  /** 
+   * Find the maximum transaction in the journal.
+   * The result is stored in a member variable, because corrupt edit logs
+   * will be moved aside, but we still need to remember their first
+   * transaction id in case it was the maximum transaction in
+   * the journal.
+   */
+  private long findMaxTransaction()
+      throws IOException {
+    for (EditLogFile elf : getLogFiles(0)) {
+      if (elf.isInProgress()) {
+        maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
+        elf.validateLog();
+      }
+      maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
+    }
+    return maxSeenTransaction;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FileJournalManager(root=%s)", sd.getRoot());
+  }
+
   /**
    * Record of an edit log that has been located and had its filename parsed.
    */
@@ -196,12 +333,10 @@ class FileJournalManager implements JournalManager {
     private File file;
     private final long firstTxId;
     private long lastTxId;
-    
-    private EditLogValidation cachedValidation = null;
+
     private boolean isCorrupt = false;
-    
-    static final long UNKNOWN_END = -1;
-    
+    private final boolean isInProgress;
+
     final static Comparator<EditLogFile> COMPARE_BY_START_TXID 
       = new Comparator<EditLogFile>() {
       public int compare(EditLogFile a, EditLogFile b) {
@@ -214,30 +349,24 @@ class FileJournalManager implements JournalManager {
 
     EditLogFile(File file,
         long firstTxId, long lastTxId) {
-      assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId;
-      assert firstTxId > 0;
+      this(file, firstTxId, lastTxId, false);
+      assert (lastTxId != HdfsConstants.INVALID_TXID)
+        && (lastTxId >= firstTxId);
+    }
+    
+    EditLogFile(File file, long firstTxId, 
+                long lastTxId, boolean isInProgress) { 
+      assert (lastTxId == HdfsConstants.INVALID_TXID && isInProgress)
+        || (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId);
+      assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
       assert file != null;
       
       this.firstTxId = firstTxId;
       this.lastTxId = lastTxId;
       this.file = file;
+      this.isInProgress = isInProgress;
     }
     
-    public void finalizeLog() throws IOException {
-      long numTransactions = validateLog().numTransactions;
-      long lastTxId = firstTxId + numTransactions - 1;
-      File dst = new File(file.getParentFile(),
-          NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId));
-      LOG.info("Finalizing edits log " + file + " by renaming to "
-          + dst.getName());
-      if (!file.renameTo(dst)) {
-        throw new IOException("Couldn't finalize log " +
-            file + " to " + dst);
-      }
-      this.lastTxId = lastTxId;
-      file = dst;
-    }
-
     long getFirstTxId() {
       return firstTxId;
     }
@@ -246,15 +375,22 @@ class FileJournalManager implements JournalManager {
       return lastTxId;
     }
 
-    EditLogValidation validateLog() throws IOException {
-      if (cachedValidation == null) {
-        cachedValidation = EditLogFileInputStream.validateEditLog(file);
+    /** 
+     * Count the number of valid transactions in a log.
+     * This will update the lastTxId of the EditLogFile or
+     * mark it as corrupt if it contains no valid transactions.
+     */
+    void validateLog() throws IOException {
+      EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
+      if (val.getNumTransactions() == 0) {
+        markCorrupt();
+      } else {
+        this.lastTxId = val.getEndTxId();
       }
-      return cachedValidation;
     }
 
     boolean isInProgress() {
-      return (lastTxId == UNKNOWN_END);
+      return isInProgress;
     }
 
     File getFile() {

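The counting rule implemented by getNumberOfTransactions() above reduces to walking the segments in order of start txid, adding each segment whose first txid is exactly the next expected txid, and stopping at the first gap. The following is a minimal standalone sketch of that rule, not code from this patch; TxnCountSketch and its simplified Segment record (standing in for EditLogFile) are hypothetical:

    import java.util.Arrays;
    import java.util.List;

    final class TxnCountSketch {
      /** Simplified stand-in for a validated EditLogFile segment. */
      static final class Segment {
        final long first, last;
        Segment(long first, long last) { this.first = first; this.last = last; }
      }

      /**
       * Mirrors the loop in getNumberOfTransactions(): count the transactions
       * contiguously available from fromTxId, stopping at the first gap.
       * Segments must be sorted by first txid.
       */
      static long countFrom(long fromTxId, List<Segment> sortedSegments) {
        long numTxns = 0;
        for (Segment s : sortedSegments) {
          if (s.first > fromTxId) {
            break;                    // gap: txids [fromTxId, s.first - 1] are missing
          } else if (s.first == fromTxId) {
            numTxns += s.last - s.first + 1;
            fromTxId = s.last + 1;    // the next txid we expect to find
          }                           // else: segment older than fromTxId, skip it
        }
        return numTxns;
      }

      public static void main(String[] args) {
        List<Segment> segs = Arrays.asList(
            new Segment(1, 10), new Segment(11, 20), new Segment(22, 30));
        System.out.println(countFrom(1, segs));   // 20: stops at missing txid 21
        System.out.println(countFrom(22, segs));  // 9
      }
    }

Running it prints 20 and 9: counting from txid 1 stops at the missing txid 21, while counting from 22 picks up the last segment.
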
+ 36 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java

@@ -39,6 +39,25 @@ interface JournalManager {
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
+  /**
+   * Get the input stream starting with fromTxnId from this journal manager
+   * @param fromTxnId the first transaction id we want to read
+   * @return the stream starting with transaction fromTxnId
+   * @throws IOException if a stream cannot be found.
+   */
+  EditLogInputStream getInputStream(long fromTxnId) throws IOException;
+
+  /**
+   * Get the number of transactions contiguously available from fromTxnId.
+   *
+   * @param fromTxnId Transaction id to count from
+   * @return The number of transactions available from fromTxnId
+   * @throws IOException if the journal cannot be read.
+   * @throws CorruptionException if there is a gap in the journal at fromTxnId.
+   */
+  long getNumberOfTransactions(long fromTxnId) 
+      throws IOException, CorruptionException;
+
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */
@@ -57,10 +76,21 @@ interface JournalManager {
     throws IOException;
 
   /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. May return null if this journal
-   * manager does not support this operation.
-   */  
-  EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
-    throws IOException;
+   * Recover segments which have not been finalized.
+   */
+  void recoverUnfinalizedSegments() throws IOException;
+
+  /** 
+   * Indicate that a journal cannot be used to load a certain range of 
+   * edits.
+   * This exception occurs in the case of a gap in the transactions, or a
+   * corrupt edit file.
+   */
+  public static class CorruptionException extends IOException {
+    static final long serialVersionUID = -4687802717006172702L;
+    
+    public CorruptionException(String reason) {
+      super(reason);
+    }
+  }
 }

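Taken together, the methods above support a simple read loop: recover unfinalized segments, ask how many transactions are contiguously available, then pull streams segment by segment. The following usage sketch is hypothetical, assumes same-package access (the interface and the txid getters on EditLogInputStream are package-private), and is not code from this patch:

    // Assumed to live in org.apache.hadoop.hdfs.server.namenode alongside
    // the interface, since JournalManager is package-private.
    import java.io.IOException;

    class JournalReadSketch {
      /** Hypothetical caller: replay everything available from fromTxId. */
      static void replayFrom(JournalManager jm, long fromTxId) throws IOException {
        jm.recoverUnfinalizedSegments();  // finalize or move aside in-progress logs
        // Throws JournalManager.CorruptionException if there is a gap at fromTxId.
        long remaining = jm.getNumberOfTransactions(fromTxId);
        while (remaining > 0) {
          // Stream whose first transaction is exactly fromTxId.
          EditLogInputStream in = jm.getInputStream(fromTxId);
          try {
            long read = in.getLastTxId() - in.getFirstTxId() + 1;
            // ... apply the transactions [in.getFirstTxId(), in.getLastTxId()] ...
            fromTxId += read;
            remaining -= read;
          } finally {
            in.close();
          }
        }
      }
    }
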
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java

@@ -248,7 +248,7 @@ public class TestDFSRollback extends TestCase {
       baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       deleteMatchingFiles(baseDirs, "edits.*");
       startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "but there are no logs to load");
+          "No non-corrupt logs for txid ");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with no image file", numDirs);

+ 3 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java

@@ -392,12 +392,9 @@ public abstract class FSImageTestUtil {
    */
   public static EditLogFile findLatestEditsLog(StorageDirectory sd)
   throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-      new FSImageTransactionalStorageInspector();
-    inspector.inspectDirectory(sd);
-    
-    List<EditLogFile> foundEditLogs = Lists.newArrayList(
-        inspector.getEditLogFiles());
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> foundEditLogs 
+      = Lists.newArrayList(FileJournalManager.matchEditLogs(currentDir.listFiles()));
     return Collections.max(foundEditLogs, EditLogFile.COMPARE_BY_START_TXID);
   }
 

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java

@@ -84,8 +84,10 @@ public class TestCheckPointForSecurityTokens {
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
+        log.validateLog();
+        long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should have 5 transactions",
-            5, log.validateLog().numTransactions);
+                     5, numTransactions);
       }
 
       // Saving image in safe mode should succeed
@@ -99,8 +101,10 @@ public class TestCheckPointForSecurityTokens {
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
+        log.validateLog();
+        long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should only have START txn",
-            1, log.validateLog().numTransactions);
+            1, numTransactions);
       }
 
       // restart cluster

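With validateLog() now updating the EditLogFile in place rather than returning an EditLogValidation, the transaction count is derived from the txid range afterwards, as in the assertions above. A condensed sketch of that idiom follows; EditLogCountSketch is hypothetical and would need to live in org.apache.hadoop.hdfs.server.namenode, since EditLogFile and its accessors are package-private:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;

    class EditLogCountSketch {
      /** Count the valid transactions in a located edit log file. */
      static long countTransactions(EditLogFile log) throws IOException {
        log.validateLog();   // updates lastTxId, or marks the file corrupt
        if (log.isCorrupt()) {
          return 0;          // no valid transactions could be read
        }
        return log.getLastTxId() - log.getFirstTxId() + 1;
      }
    }
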
+ 209 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -23,6 +23,9 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -37,6 +40,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -54,6 +58,7 @@ import org.aspectj.util.FileUtil;
 import org.mockito.Mockito;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import static org.apache.hadoop.test.MetricsAsserts.*;
@@ -76,7 +81,7 @@ public class TestEditLog extends TestCase {
   static final int NUM_TRANSACTIONS = 100;
   static final int NUM_THREADS = 100;
   
-  private static final File TEST_DIR = new File(
+  static final File TEST_DIR = new File(
     System.getProperty("test.build.data","build/test/data"));
 
   /** An edits log with 3 edits from 0.20 - the result of
@@ -623,13 +628,23 @@ public class TestEditLog extends TestCase {
   }
   
   public void testCrashRecoveryEmptyLogOneDir() throws Exception {
-    doTestCrashRecoveryEmptyLog(false);
+    doTestCrashRecoveryEmptyLog(false, true);
   }
   
   public void testCrashRecoveryEmptyLogBothDirs() throws Exception {
-    doTestCrashRecoveryEmptyLog(true);
+    doTestCrashRecoveryEmptyLog(true, true);
+  }
+
+  public void testCrashRecoveryEmptyLogOneDirNoUpdateSeenTxId() 
+      throws Exception {
+    doTestCrashRecoveryEmptyLog(false, false);
   }
   
+  public void testCrashRecoveryEmptyLogBothDirsNoUpdateSeenTxId()
+      throws Exception {
+    doTestCrashRecoveryEmptyLog(true, false);
+  }
+
   /**
    * Test that the NN handles the corruption properly
    * after it crashes just after creating an edit log
@@ -642,8 +657,14 @@ public class TestEditLog extends TestCase {
    * will only be in one of the directories. In both cases, the
    * NN should fail to start up, because it's aware that txid 3
    * was reached, but unable to find a non-corrupt log starting there.
+   * @param updateTransactionIdFile if true update the seen_txid file.
+   * If false, it will not be updated. This simulates a case 
+   * where the NN crashed between creating the new segment and updating
+   * seen_txid. 
    */
-  private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception {
+  private void doTestCrashRecoveryEmptyLog(boolean inBothDirs, 
+                                           boolean updateTransactionIdFile) 
+      throws Exception {
     // start a cluster 
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
@@ -661,6 +682,14 @@ public class TestEditLog extends TestCase {
       // Make a truncated edits_3_inprogress
       File log = new File(currentDir,
           NNStorage.getInProgressEditsFileName(3));
+      NNStorage storage = new NNStorage(conf, 
+                                        Collections.<URI>emptyList(),
+                                        Lists.newArrayList(uri));
+      if (updateTransactionIdFile) {
+        storage.writeTransactionIdFileToStorage(3);
+      }
+      storage.close();
+
       new EditLogFileOutputStream(log, 1024).create();
       if (!inBothDirs) {
         break;
@@ -671,9 +700,9 @@ public class TestEditLog extends TestCase {
       cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(NUM_DATA_NODES).format(false).build();
       fail("Did not fail to start with all-corrupt logs");
-    } catch (IllegalStateException ise) {
+    } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
-          "No non-corrupt logs for txid 3", ise);
+          "No non-corrupt logs for txid 3", ioe);
     }
     cluster.shutdown();
   }
@@ -698,7 +727,17 @@ public class TestEditLog extends TestCase {
             
       reader = new FSEditLogOp.Reader(in, version);
     }
+  
+    @Override
+    public long getFirstTxId() throws IOException {
+      return HdfsConstants.INVALID_TXID;
+    }
     
+    @Override
+    public long getLastTxId() throws IOException {
+      return HdfsConstants.INVALID_TXID;
+    }
+  
     @Override
     public long length() throws IOException {
       return len;
@@ -848,6 +887,168 @@ public class TestEditLog extends TestCase {
     Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
     return storage;
   }
-  
-  
+
+  /** 
+   * Specification for a failure during #setupEdits
+   */
+  static class AbortSpec {
+    final int roll;
+    final int logindex;
+    
+    /**
+     * Construct the failure specification. 
+     * @param roll number to fail after. e.g. 1 to fail after the first roll
+     * @param logindex index of the journal to fail. 
+     */
+    AbortSpec(int roll, int logindex) {
+      this.roll = roll;
+      this.logindex = logindex;
+    }
+  }
+
+  final static int TXNS_PER_ROLL = 10;  
+  final static int TXNS_PER_FAIL = 2;
+    
+  /**
+   * Set up directories for tests. 
+   *
+   * Each rolled file is 10 txns long. 
+   * A failed file is 2 txns long.
+   * 
+   * @param editUris directories to create edit logs in
+   * @param numrolls number of times to roll the edit log during setup
+   * @param abortAtRolls Specifications for when to fail, see AbortSpec
+   */
+  public static NNStorage setupEdits(List<URI> editUris, int numrolls, 
+                                     AbortSpec... abortAtRolls)
+      throws IOException {
+    List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
+    NNStorage storage = new NNStorage(new Configuration(),
+                                      Collections.<URI>emptyList(),
+                                      editUris);
+    storage.format("test-cluster-id");
+    FSEditLog editlog = new FSEditLog(storage);    
+    // open the edit log and fill out the first segment;
+    // logGenerationStamp is used simply because it doesn't 
+    // require complex arguments.
+    editlog.open();
+    for (int i = 2; i < TXNS_PER_ROLL; i++) {
+      editlog.logGenerationStamp((long)0);
+    }
+    editlog.logSync();
+    
+    // Go into edit log rolling loop.
+    // On each roll, the abortAtRolls abort specs are 
+    // checked to see if an abort is required. If so the 
+    // the specified journal is aborted. It will be brought
+    // back into rotation automatically by rollEditLog
+    for (int i = 0; i < numrolls; i++) {
+      editlog.rollEditLog();
+      
+      editlog.logGenerationStamp((long)i);
+      editlog.logSync();
+
+      while (aborts.size() > 0 
+             && aborts.get(0).roll == (i+1)) {
+        AbortSpec spec = aborts.remove(0);
+        editlog.getJournals().get(spec.logindex).abort();
+      } 
+      
+      for (int j = 3; j < TXNS_PER_ROLL; j++) {
+        editlog.logGenerationStamp((long)i);
+      }
+      editlog.logSync();
+    }
+    editlog.close();
+
+    FSImageTestUtil.logStorageContents(LOG, storage);
+    return storage;
+  }
+
+  /** 
+   * Test loading an editlog whose storage directories have failed
+   * on alternating rolls. Two edit log directories are created.
+   * The first one fails on odd rolls, the second on even. Test
+   * that we are able to load the entire editlog regardless.
+   */
+  @Test
+  public void testAlternatingJournalFailure() throws IOException {
+    File f1 = new File(TEST_DIR + "/alternatingjournaltest0");
+    File f2 = new File(TEST_DIR + "/alternatingjournaltest1");
+
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+    
+    NNStorage storage = setupEdits(editUris, 10,
+                                   new AbortSpec(1, 0),
+                                   new AbortSpec(2, 1),
+                                   new AbortSpec(3, 0),
+                                   new AbortSpec(4, 1),
+                                   new AbortSpec(5, 0),
+                                   new AbortSpec(6, 1),
+                                   new AbortSpec(7, 0),
+                                   new AbortSpec(8, 1),
+                                   new AbortSpec(9, 0),
+                                   new AbortSpec(10, 1));
+    long totaltxnread = 0;
+    FSEditLog editlog = new FSEditLog(storage);
+    long startTxId = 1;
+    Iterable<EditLogInputStream> editStreams = editlog.selectInputStreams(startTxId, 
+                                                                          TXNS_PER_ROLL*11);
+
+    for (EditLogInputStream edits : editStreams) {
+      FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits);
+      long read = val.getNumTransactions();
+      LOG.info("Loading edits " + edits + " read " + read);
+      assertEquals(startTxId, val.getStartTxId());
+      startTxId += read;
+      totaltxnread += read;
+    }
+
+    editlog.close();
+    storage.close();
+    assertEquals(TXNS_PER_ROLL*11, totaltxnread);    
+  }
+
+  /** 
+   * Test loading an editlog with gaps. A single editlog directory
+   * is set up. One of the edit log files is deleted. This should
+   * fail when selecting the input streams as it will not be able 
+   * to select enough streams to load up to 4*TXNS_PER_ROLL.
+   * There should be 4*TXNS_PER_ROLL transactions as we rolled 3
+   * times. 
+   */
+  @Test
+  public void testLoadingWithGaps() throws IOException {
+    File f1 = new File(TEST_DIR + "/gaptest0");
+    List<URI> editUris = ImmutableList.of(f1.toURI());
+
+    NNStorage storage = setupEdits(editUris, 3);
+    
+    final long startGapTxId = 1*TXNS_PER_ROLL + 1;
+    final long endGapTxId = 2*TXNS_PER_ROLL;
+
+    File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          if (name.startsWith(NNStorage.getFinalizedEditsFileName(startGapTxId, 
+                                  endGapTxId))) {
+            return true;
+          }
+          return false;
+        }
+      });
+    assertEquals(1, files.length);
+    assertTrue(files[0].delete());
+    
+    FSEditLog editlog = new FSEditLog(storage);
+    long startTxId = 1;
+    try {
+      Iterable<EditLogInputStream> editStreams 
+        = editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL);
+      
+      fail("Should have thrown exception");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "No non-corrupt logs for txid " + startGapTxId, ioe);
+    }
+  }
 }

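setupEdits() above leaves each journal directory with the initial segment plus one segment per roll; a journal aborted at roll N keeps only TXNS_PER_FAIL transactions of segment N, left as an in-progress file. The following standalone simulation of that layout is a reading of setupEdits(), not code from this patch; SetupEditsLayoutSketch is hypothetical:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class SetupEditsLayoutSketch {
      public static void main(String[] args) {
        final int TXNS_PER_ROLL = 10, TXNS_PER_FAIL = 2;
        final int numrolls = 5;
        // AbortSpec(5, 0): this journal is aborted after the 5th roll.
        Set<Integer> abortedRolls = new HashSet<Integer>(Arrays.asList(5));

        long firstTxId = 1;
        for (int seg = 0; seg <= numrolls; seg++) { // numrolls rolls => numrolls+1 segments
          boolean aborted = abortedRolls.contains(seg);  // roll N starts segment N
          long len = aborted ? TXNS_PER_FAIL : TXNS_PER_ROLL;
          System.out.printf("segment %d: txids %d-%d%s%n",
              seg, firstTxId, firstTxId + len - 1, aborted ? " (inprogress)" : "");
          firstTxId += TXNS_PER_ROLL;  // txids are global; later segments still advance a full roll
        }
        // This journal then holds 5*TXNS_PER_ROLL + TXNS_PER_FAIL valid txns,
        // matching testInprogressRecovery in TestFileJournalManager below.
      }
    }
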
+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java

@@ -63,8 +63,8 @@ public class TestEditLogFileOutputStream {
 
     EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
     assertEquals("Edit log should contain a header as valid length",
-        HEADER_LEN, validation.validLength);
-    assertEquals(1, validation.numTransactions);
+        HEADER_LEN, validation.getValidLength());
+    assertEquals(1, validation.getNumTransactions());
     assertEquals("Edit log should have 1MB of bytes allocated",
         1024*1024, editLog.length());
     
@@ -72,12 +72,12 @@ public class TestEditLogFileOutputStream {
     cluster.getFileSystem().mkdirs(new Path("/tmp"),
         new FsPermission((short)777));
 
-    long oldLength = validation.validLength;
+    long oldLength = validation.getValidLength();
     validation = EditLogFileInputStream.validateEditLog(editLog);
     assertTrue("Edit log should have more valid data after writing a txn " +
-        "(was: " + oldLength + " now: " + validation.validLength + ")",
-        validation.validLength > oldLength);
-    assertEquals(2, validation.numTransactions);
+        "(was: " + oldLength + " now: " + validation.getValidLength() + ")",
+        validation.getValidLength() > oldLength);
+    assertEquals(2, validation.getNumTransactions());
 
     assertEquals("Edit log should be 1MB long",
         1024 * 1024, editLog.length());

+ 12 - 12
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -186,8 +186,8 @@ public class TestFSEditLogLoader {
     // Make sure that uncorrupted log has the expected length and number
     // of transactions.
     EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
-    assertEquals(NUM_TXNS + 2, validation.numTransactions);
-    assertEquals(validLength, validation.validLength);
+    assertEquals(NUM_TXNS + 2, validation.getNumTransactions());
+    assertEquals(validLength, validation.getValidLength());
     
     // Back up the uncorrupted log
     File logFileBak = new File(testDir, logFile.getName() + ".bak");
@@ -203,8 +203,8 @@ public class TestFSEditLogLoader {
       truncateFile(logFile, txOffset);
       validation = EditLogFileInputStream.validateEditLog(logFile);
       assertEquals("Failed when truncating to length " + txOffset,
-          txid - 1, validation.numTransactions);
-      assertEquals(txOffset, validation.validLength);
+          txid - 1, validation.getNumTransactions());
+      assertEquals(txOffset, validation.getValidLength());
 
       // Restore backup, truncate the file with one byte in the txn,
       // also isn't valid
@@ -212,24 +212,24 @@ public class TestFSEditLogLoader {
       truncateFile(logFile, txOffset + 1);
       validation = EditLogFileInputStream.validateEditLog(logFile);
       assertEquals("Failed when truncating to length " + (txOffset + 1),
-          txid - 1, validation.numTransactions);
-      assertEquals(txOffset, validation.validLength);
+          txid - 1, validation.getNumTransactions());
+      assertEquals(txOffset, validation.getValidLength());
 
       // Restore backup, corrupt the txn opcode
       Files.copy(logFileBak, logFile);
       corruptByteInFile(logFile, txOffset);
       validation = EditLogFileInputStream.validateEditLog(logFile);
       assertEquals("Failed when corrupting txn opcode at " + txOffset,
-          txid - 1, validation.numTransactions);
-      assertEquals(txOffset, validation.validLength);
+          txid - 1, validation.getNumTransactions());
+      assertEquals(txOffset, validation.getValidLength());
 
       // Restore backup, corrupt a byte a few bytes into the txn
       Files.copy(logFileBak, logFile);
       corruptByteInFile(logFile, txOffset+5);
       validation = EditLogFileInputStream.validateEditLog(logFile);
       assertEquals("Failed when corrupting txn data at " + (txOffset+5),
-          txid - 1, validation.numTransactions);
-      assertEquals(txOffset, validation.validLength);
+          txid - 1, validation.getNumTransactions());
+      assertEquals(txOffset, validation.getValidLength());
     }
     
     // Corrupt the log at every offset to make sure that validation itself
@@ -240,8 +240,8 @@ public class TestFSEditLogLoader {
       Files.copy(logFileBak, logFile);
       corruptByteInFile(logFile, offset);
       EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile);
-      assertTrue(val.numTransactions >= prevNumValid);
-      prevNumValid = val.numTransactions;
+      assertTrue(val.getNumTransactions() >= prevNumValid);
+      prevNumValid = val.getNumTransactions();
     }
   }
 

+ 2 - 326
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java

@@ -36,9 +36,6 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
 
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup;
-import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -63,335 +60,14 @@ public class TestFSImageStorageInspector {
         "/foo/current/" + getInProgressEditsFileName(457));
 
     inspector.inspectDirectory(mockDir);
-    mockLogValidation(inspector,
-        "/foo/current/" + getInProgressEditsFileName(457), 10);
-    
-    assertEquals(2, inspector.foundEditLogs.size());
     assertEquals(2, inspector.foundImages.size());
-    assertTrue(inspector.foundEditLogs.get(1).isInProgress());
-    
+
     FSImageFile latestImage = inspector.getLatestImage();
     assertEquals(456, latestImage.txId);
     assertSame(mockDir, latestImage.sd);
     assertTrue(inspector.isUpgradeFinalized());
     
-    LoadPlan plan = inspector.createLoadPlan();
-    LOG.info("Plan: " + plan);
-    
     assertEquals(new File("/foo/current/"+getImageFileName(456)), 
-                 plan.getImageFile());
-    assertArrayEquals(new File[] {
-        new File("/foo/current/" + getInProgressEditsFileName(457)) },
-        plan.getEditsFiles().toArray(new File[0]));
-  }
-  
-  /**
-   * Test that we check for gaps in txids when devising a load plan.
-   */
-  @Test
-  public void testPlanWithGaps() throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-    
-    StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory(
-        NameNodeDirType.IMAGE_AND_EDITS,
-        false,
-        "/foo/current/" + getImageFileName(123),
-        "/foo/current/" + getImageFileName(456),
-        "/foo/current/" + getFinalizedEditsFileName(457,900),
-        "/foo/current/" + getFinalizedEditsFileName(901,950),
-        "/foo/current/" + getFinalizedEditsFileName(952,1000)); // <-- missing edit 951!
-
-    inspector.inspectDirectory(mockDir);
-    try {
-      inspector.createLoadPlan();
-      fail("Didn't throw IOE trying to load with gaps in edits");
-    } catch (IOException ioe) {
-      assertTrue(ioe.getMessage().contains(
-          "would start at txid 951 but starts at txid 952"));
-    }
-  }
-  
-  /**
-   * Test the case where an in-progress log comes in the middle of a sequence
-   * of logs
-   */
-  @Test
-  public void testPlanWithInProgressInMiddle() throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-    
-    StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory(
-        NameNodeDirType.IMAGE_AND_EDITS,
-        false,
-        "/foo/current/" + getImageFileName(123),
-        "/foo/current/" + getImageFileName(456),
-        "/foo/current/" + getFinalizedEditsFileName(457,900),
-        "/foo/current/" + getInProgressEditsFileName(901), // <-- inprogress in middle
-        "/foo/current/" + getFinalizedEditsFileName(952,1000));
-
-    inspector.inspectDirectory(mockDir);
-    mockLogValidation(inspector,
-        "/foo/current/" + getInProgressEditsFileName(901), 51);
-
-    LoadPlan plan = inspector.createLoadPlan();
-    LOG.info("Plan: " + plan);
-    
-    assertEquals(new File("/foo/current/" + getImageFileName(456)), 
-                 plan.getImageFile());
-    assertArrayEquals(new File[] {
-        new File("/foo/current/" + getFinalizedEditsFileName(457,900)),
-        new File("/foo/current/" + getInProgressEditsFileName(901)),
-        new File("/foo/current/" + getFinalizedEditsFileName(952,1000)) },
-        plan.getEditsFiles().toArray(new File[0]));
-
-  }
-
-  
-  /**
-   * Test case for the usual case where no recovery of a log group is necessary
-   * (i.e all logs have the same start and end txids and finalized)
-   */
-  @Test
-  public void testLogGroupRecoveryNoop() throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo1/current/" 
-                                  + getFinalizedEditsFileName(123,456)));
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo2/current/"
-                                  + getFinalizedEditsFileName(123,456)));
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo3/current/"
-                                  + getFinalizedEditsFileName(123,456)));
-    LogGroup lg = inspector.logGroups.get(123L);
-    assertEquals(3, lg.logs.size());
-    
-    lg.planRecovery();
-    
-    assertFalse(lg.logs.get(0).isCorrupt());
-    assertFalse(lg.logs.get(1).isCorrupt());
-    assertFalse(lg.logs.get(2).isCorrupt());
-  }
-  
-  /**
-   * Test case where we have some in-progress and some finalized logs
-   * for a given txid.
-   */
-  @Test
-  public void testLogGroupRecoveryMixed() throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo1/current/"
-                                  + getFinalizedEditsFileName(123,456)));
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo2/current/"
-                                  + getFinalizedEditsFileName(123,456)));
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo3/current/"
-                                  + getInProgressEditsFileName(123)));
-    inspector.inspectDirectory(FSImageTestUtil.mockStorageDirectory(
-        NameNodeDirType.IMAGE,
-        false,
-        "/foo4/current/" + getImageFileName(122)));
-
-    LogGroup lg = inspector.logGroups.get(123L);
-    assertEquals(3, lg.logs.size());
-    EditLogFile inProgressLog = lg.logs.get(2);
-    assertTrue(inProgressLog.isInProgress());
-    
-    LoadPlan plan = inspector.createLoadPlan();
-
-    // Check that it was marked corrupt.
-    assertFalse(lg.logs.get(0).isCorrupt());
-    assertFalse(lg.logs.get(1).isCorrupt());
-    assertTrue(lg.logs.get(2).isCorrupt());
-
-    
-    // Calling recover should move it aside
-    inProgressLog = spy(inProgressLog);
-    Mockito.doNothing().when(inProgressLog).moveAsideCorruptFile();
-    lg.logs.set(2, inProgressLog);
-    
-    plan.doRecovery();
-    
-    Mockito.verify(inProgressLog).moveAsideCorruptFile();
-  }
-  
-  /**
-   * Test case where we have finalized logs with different end txids
-   */
-  @Test
-  public void testLogGroupRecoveryInconsistentEndTxIds() throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo1/current/"
-                                  + getFinalizedEditsFileName(123,456)));
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo2/current/"
-                                  + getFinalizedEditsFileName(123,678)));
-
-    LogGroup lg = inspector.logGroups.get(123L);
-    assertEquals(2, lg.logs.size());
-
-    try {
-      lg.planRecovery();
-      fail("Didn't throw IOE on inconsistent end txids");
-    } catch (IOException ioe) {
-      assertTrue(ioe.getMessage().contains("More than one ending txid"));
-    }
-  }
-
-  /**
-   * Test case where we have only in-progress logs and need to synchronize
-   * based on valid length.
-   */
-  @Test
-  public void testLogGroupRecoveryInProgress() throws IOException {
-    String paths[] = new String[] {
-        "/foo1/current/" + getInProgressEditsFileName(123),
-        "/foo2/current/" + getInProgressEditsFileName(123),
-        "/foo3/current/" + getInProgressEditsFileName(123)
-    };
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-    inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[0]));
-    inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[1]));
-    inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[2]));
-
-    // Inject spies to return the valid counts we would like to see
-    mockLogValidation(inspector, paths[0], 2000);
-    mockLogValidation(inspector, paths[1], 2000);
-    mockLogValidation(inspector, paths[2], 1000);
-
-    LogGroup lg = inspector.logGroups.get(123L);
-    assertEquals(3, lg.logs.size());
-    
-    lg.planRecovery();
-    
-    // Check that the short one was marked corrupt
-    assertFalse(lg.logs.get(0).isCorrupt());
-    assertFalse(lg.logs.get(1).isCorrupt());
-    assertTrue(lg.logs.get(2).isCorrupt());
-    
-    // Calling recover should move it aside
-    EditLogFile badLog = lg.logs.get(2);
-    Mockito.doNothing().when(badLog).moveAsideCorruptFile();
-    Mockito.doNothing().when(lg.logs.get(0)).finalizeLog();
-    Mockito.doNothing().when(lg.logs.get(1)).finalizeLog();
-    
-    lg.recover();
-    
-    Mockito.verify(badLog).moveAsideCorruptFile();
-    Mockito.verify(lg.logs.get(0)).finalizeLog();
-    Mockito.verify(lg.logs.get(1)).finalizeLog();
-  }
-
-  /**
-   * Mock out the log at the given path to return a specified number
-   * of transactions upon validation.
-   */
-  private void mockLogValidation(
-      FSImageTransactionalStorageInspector inspector,
-      String path, int numValidTransactions) throws IOException {
-    
-    for (LogGroup lg : inspector.logGroups.values()) {
-      List<EditLogFile> logs = lg.logs;
-      for (int i = 0; i < logs.size(); i++) {
-        EditLogFile log = logs.get(i);
-        if (log.getFile().getPath().equals(path)) {
-          // mock out its validation
-          EditLogFile spyLog = spy(log);
-          doReturn(new FSEditLogLoader.EditLogValidation(-1, numValidTransactions))
-            .when(spyLog).validateLog();
-          logs.set(i, spyLog);
-          return;
-        }
-      }
-    }
-    fail("No log found to mock out at " + path);
-  }
-
-  /**
-   * Test when edits and image are in separate directories.
-   */
-  @Test
-  public void testCurrentSplitEditsAndImage() throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-    
-    StorageDirectory mockImageDir = FSImageTestUtil.mockStorageDirectory(
-        NameNodeDirType.IMAGE,
-        false,
-        "/foo/current/" + getImageFileName(123));
-    StorageDirectory mockImageDir2 = FSImageTestUtil.mockStorageDirectory(
-        NameNodeDirType.IMAGE,
-        false,
-        "/foo2/current/" + getImageFileName(456));
-    StorageDirectory mockEditsDir = FSImageTestUtil.mockStorageDirectory(
-        NameNodeDirType.EDITS,
-        false,
-        "/foo3/current/" + getFinalizedEditsFileName(123, 456),
-        "/foo3/current/" + getInProgressEditsFileName(457));
-    
-    inspector.inspectDirectory(mockImageDir);
-    inspector.inspectDirectory(mockEditsDir);
-    inspector.inspectDirectory(mockImageDir2);
-    
-    mockLogValidation(inspector,
-        "/foo3/current/" + getInProgressEditsFileName(457), 2);
-
-    assertEquals(2, inspector.foundEditLogs.size());
-    assertEquals(2, inspector.foundImages.size());
-    assertTrue(inspector.foundEditLogs.get(1).isInProgress());
-    assertTrue(inspector.isUpgradeFinalized());    
-
-    // Check plan
-    TransactionalLoadPlan plan =
-      (TransactionalLoadPlan)inspector.createLoadPlan();
-    FSImageFile pickedImage = plan.image;
-    assertEquals(456, pickedImage.txId);
-    assertSame(mockImageDir2, pickedImage.sd);
-    assertEquals(new File("/foo2/current/" + getImageFileName(456)),
-                 plan.getImageFile());
-    assertArrayEquals(new File[] {
-        new File("/foo3/current/" + getInProgressEditsFileName(457))
-      }, plan.getEditsFiles().toArray(new File[0]));
-  }
-  
-  /**
-   * Test case where an in-progress log is in an earlier name directory
-   * than a finalized log. Previously, getEditLogManifest wouldn't
-   * see this log.
-   */
-  @Test
-  public void testLogManifestInProgressComesFirst() throws IOException { 
-    FSImageTransactionalStorageInspector inspector =
-        new FSImageTransactionalStorageInspector();
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo1/current/" 
-                                  + getFinalizedEditsFileName(2622,2623),
-                                  "/foo1/current/"
-                                  + getFinalizedEditsFileName(2624,2625),
-                                  "/foo1/current/"
-                                  + getInProgressEditsFileName(2626)));
-    inspector.inspectDirectory(
-        mockDirectoryWithEditLogs("/foo2/current/"
-                                  + getFinalizedEditsFileName(2622,2623),
-                                  "/foo2/current/"
-                                  + getFinalizedEditsFileName(2624,2625),
-                                  "/foo2/current/"
-                                  + getFinalizedEditsFileName(2626,2627),
-                                  "/foo2/current/"
-                                  + getFinalizedEditsFileName(2628,2629)));
-  }  
-  
-  static StorageDirectory mockDirectoryWithEditLogs(String... fileNames) {
-    return FSImageTestUtil.mockStorageDirectory(NameNodeDirType.EDITS, false, fileNames);
+        latestImage.getFile());
   }
 }

+ 262 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java

@@ -19,17 +19,277 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.*;
 
-import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
 
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.SecurityUtil;
+import org.junit.Test;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Test;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_ROLL;
+import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.base.Joiner;
 
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
 public class TestFileJournalManager {
 
+  /** 
+   * Test the normal operation of loading transactions from the
+   * file journal manager. 3 edits directories are set up without any
+   * failures. Test that we read in the expected number of transactions.
+   */
+  @Test
+  public void testNormalOperation() throws IOException {
+    File f1 = new File(TestEditLog.TEST_DIR + "/normtest0");
+    File f2 = new File(TestEditLog.TEST_DIR + "/normtest1");
+    File f3 = new File(TestEditLog.TEST_DIR + "/normtest2");
+    
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI(), f3.toURI());
+    NNStorage storage = setupEdits(editUris, 5);
+    
+    long numJournals = 0;
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+      FileJournalManager jm = new FileJournalManager(sd);
+      assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1));
+      numJournals++;
+    }
+    assertEquals(3, numJournals);
+  }
+
+  /**
+   * Test that inprogress files are handled correctly. Set up a single
+   * edits directory. Fail it after the last roll. Then verify that the 
+   * logs have the expected number of transactions.
+   */
+  @Test
+  public void testInprogressRecovery() throws IOException {
+    File f = new File(TestEditLog.TEST_DIR + "/filejournaltest0");
+    // abort after the 5th roll 
+    NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
+                                   5, new AbortSpec(5, 0));
+    StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
+
+    FileJournalManager jm = new FileJournalManager(sd);
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, 
+                 jm.getNumberOfTransactions(1));
+  }
+
+  /**
+   * Test a mixture of inprogress and finalized files. Set up 3 edits 
+   * directories and fail the second on the last roll. Verify that reading
+   * the transactions reads from the finalized directories.
+   */
+  @Test
+  public void testInprogressRecoveryMixed() throws IOException {
+    File f1 = new File(TestEditLog.TEST_DIR + "/mixtest0");
+    File f2 = new File(TestEditLog.TEST_DIR + "/mixtest1");
+    File f3 = new File(TestEditLog.TEST_DIR + "/mixtest2");
+    
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI(), f3.toURI());
+
+    // abort after the 5th roll 
+    NNStorage storage = setupEdits(editUris,
+                                   5, new AbortSpec(5, 1));
+    Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
+    StorageDirectory sd = dirs.next();
+    FileJournalManager jm = new FileJournalManager(sd);
+    assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1));
+    
+    sd = dirs.next();
+    jm = new FileJournalManager(sd);
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1));
+
+    sd = dirs.next();
+    jm = new FileJournalManager(sd);
+    assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1));
+  }
+
+  /** 
+   * Test that FileJournalManager behaves correctly despite inprogress
+   * files in all its edit log directories. Set up 3 directories and fail
+   * all on the last roll. Verify that the correct number of transactions 
+   * is then loaded.
+   */
+  @Test
+  public void testInprogressRecoveryAll() throws IOException {
+    File f1 = new File(TestEditLog.TEST_DIR + "/failalltest0");
+    File f2 = new File(TestEditLog.TEST_DIR + "/failalltest1");
+    File f3 = new File(TestEditLog.TEST_DIR + "/failalltest2");
+    
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI(), f3.toURI());
+    // abort after the 5th roll 
+    NNStorage storage = setupEdits(editUris, 5, 
+                                   new AbortSpec(5, 0),
+                                   new AbortSpec(5, 1),
+                                   new AbortSpec(5, 2));
+    Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
+    StorageDirectory sd = dirs.next();
+    FileJournalManager jm = new FileJournalManager(sd);
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1));
+    
+    sd = dirs.next();
+    jm = new FileJournalManager(sd);
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1));
+
+    sd = dirs.next();
+    jm = new FileJournalManager(sd);
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1));
+  }
+
+  /** 
+   * Corrupt an edit log file after the start segment transaction.
+   */
+  private void corruptAfterStartSegment(File f) throws IOException {
+    RandomAccessFile raf = new RandomAccessFile(f, "rw");
+    raf.seek(0x16); // skip version and first transaction and a bit of the next transaction
+    for (int i = 0; i < 1000; i++) {
+      raf.writeInt(0xdeadbeef);
+    }
+    raf.close();
+  }
+
+  /** 
+   * Test that we can read from a stream created by FileJournalManager.
+   * Create a single edits directory, failing it on the final roll.
+   * Then try loading from the point of the 3rd roll. Verify that we read 
+   * the correct number of transactions from this point.
+   */
+  @Test 
+  public void testReadFromStream() throws IOException {
+    File f = new File(TestEditLog.TEST_DIR + "/filejournaltest1");
+    // abort after 10th roll
+    NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
+                                   10, new AbortSpec(10, 0));
+    StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
+
+    FileJournalManager jm = new FileJournalManager(sd);
+    long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
+    assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1));
+
+    long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files
+    long startingTxId = skippedTxns + 1; 
+
+    long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId);
+    long numLoaded = 0;
+    while (numLoaded < numTransactionsToLoad) {
+      EditLogInputStream editIn = jm.getInputStream(startingTxId);
+      FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn);
+      long count = val.getNumTransactions();
+
+      editIn.close();
+      startingTxId += count;
+      numLoaded += count;
+    }
+
+    assertEquals(expectedTotalTxnCount - skippedTxns, numLoaded); 
+  }
+
+  /**
+   * Try to make a request with a start transaction id which doesn't
+   * match the start ID of some log segment. 
+   * This should fail as edit logs must currently be treated as indevisable 
+   * units.
+   */
+  @Test(expected=IOException.class)
+  public void testAskForTransactionsMidfile() throws IOException {
+    File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2");
+    NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()), 
+                                   10);
+    StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
+    
+    FileJournalManager jm = new FileJournalManager(sd);
+    jm.getNumberOfTransactions(2);    
+  }
+
+  /** 
+   * Test that we receive the correct number of transactions when we
+   * count around gaps.
+   * Set up a single edits directory, with no failures. Delete the 4th logfile.
+   * Test that getNumberOfTransactions returns the correct number of 
+   * transactions before this gap and after this gap. Also verify that
+   * trying to count starting inside the gap throws an exception.
+   */
+  @Test
+  public void testManyLogsWithGaps() throws IOException {
+    File f = new File(TestEditLog.TEST_DIR + "/filejournaltest3");
+    NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()), 10);
+    StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
+
+    final long startGapTxId = 3*TXNS_PER_ROLL + 1;
+    final long endGapTxId = 4*TXNS_PER_ROLL;
+    File[] files = new File(f, "current").listFiles(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          if (name.startsWith(NNStorage.getFinalizedEditsFileName(startGapTxId, endGapTxId))) {
+            return true;
+          }
+          return false;
+        }
+      });
+    assertEquals(1, files.length);
+    assertTrue(files[0].delete());
+    
+    FileJournalManager jm = new FileJournalManager(sd);
+    assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1));
+
+    try {
+      jm.getNumberOfTransactions(startGapTxId);
+      fail("Should have thrown an exception by now");
+    } catch (IOException ioe) {
+      assertTrue(true); // expected: counting from inside the gap must fail
+    }
+
+    // rolled 10 times so there should be 11 files.
+    assertEquals(11*TXNS_PER_ROLL - endGapTxId, 
+                 jm.getNumberOfTransactions(endGapTxId+1));
+  }
+
+  /** 
+   * Test that we can load an edits directory with a corrupt inprogress file.
+   * The corrupt inprogress file should be moved aside.
+   */
+  @Test
+  public void testManyLogsWithCorruptInprogress() throws IOException {
+    File f = new File(TestEditLog.TEST_DIR + "/filejournaltest5");
+    NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()), 10, new AbortSpec(10, 0));
+    StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
+
+    File[] files = new File(f, "current").listFiles(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          if (name.startsWith("edits_inprogress")) {
+            return true;
+          }
+          return false;
+        }
+      });
+    assertEquals(1, files.length);
+    
+    corruptAfterStartSegment(files[0]);
+
+    FileJournalManager jm = new FileJournalManager(sd);
+    assertEquals(10*TXNS_PER_ROLL+1, // +1: only the segment-start txn survives the corruption
+                 jm.getNumberOfTransactions(1)); 
+  }
+
   @Test
   public void testGetRemoteEditLog() throws IOException {
     StorageDirectory sd = FSImageTestUtil.mockStorageDirectory(
@@ -58,5 +318,4 @@ public class TestFileJournalManager {
       FileJournalManager fjm, long firstTxId) throws IOException {
     return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId));
   }
-  
 }

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.Random;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -80,10 +81,12 @@ public class TestNameEditsConfigs extends TestCase {
       assertTrue("Expect no images in " + dir, ins.foundImages.isEmpty());      
     }
 
+    List<FileJournalManager.EditLogFile> editlogs 
+      = FileJournalManager.matchEditLogs(new File(dir, "current").listFiles()); 
     if (shouldHaveEdits) {
-      assertTrue("Expect edits in " + dir, ins.foundEditLogs.size() > 0);
+      assertTrue("Expect edits in " + dir, editlogs.size() > 0);
     } else {
-      assertTrue("Expect no edits in " + dir, ins.foundEditLogs.isEmpty());
+      assertTrue("Expect no edits in " + dir, editlogs.isEmpty());
     }
   }