
HDFS-2982. Startup performance suffers when there are many edit log segments. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1342043 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
e4b5b99e8e
25 changed files with 657 additions and 558 deletions
  1. +3 -0      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +2 -2      hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
  3. +27 -5     hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
  4. +1 -1      hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
  5. +1 -1      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
  6. +3 -11     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
  7. +2 -2      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
  8. +113 -58   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
  9. +5 -5      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
 10. +73 -25    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
 11. +36 -57    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
 12. +6 -3      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
 13. +59 -136   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
 14. +9 -17     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
 15. +56 -58    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
 16. +1 -1      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
 17. +1 -1      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
 18. +1 -1      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
 19. +62 -7     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
 20. +5 -6      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
 21. +80 -103   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
 22. +100 -41   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
 23. +3 -12     hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
 24. +5 -4      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
 25. +3 -1      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
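
The heart of the patch is an API change on JournalManager, visible throughout the diffs below: the pull-style pair getInputStream(fromTxnId, inProgressOk) and getNumberOfTransactions(fromTxnId, inProgressOk), each of which could force a validation scan over segment files, is replaced by a single selectInputStreams(streams, fromTxId, inProgressOk) call that collects every candidate stream in one pass and defers opening the underlying files. A minimal self-contained sketch of the new calling pattern, using toy stand-ins rather than Hadoop's real classes:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    // Toy model of the new contract; ToyStream and ToyJournalManager are
    // illustrative stand-ins, not Hadoop classes.
    interface ToyStream {
      long getFirstTxId();
      long getLastTxId();
    }

    interface ToyJournalManager {
      // Append every stream containing transactions >= fromTxId.
      void selectInputStreams(Collection<ToyStream> streams, long fromTxId,
          boolean inProgressOk);
    }

    class ToyCaller {
      static List<ToyStream> gather(ToyJournalManager jm, long fromTxId) {
        List<ToyStream> streams = new ArrayList<ToyStream>();
        jm.selectInputStreams(streams, fromTxId, true);
        return streams; // one call per journal, however many segments exist
      }
    }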

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -59,6 +59,9 @@ Release 2.0.1-alpha - UNRELEASED
 
   OPTIMIZATIONS
 
+    HDFS-2982. Startup performance suffers when there are many edit log
+    segments. (Colin Patrick McCabe via todd)
+
   BUG FIXES
 
     HDFS-3385. The last block of INodeFileUnderConstruction is not

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java

@@ -79,12 +79,12 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
   }
 
   @Override
-  public long getFirstTxId() throws IOException {
+  public long getFirstTxId() {
     return firstTxId;
   }
 
   @Override
-  public long getLastTxId() throws IOException {
+  public long getLastTxId() {
     return lastTxId;
   }
   

+ 27 - 5
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java

@@ -37,6 +37,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.ArrayList;
 import java.util.List;
@@ -313,8 +314,7 @@ public class BookKeeperJournalManager implements JournalManager {
   }
 
   // TODO(HA): Handle inProgressOk
-  @Override
-  public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
+  EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
       throws IOException {
     for (EditLogLedgerMetadata l : getLedgerList()) {
       if (l.getFirstTxId() == fromTxnId) {
@@ -328,12 +328,34 @@ public class BookKeeperJournalManager implements JournalManager {
         }
       }
     }
-    throw new IOException("No ledger for fromTxnId " + fromTxnId + " found.");
+    return null;
   }
 
-  // TODO(HA): Handle inProgressOk
   @Override
-  public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) {
+    // NOTE: could probably be rewritten more efficiently
+    while (true) {
+      EditLogInputStream elis;
+      try {
+        elis = getInputStream(fromTxId, inProgressOk);
+      } catch (IOException e) {
+        LOG.error(e);
+        return;
+      }
+      if (elis == null) {
+        return;
+      }
+      streams.add(elis);
+      if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
+        return;
+      }
+      fromTxId = elis.getLastTxId() + 1;
+    }
+  }
+
+  // TODO(HA): Handle inProgressOk
+  long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
       throws IOException {
     long count = 0;
     long expectedStart = 0;
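
The selectInputStreams added above is a thin adapter over the old ledger-at-a-time lookup: it repeatedly asks for the segment starting at fromTxId and advances to getLastTxId() + 1, stopping when no segment is found or when it reaches an in-progress segment whose end is still INVALID_TXID. A self-contained sketch of that walk, with plain (first, last) pairs instead of ledgers and a stand-in constant for HdfsConstants.INVALID_TXID:

    import java.util.ArrayList;
    import java.util.List;

    class SegmentWalk {
      static final long INVALID_TXID = -12345; // stand-in constant

      // Find the segment whose first txid is exactly firstTxId, or null.
      static long[] findSegment(long[][] segments, long firstTxId) {
        for (long[] s : segments) {
          if (s[0] == firstTxId) {
            return s;
          }
        }
        return null;
      }

      static List<long[]> walk(long[][] segments, long fromTxId) {
        List<long[]> out = new ArrayList<long[]>();
        while (true) {
          long[] seg = findSegment(segments, fromTxId);
          if (seg == null) {
            return out;              // no segment begins at fromTxId
          }
          out.add(seg);
          if (seg[1] == INVALID_TXID) {
            return out;              // in-progress segment: end unknown
          }
          fromTxId = seg[1] + 1;     // resume at the next transaction
        }
      }
    }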

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java

@@ -34,6 +34,6 @@ public class FSEditLogTestUtil {
   public static long countTransactionsInStream(EditLogInputStream in) 
       throws IOException {
     FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
-    return validation.getNumTransactions();
+    return (validation.getEndTxId() - in.getFirstTxId()) + 1;
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -220,7 +220,7 @@ public class BackupImage extends FSImage {
       int logVersion = storage.getLayoutVersion();
       backupInputStream.setBytes(data, logVersion);
 
-      long numTxnsAdvanced = logLoader.loadEditRecords(logVersion, 
+      long numTxnsAdvanced = logLoader.loadEditRecords(
           backupInputStream, true, lastAppliedTxId + 1, null);
       if (numTxnsAdvanced != numTxns) {
         throw new IOException("Batch of txns starting at txnid " +

+ 3 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -60,19 +61,10 @@ class BackupJournalManager implements JournalManager {
   }
 
   @Override
-  public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
-      throws IOException, CorruptionException {
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
-    return 0;
-  }
-  
-  @Override
-  public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
-      throws IOException {
-    // This JournalManager is never used for input. Therefore it cannot
-    // return any transactions
-    throw new IOException("Unsupported operation");
   }
 
   @Override

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java

@@ -129,12 +129,12 @@ class EditLogBackupInputStream extends EditLogInputStream {
   }
 
   @Override
-  public long getFirstTxId() throws IOException {
+  public long getFirstTxId() {
     return HdfsConstants.INVALID_TXID;
   }
 
   @Override
-  public long getLastTxId() throws IOException {
+  public long getLastTxId() {
     return HdfsConstants.INVALID_TXID;
   }
 

+ 113 - 58
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -24,12 +24,16 @@ import java.io.IOException;
 import java.io.BufferedInputStream;
 import java.io.EOFException;
 import java.io.DataInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 
 /**
  * An implementation of the abstract class {@link EditLogInputStream}, which
@@ -37,13 +41,21 @@ import com.google.common.annotations.VisibleForTesting;
  */
 public class EditLogFileInputStream extends EditLogInputStream {
   private final File file;
-  private final FileInputStream fStream;
-  final private long firstTxId;
-  final private long lastTxId;
-  private final int logVersion;
-  private final FSEditLogOp.Reader reader;
-  private final FSEditLogLoader.PositionTrackingInputStream tracker;
+  private final long firstTxId;
+  private final long lastTxId;
   private final boolean isInProgress;
+  static private enum State {
+    UNINIT,
+    OPEN,
+    CLOSED
+  }
+  private State state = State.UNINIT;
+  private FileInputStream fStream = null;
+  private int logVersion = 0;
+  private FSEditLogOp.Reader reader = null;
+  private FSEditLogLoader.PositionTrackingInputStream tracker = null;
+  private DataInputStream dataIn = null;
+  static final Log LOG = LogFactory.getLog(EditLogInputStream.class);
   
   /**
    * Open an EditLogInputStream for the given file.
@@ -70,34 +82,43 @@ public class EditLogFileInputStream extends EditLogInputStream {
    *         header
    */
   public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
-      boolean isInProgress)
-      throws LogHeaderCorruptException, IOException {
-    file = name;
-    fStream = new FileInputStream(name);
-
-    BufferedInputStream bin = new BufferedInputStream(fStream);
-    tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
-    DataInputStream in = new DataInputStream(tracker);
-
-    try {
-      logVersion = readLogVersion(in);
-    } catch (EOFException eofe) {
-      throw new LogHeaderCorruptException("No header found in log");
-    }
-
-    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
+      boolean isInProgress) {
+    this.file = name;
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
     this.isInProgress = isInProgress;
   }
 
+  private void init() throws LogHeaderCorruptException, IOException {
+    Preconditions.checkState(state == State.UNINIT);
+    BufferedInputStream bin = null;
+    try {
+      fStream = new FileInputStream(file);
+      bin = new BufferedInputStream(fStream);
+      tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
+      dataIn = new DataInputStream(tracker);
+      try {
+        logVersion = readLogVersion(dataIn);
+      } catch (EOFException eofe) {
+        throw new LogHeaderCorruptException("No header found in log");
+      }
+      reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+      state = State.OPEN;
+    } finally {
+      if (reader == null) {
+        IOUtils.cleanup(LOG, dataIn, tracker, bin, fStream);
+        state = State.CLOSED;
+      }
+    }
+  }
+
   @Override
-  public long getFirstTxId() throws IOException {
+  public long getFirstTxId() {
     return firstTxId;
   }
   
   @Override
-  public long getLastTxId() throws IOException {
+  public long getLastTxId() {
     return lastTxId;
   }
 
@@ -106,61 +127,95 @@ public class EditLogFileInputStream extends EditLogInputStream {
     return file.getPath();
   }
 
-  @Override
-  protected FSEditLogOp nextOp() throws IOException {
-    FSEditLogOp op = reader.readOp(false);
-    if ((op != null) && (op.hasTransactionId())) {
-      long txId = op.getTransactionId();
-      if ((txId >= lastTxId) &&
-          (lastTxId != HdfsConstants.INVALID_TXID)) {
-        //
-        // Sometimes, the NameNode crashes while it's writing to the
-        // edit log.  In that case, you can end up with an unfinalized edit log
-        // which has some garbage at the end.
-        // JournalManager#recoverUnfinalizedSegments will finalize these
-        // unfinished edit logs, giving them a defined final transaction 
-        // ID.  Then they will be renamed, so that any subsequent
-        // readers will have this information.
-        //
-        // Since there may be garbage at the end of these "cleaned up"
-        // logs, we want to be sure to skip it here if we've read everything
-        // we were supposed to read out of the stream.
-        // So we force an EOF on all subsequent reads.
-        //
-        long skipAmt = file.length() - tracker.getPos();
-        if (skipAmt > 0) {
-          FSImage.LOG.warn("skipping " + skipAmt + " bytes at the end " +
+  private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
+    FSEditLogOp op = null;
+    switch (state) {
+    case UNINIT:
+      try {
+        init();
+      } catch (Throwable e) {
+        LOG.error("caught exception initializing " + this, e);
+        if (skipBrokenEdits) {
+          return null;
+        }
+        Throwables.propagateIfPossible(e, IOException.class);
+      }
+      Preconditions.checkState(state != State.UNINIT);
+      return nextOpImpl(skipBrokenEdits);
+    case OPEN:
+      op = reader.readOp(skipBrokenEdits);
+      if ((op != null) && (op.hasTransactionId())) {
+        long txId = op.getTransactionId();
+        if ((txId >= lastTxId) &&
+            (lastTxId != HdfsConstants.INVALID_TXID)) {
+          //
+          // Sometimes, the NameNode crashes while it's writing to the
+          // edit log.  In that case, you can end up with an unfinalized edit log
+          // which has some garbage at the end.
+          // JournalManager#recoverUnfinalizedSegments will finalize these
+          // unfinished edit logs, giving them a defined final transaction 
+          // ID.  Then they will be renamed, so that any subsequent
+          // readers will have this information.
+          //
+          // Since there may be garbage at the end of these "cleaned up"
+          // logs, we want to be sure to skip it here if we've read everything
+          // we were supposed to read out of the stream.
+          // So we force an EOF on all subsequent reads.
+          //
+          long skipAmt = file.length() - tracker.getPos();
+          if (skipAmt > 0) {
+            LOG.warn("skipping " + skipAmt + " bytes at the end " +
               "of edit log  '" + getName() + "': reached txid " + txId +
               " out of " + lastTxId);
-          tracker.skip(skipAmt);
+            tracker.skip(skipAmt);
+          }
         }
       }
+      break;
+    case CLOSED:
+      break; // return null
     }
     return op;
   }
-  
+
+  @Override
+  protected FSEditLogOp nextOp() throws IOException {
+    return nextOpImpl(false);
+  }
+
   @Override
   protected FSEditLogOp nextValidOp() {
     try {
-      return reader.readOp(true);
-    } catch (IOException e) {
+      return nextOpImpl(true);
+    } catch (Throwable e) {
+      LOG.error("nextValidOp: got exception while reading " + this, e);
       return null;
     }
   }
 
   @Override
   public int getVersion() throws IOException {
+    if (state == State.UNINIT) {
+      init();
+    }
     return logVersion;
   }
 
   @Override
   public long getPosition() {
-    return tracker.getPos();
+    if (state == State.OPEN) {
+      return tracker.getPos();
+    } else {
+      return 0;
+    }
   }
 
   @Override
   public void close() throws IOException {
-    fStream.close();
+    if (state == State.OPEN) {
+      dataIn.close();
+    }
+    state = State.CLOSED;
   }
 
   @Override
@@ -183,12 +238,12 @@ public class EditLogFileInputStream extends EditLogInputStream {
     EditLogFileInputStream in;
     try {
       in = new EditLogFileInputStream(file);
-    } catch (LogHeaderCorruptException corrupt) {
+      in.getVersion(); // causes us to read the header
+    } catch (LogHeaderCorruptException e) {
       // If the header is malformed or the wrong value, this indicates a corruption
-      FSImage.LOG.warn("Log at " + file + " has no valid header",
-          corrupt);
+      LOG.warn("Log file " + file + " has no valid header", e);
       return new FSEditLogLoader.EditLogValidation(0,
-          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true);
+          HdfsConstants.INVALID_TXID, true);
     }
     
     try {
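
The constructor above no longer touches the file; opening happens lazily in init() on first use, guarded by a small UNINIT/OPEN/CLOSED state machine. This is the key to the startup win: constructing a stream for every segment is now free, and only segments that are actually read pay for an open and header parse. A minimal self-contained sketch of the pattern, using a toy class rather than the real stream:

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    // Toy lazy-open stream illustrating the UNINIT/OPEN/CLOSED state machine.
    class LazyLogStream {
      private enum State { UNINIT, OPEN, CLOSED }

      private final File file;
      private State state = State.UNINIT;
      private DataInputStream dataIn;

      LazyLogStream(File file) {
        this.file = file;        // cheap: no I/O until the first read
      }

      private void init() throws IOException {
        dataIn = new DataInputStream(
            new BufferedInputStream(new FileInputStream(file)));
        state = State.OPEN;
      }

      int readOneByte() throws IOException {
        switch (state) {
        case UNINIT:
          init();                // open on first use
          return readOneByte();
        case OPEN:
          return dataIn.read();
        default:                 // CLOSED
          return -1;
        }
      }

      void close() throws IOException {
        if (state == State.OPEN) {
          dataIn.close();
        }
        state = State.CLOSED;
      }
    }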

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java

@@ -45,12 +45,12 @@ public abstract class EditLogInputStream implements Closeable {
   /** 
    * @return the first transaction which will be found in this stream
    */
-  public abstract long getFirstTxId() throws IOException;
+  public abstract long getFirstTxId();
   
   /** 
    * @return the last transaction which will be found in this stream
    */
-  public abstract long getLastTxId() throws IOException;
+  public abstract long getLastTxId();
 
 
   /**
@@ -73,14 +73,14 @@ public abstract class EditLogInputStream implements Closeable {
     }
     return nextOp();
   }
-
+  
   /** 
    * Position the stream so that a valid operation can be read from it with
    * readOp().
    * 
    * This method can be used to skip over corrupted sections of edit logs.
    */
-  public void resync() throws IOException {
+  public void resync() {
     if (cachedOp != null) {
       return;
     }
@@ -109,7 +109,7 @@ public abstract class EditLogInputStream implements Closeable {
     // error recovery will want to override this.
     try {
       return nextOp();
-    } catch (IOException e) {
+    } catch (Throwable e) {
       return null;
     }
   }

+ 73 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -22,6 +22,7 @@ import java.net.URI;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.lang.reflect.Constructor;
 
@@ -246,13 +247,14 @@ public class FSEditLog  {
     long segmentTxId = getLastWrittenTxId() + 1;
     // Safety check: we should never start a segment if there are
     // newer txids readable.
-    EditLogInputStream s = journalSet.getInputStream(segmentTxId, true);
-    try {
-      Preconditions.checkState(s == null,
-          "Cannot start writing at txid %s when there is a stream " +
-          "available for read: %s", segmentTxId, s);
-    } finally {
-      IOUtils.closeStream(s);
+    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+    journalSet.selectInputStreams(streams, segmentTxId, true);
+    if (!streams.isEmpty()) {
+      String error = String.format("Cannot start writing at txid %s " +
+        "when there is a stream available for read: %s",
+        segmentTxId, streams.get(0));
+      IOUtils.cleanup(LOG, streams.toArray(new EditLogInputStream[0]));
+      throw new IllegalStateException(error);
     }
     
     startLogSegment(segmentTxId, true);
@@ -1071,10 +1073,10 @@ public class FSEditLog  {
       // All journals have failed, it is handled in logSync.
     }
   }
-  
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
-      long toAtLeastTxId) throws IOException {
-    return selectInputStreams(fromTxId, toAtLeastTxId, true);
+
+  public Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
   }
 
   /**
@@ -1084,25 +1086,71 @@ public class FSEditLog  {
   * @param toAtLeastTxId the selected streams must contain this transaction
   * @param inProgressOk set to true if in-progress streams are OK
    */
-  public synchronized Collection<EditLogInputStream> selectInputStreams(long fromTxId,
-      long toAtLeastTxId, boolean inProgressOk) throws IOException {
+  public synchronized Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
+      boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk);
-    while (stream != null) {
-      streams.add(stream);
-      // We're now looking for a higher range, so reset the fromTxId
-      fromTxId = stream.getLastTxId() + 1;
-      stream = journalSet.getInputStream(fromTxId, inProgressOk);
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+
+    try {
+      checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
+    } catch (IOException e) {
+      if (recovery != null) {
+        // If recovery mode is enabled, continue loading even if we know we
+        // can't load up to toAtLeastTxId.
+        LOG.error(e);
+      } else {
+        closeAllStreams(streams);
+        throw e;
+      }
     }
-    
-    if (fromTxId <= toAtLeastTxId) {
-      closeAllStreams(streams);
-      throw new IOException(String.format("Gap in transactions. Expected to "
-          + "be able to read up until at least txid %d but unable to find any "
-          + "edit logs containing txid %d", toAtLeastTxId, fromTxId));
+    // This code will go away as soon as RedundantEditLogInputStream is
+    // introduced. (HDFS-3049)
+    try {
+      if (!streams.isEmpty()) {
+        streams.get(0).skipUntil(fromTxId);
+      }
+    } catch (IOException e) {
+      // We don't want to throw an exception from here, because that would make
+      // recovery impossible even if the user requested it.  An exception will
+      // be thrown later, when we don't read the starting txid we expect.
+      LOG.error("error skipping until transaction " + fromTxId, e);
     }
     return streams;
   }
+  
+  /**
+   * Check for gaps in the edit log input stream list.
+   * Note: we're assuming that the list is sorted and that txid ranges don't
+   * overlap.  This could be done better and with more generality with an
+   * interval tree.
+   */
+  private void checkForGaps(List<EditLogInputStream> streams, long fromTxId,
+      long toAtLeastTxId, boolean inProgressOk) throws IOException {
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    long txId = fromTxId;
+    while (true) {
+      if (txId > toAtLeastTxId) return;
+      if (!iter.hasNext()) break;
+      EditLogInputStream elis = iter.next();
+      if (elis.getFirstTxId() > txId) break;
+      long next = elis.getLastTxId();
+      if (next == HdfsConstants.INVALID_TXID) {
+        if (!inProgressOk) {
+          throw new RuntimeException("inProgressOk = false, but " +
+              "selectInputStreams returned an in-progress edit " +
+              "log input stream (" + elis + ")");
+        }
+        // We don't know where the in-progress stream ends.
+        // It could certainly go all the way up to toAtLeastTxId.
+        return;
+      }
+      txId = next + 1;
+    }
+    throw new IOException(String.format("Gap in transactions. Expected to "
+        + "be able to read up until at least txid %d but unable to find any "
+        + "edit logs containing txid %d", toAtLeastTxId, txId));
+  }
 
   /** 
    * Close all the streams in a collection
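
checkForGaps assumes the list is sorted by first txid with non-overlapping ranges, and walks a cursor forward: each stream must begin at or before the cursor, the cursor jumps past each stream's last txid, and an in-progress stream (unknown end) is treated as possibly reaching toAtLeastTxId. A self-contained sketch of the same walk over (first, last) pairs, with -1 standing in for an in-progress stream's unknown end:

    import java.io.IOException;

    class GapCheck {
      static final long UNKNOWN_END = -1; // stand-in for an in-progress stream

      // ranges: sorted, non-overlapping {firstTxId, lastTxId} pairs.
      static void checkForGaps(long[][] ranges, long fromTxId,
          long toAtLeastTxId) throws IOException {
        long txId = fromTxId;
        for (long[] r : ranges) {
          if (txId > toAtLeastTxId) {
            return;               // already covered the required range
          }
          if (r[0] > txId) {
            break;                // txId is missing: gap
          }
          if (r[1] == UNKNOWN_END) {
            return;               // in-progress; may reach toAtLeastTxId
          }
          txId = r[1] + 1;
        }
        if (txId > toAtLeastTxId) {
          return;
        }
        throw new IOException("Gap in transactions. Expected to be able to "
            + "read up until at least txid " + toAtLeastTxId
            + " but unable to find any edit logs containing txid " + txId);
      }
    }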

+ 36 - 57
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -87,12 +87,10 @@ public class FSEditLogLoader {
    */
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
       MetaRecoveryContext recovery) throws IOException {
-    int logVersion = edits.getVersion();
-
     fsNamesys.writeLock();
     try {
       long startTime = now();
-      long numEdits = loadEditRecords(logVersion, edits, false, 
+      long numEdits = loadEditRecords(edits, false, 
                                  expectedStartingTxId, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
@@ -104,7 +102,7 @@ public class FSEditLogLoader {
     }
   }
 
-  long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
+  long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
                       long expectedStartingTxId, MetaRecoveryContext recovery)
       throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
@@ -143,7 +141,7 @@ public class FSEditLogLoader {
             }
           } catch (Throwable e) {
             // Handle a problem with our input
-            check203UpgradeFailure(logVersion, e);
+            check203UpgradeFailure(in.getVersion(), e);
             String errorMessage =
               formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
             FSImage.LOG.error(errorMessage, e);
@@ -160,7 +158,7 @@ public class FSEditLogLoader {
           }
           recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
             in.getPosition();
-          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+          if (op.hasTransactionId()) {
             if (op.getTransactionId() > expectedTxId) { 
               MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                   "to be a gap in the edit log.  We expected txid " +
@@ -177,7 +175,7 @@ public class FSEditLogLoader {
             }
           }
           try {
-            applyEditLogOp(op, fsDir, logVersion);
+            applyEditLogOp(op, fsDir, in.getVersion());
           } catch (Throwable e) {
             LOG.error("Encountered exception on operation " + op, e);
             MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
@@ -194,7 +192,7 @@ public class FSEditLogLoader {
             expectedTxId = lastAppliedTxId = expectedStartingTxId;
           }
           // log progress
-          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+          if (op.hasTransactionId()) {
             long now = now();
             if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
               int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
@@ -649,76 +647,57 @@ public class FSEditLogLoader {
   }
   
   /**
-   * Return the number of valid transactions in the stream. If the stream is
-   * truncated during the header, returns a value indicating that there are
-   * 0 valid transactions. This reads through the stream but does not close
-   * it.
+   * Find the last valid transaction ID in the stream.
+   * If there are invalid or corrupt transactions in the middle of the stream,
+   * validateEditLog will skip over them.
+   * This reads through the stream but does not close it.
+   *
    * @throws IOException if the stream cannot be read due to an IO error (eg
    *                     if the log does not exist)
    */
   static EditLogValidation validateEditLog(EditLogInputStream in) {
     long lastPos = 0;
-    long firstTxId = HdfsConstants.INVALID_TXID;
     long lastTxId = HdfsConstants.INVALID_TXID;
     long numValid = 0;
-    try {
-      FSEditLogOp op = null;
-      while (true) {
-        lastPos = in.getPosition();
+    FSEditLogOp op = null;
+    while (true) {
+      lastPos = in.getPosition();
+      try {
         if ((op = in.readOp()) == null) {
           break;
         }
-        if (firstTxId == HdfsConstants.INVALID_TXID) {
-          firstTxId = op.getTransactionId();
-        }
-        if (lastTxId == HdfsConstants.INVALID_TXID
-            || op.getTransactionId() == lastTxId + 1) {
-          lastTxId = op.getTransactionId();
-        } else {
-          FSImage.LOG.error("Out of order txid found. Found " +
-            op.getTransactionId() + ", expected " + (lastTxId + 1));
-          break;
-        }
-        numValid++;
+      } catch (Throwable t) {
+        FSImage.LOG.warn("Caught exception after reading " + numValid +
+            " ops from " + in + " while determining its valid length." +
+            "Position was " + lastPos, t);
+        break;
       }
-    } catch (Throwable t) {
-      // Catch Throwable and not just IOE, since bad edits may generate
-      // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
-      FSImage.LOG.debug("Caught exception after reading " + numValid +
-          " ops from " + in + " while determining its valid length.", t);
+      if (lastTxId == HdfsConstants.INVALID_TXID
+          || op.getTransactionId() > lastTxId) {
+        lastTxId = op.getTransactionId();
+      }
+      numValid++;
     }
-    return new EditLogValidation(lastPos, firstTxId, lastTxId, false);
+    return new EditLogValidation(lastPos, lastTxId, false);
   }
-  
+
   static class EditLogValidation {
     private final long validLength;
-    private final long startTxId;
     private final long endTxId;
-    private final boolean corruptionDetected;
-     
-    EditLogValidation(long validLength, long startTxId, long endTxId,
-        boolean corruptionDetected) {
+    private final boolean hasCorruptHeader;
+
+    EditLogValidation(long validLength, long endTxId,
+        boolean hasCorruptHeader) {
       this.validLength = validLength;
-      this.startTxId = startTxId;
       this.endTxId = endTxId;
-      this.corruptionDetected = corruptionDetected;
+      this.hasCorruptHeader = hasCorruptHeader;
     }
-    
+
     long getValidLength() { return validLength; }
-    
-    long getStartTxId() { return startTxId; }
-    
+
     long getEndTxId() { return endTxId; }
-    
-    long getNumTransactions() { 
-      if (endTxId == HdfsConstants.INVALID_TXID
-          || startTxId == HdfsConstants.INVALID_TXID) {
-        return 0;
-      }
-      return (endTxId - startTxId) + 1;
-    }
-    
-    boolean hasCorruptHeader() { return corruptionDetected; }
+
+    boolean hasCorruptHeader() { return hasCorruptHeader; }
   }
 
   /**
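
With startTxId and getNumTransactions() gone from EditLogValidation, callers derive a count from the segment's known first txid, relying on txids being contiguous within a segment: a segment spanning txids 11 through 40 holds (40 - 11) + 1 = 30 transactions. That is exactly the expression the updated FSEditLogTestUtil.countTransactionsInStream uses; as a sketch:

    // Sketch: deriving a transaction count from a validated segment,
    // assuming contiguous txids from firstTxId through endTxId.
    static long countTransactions(long firstTxId, long endTxId) {
      return (endTxId - firstTxId) + 1;   // e.g. (40 - 11) + 1 == 30
    }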

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -559,7 +559,7 @@ public class FSImage implements Closeable {
 
   /**
    * Choose latest image from one of the directories,
-   * load it and merge with the edits from that directory.
+   * load it and merge with the edits.
    * 
    * Saving and loading fsimage should never trigger symlink resolution. 
    * The paths that are persisted do not have *intermediate* symlinks 
@@ -595,7 +595,7 @@ public class FSImage implements Closeable {
       // OK to not be able to read all of edits right now.
       long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
       editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
-          toAtLeastTxId, false);
+          toAtLeastTxId, recovery, false);
     } else {
       editStreams = FSImagePreTransactionalStorageInspector
         .getEditLogStreams(storage);
@@ -603,7 +603,10 @@ public class FSImage implements Closeable {
  
     LOG.debug("Planning to load image :\n" + imageFile);
     for (EditLogInputStream l : editStreams) {
-      LOG.debug("\t Planning to load edit stream: " + l);
+      LOG.debug("Planning to load edit log stream: " + l);
+    }
+    if (!editStreams.iterator().hasNext()) {
+      LOG.info("No edit log streams selected.");
     }
     
     try {

+ 59 - 136
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Comparator;
 import java.util.Collections;
@@ -73,7 +74,7 @@ class FileJournalManager implements JournalManager {
 
   @Override 
   public void close() throws IOException {}
-
+  
   @Override
   synchronized public EditLogOutputStream startLogSegment(long txid) 
       throws IOException {
@@ -212,90 +213,46 @@ class FileJournalManager implements JournalManager {
   }
 
   @Override
-  synchronized public EditLogInputStream getInputStream(long fromTxId,
-      boolean inProgressOk) throws IOException {
-    for (EditLogFile elf : getLogFiles(fromTxId)) {
-      if (elf.containsTxId(fromTxId)) {
-        if (!inProgressOk && elf.isInProgress()) {
-          continue;
-        }
-        if (elf.isInProgress()) {
-          elf.validateLog();
-        }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Returning edit stream reading from " + elf);
-        }
-        EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
-            elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
-        long transactionsToSkip = fromTxId - elf.getFirstTxId();
-        if (transactionsToSkip > 0) {
-          LOG.info(String.format("Log begins at txid %d, but requested start "
-              + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
-              transactionsToSkip));
-        }
-        if (elfis.skipUntil(fromTxId) == false) {
-          throw new IOException("failed to advance input stream to txid " +
-              fromTxId);
-        }
-        return elfis;
-      }
+  synchronized public void selectInputStreams(
+      Collection<EditLogInputStream> streams, long fromTxId,
+      boolean inProgressOk) {
+    List<EditLogFile> elfs;
+    try {
+      elfs = matchEditLogs(sd.getCurrentDir());
+    } catch (IOException e) {
+      LOG.error("error listing files in " + this + ". " +
+          "Skipping all edit logs in this directory.", e);
+      return;
     }
-
-    throw new IOException("Cannot find editlog file containing " + fromTxId);
-  }
-
-  @Override
-  public long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
-      throws IOException, CorruptionException {
-    long numTxns = 0L;
-    
-    for (EditLogFile elf : getLogFiles(fromTxId)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Counting " + elf);
+    LOG.debug(this + ": selecting input streams starting at " + fromTxId + 
+        (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
+        "from among " + elfs.size() + " candidate file(s)");
+    for (EditLogFile elf : elfs) {
+      if (elf.lastTxId < fromTxId) {
+        LOG.debug("passing over " + elf + " because it ends at " +
+            elf.lastTxId + ", but we only care about transactions " +
+            "as new as " + fromTxId);
+        continue;
       }
-      if (elf.getFirstTxId() > fromTxId) { // there must be a gap
-        LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
-            + fromTxId + " - " + (elf.getFirstTxId() - 1));
-        break;
-      } else if (elf.containsTxId(fromTxId)) {
-        if (!inProgressOk && elf.isInProgress()) {
-          break;
+      if (elf.isInProgress()) {
+        if (!inProgressOk) {
+          LOG.debug("passing over " + elf + " because it is in progress " +
+              "and we are ignoring in-progress logs.");
+          continue;
         }
-        
-        if (elf.isInProgress()) {
+        try {
           elf.validateLog();
-        } 
-
-        if (elf.hasCorruptHeader()) {
-          break;
-        }
-        numTxns += elf.getLastTxId() + 1 - fromTxId;
-        fromTxId = elf.getLastTxId() + 1;
-        
-        if (elf.isInProgress()) {
-          break;
+        } catch (IOException e) {
+          LOG.error("got IOException while trying to validate header of " +
+              elf + ".  Skipping.", e);
+          continue;
         }
       }
+      EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
+            elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
+      LOG.debug("selecting edit log stream " + elf);
+      streams.add(elfis);
     }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Journal " + this + " has " + numTxns 
-                + " txns from " + fromTxId);
-    }
-
-    long max = findMaxTransaction(inProgressOk);
-    
-    // fromTxId should be greater than max, as it points to the next 
-    // transaction we should expect to find. If it is less than or equal
-    // to max, it means that a transaction with txid == max has not been found
-    if (numTxns == 0 && fromTxId <= max) { 
-      String error = String.format("Gap in transactions, max txnid is %d"
-                                   + ", 0 txns from %d", max, fromTxId);
-      LOG.error(error);
-      throw new CorruptionException(error);
-    }
-
-    return numTxns;
   }
 
   @Override
@@ -318,7 +275,7 @@ class FileJournalManager implements JournalManager {
           }
           continue;
         }
-        
+
         elf.validateLog();
 
         if (elf.hasCorruptHeader()) {
@@ -326,19 +283,16 @@ class FileJournalManager implements JournalManager {
           throw new CorruptionException("In-progress edit log file is corrupt: "
               + elf);
         }
-        
-        // If the file has a valid header (isn't corrupt) but contains no
-        // transactions, we likely just crashed after opening the file and
-        // writing the header, but before syncing any transactions. Safe to
-        // delete the file.
-        if (elf.getNumTransactions() == 0) {
-          LOG.info("Deleting edit log file with zero transactions " + elf);
-          if (!elf.getFile().delete()) {
-            throw new IOException("Unable to delete " + elf.getFile());
-          }
+        if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
+          // If the file has a valid header (isn't corrupt) but contains no
+          // transactions, we likely just crashed after opening the file and
+          // writing the header, but before syncing any transactions. Safe to
+          // delete the file.
+          LOG.info("Moving aside edit log file that seems to have zero " +
+              "transactions " + elf);
+          elf.moveAsideEmptyFile();
           continue;
         }
-        
         finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
       }
     }
@@ -361,39 +315,6 @@ class FileJournalManager implements JournalManager {
     return logFiles;
   }
 
-  /** 
-   * Find the maximum transaction in the journal.
-   */
-  private long findMaxTransaction(boolean inProgressOk)
-      throws IOException {
-    boolean considerSeenTxId = true;
-    long seenTxId = NNStorage.readTransactionIdFile(sd);
-    long maxSeenTransaction = 0;
-    for (EditLogFile elf : getLogFiles(0)) {
-      if (elf.isInProgress() && !inProgressOk) {
-        if (elf.getFirstTxId() != HdfsConstants.INVALID_TXID &&
-            elf.getFirstTxId() <= seenTxId) {
-          // don't look at the seen_txid file if in-progress logs are not to be
-          // examined, and the value in seen_txid falls within the in-progress
-          // segment.
-          considerSeenTxId = false;
-        }
-        continue;
-      }
-      
-      if (elf.isInProgress()) {
-        maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
-        elf.validateLog();
-      }
-      maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
-    }
-    if (considerSeenTxId) {
-      return Math.max(maxSeenTransaction, seenTxId);
-    } else {
-      return maxSeenTransaction;
-    }
-  }
-
   @Override
   public String toString() {
     return String.format("FileJournalManager(root=%s)", sd.getRoot());
@@ -406,7 +327,6 @@ class FileJournalManager implements JournalManager {
     private File file;
     private final long firstTxId;
     private long lastTxId;
-    private long numTx = -1;
 
     private boolean hasCorruptHeader = false;
     private final boolean isInProgress;
@@ -454,20 +374,15 @@ class FileJournalManager implements JournalManager {
     }
 
     /** 
-     * Count the number of valid transactions in a log.
+     * Find out where the edit log ends.
      * This will update the lastTxId of the EditLogFile or
      * mark it as corrupt if it is.
      */
     void validateLog() throws IOException {
       EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
-      this.numTx = val.getNumTransactions();
       this.lastTxId = val.getEndTxId();
       this.hasCorruptHeader = val.hasCorruptHeader();
     }
-    
-    long getNumTransactions() {
-      return numTx;
-    }
 
     boolean isInProgress() {
       return isInProgress;
@@ -483,23 +398,31 @@ class FileJournalManager implements JournalManager {
 
     void moveAsideCorruptFile() throws IOException {
       assert hasCorruptHeader;
-    
+      renameSelf(".corrupt");
+    }
+
+    void moveAsideEmptyFile() throws IOException {
+      assert lastTxId == HdfsConstants.INVALID_TXID;
+      renameSelf(".empty");
+    }
+      
+    private void renameSelf(String newSuffix) throws IOException {
       File src = file;
-      File dst = new File(src.getParent(), src.getName() + ".corrupt");
+      File dst = new File(src.getParent(), src.getName() + newSuffix);
       boolean success = src.renameTo(dst);
       if (!success) {
         throw new IOException(
-          "Couldn't rename corrupt log " + src + " to " + dst);
+          "Couldn't rename log " + src + " to " + dst);
       }
       file = dst;
     }
-    
+
     @Override
     public String toString() {
       return String.format("EditLogFile(file=%s,first=%019d,last=%019d,"
-                           +"inProgress=%b,hasCorruptHeader=%b,numTx=%d)",
+                           +"inProgress=%b,hasCorruptHeader=%b)",
                            file.toString(), firstTxId, lastTxId,
-                           isInProgress(), hasCorruptHeader, numTx);
+                           isInProgress(), hasCorruptHeader);
     }
   }
 }
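
The new FileJournalManager.selectInputStreams lists the edit files once and applies cheap filters: skip finalized segments that end before fromTxId, skip in-progress segments when inProgressOk is false, and validate only in-progress segments (whose last txid is otherwise unknown). Finalized segments are never validated here, which is where the many-segment startup cost used to go. A self-contained sketch of the filtering, with a toy record standing in for EditLogFile:

    import java.util.ArrayList;
    import java.util.List;

    class SegmentFilter {
      // Toy stand-in for FileJournalManager.EditLogFile.
      static class Seg {
        final long first;
        final long last;            // last < 0 models in-progress/unknown
        Seg(long first, long last) { this.first = first; this.last = last; }
        boolean inProgress() { return last < 0; }
      }

      static List<Seg> select(List<Seg> candidates, long fromTxId,
          boolean inProgressOk) {
        List<Seg> out = new ArrayList<Seg>();
        for (Seg s : candidates) {
          if (!s.inProgress() && s.last < fromTxId) {
            continue;               // ends before the range we care about
          }
          if (s.inProgress() && !inProgressOk) {
            continue;               // caller excludes unfinalized logs
          }
          // The real code validates in-progress segments here to learn
          // their actual last txid; finalized segments skip validation.
          out.add(s);
        }
        return out;
      }
    }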

+ 9 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -46,26 +47,17 @@ public interface JournalManager extends Closeable {
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
    /**
-   * Get the input stream starting with fromTxnId from this journal manager
+   * Get a list of edit log input streams.  The list will start with the
+   * stream that contains fromTxnId, and continue until the end of the journal
+   * being managed.
+   * 
    * @param fromTxnId the first transaction id we want to read
    * @param inProgressOk whether or not in-progress streams should be returned
-   * @return the stream starting with transaction fromTxnId
-   * @throws IOException if a stream cannot be found.
-   */
-  EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
-    throws IOException;
-
-  /**
-   * Get the number of transaction contiguously available from fromTxnId.
    *
-   * @param fromTxnId Transaction id to count from
-   * @param inProgressOk whether or not in-progress streams should be counted
-   * @return The number of transactions available from fromTxnId
-   * @throws IOException if the journal cannot be read.
-   * @throws CorruptionException if there is a gap in the journal at fromTxnId.
+   * @return a list of streams
    */
-  long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
-      throws IOException, CorruptionException;
+  void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk);
 
   /**
    * Set the amount of memory that this stream should use to buffer edits
@@ -92,7 +84,7 @@ public interface JournalManager extends Closeable {
    * Close the journal manager, freeing any resources it may hold.
    */
   void close() throws IOException;
-
+  
   /** 
   * Indicate that a journal cannot be used to load a certain range of 
    * edits.

+ 56 - 58
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java

@@ -19,7 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.SortedSet;
 
@@ -31,11 +34,13 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultiset;
 
 /**
  * Manages a collection of Journals. None of the methods are synchronized, it is
@@ -46,6 +51,17 @@ public class JournalSet implements JournalManager {
 
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
   
+  static final public Comparator<EditLogInputStream>
+    EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
+      @Override
+      public int compare(EditLogInputStream a, EditLogInputStream b) {
+        return ComparisonChain.start().
+          compare(a.getFirstTxId(), b.getFirstTxId()).
+          compare(b.getLastTxId(), a.getLastTxId()).
+          result();
+      }
+    };
+  
   /**
    * Container for a JournalManager paired with its currently
    * active stream.
@@ -193,75 +209,57 @@ public class JournalSet implements JournalManager {
     }, "close journal");
   }
 
-  
   /**
-   * Find the best editlog input stream to read from txid.
-   * If a journal throws an CorruptionException while reading from a txn id,
-   * it means that it has more transactions, but can't find any from fromTxId. 
-   * If this is the case and no other journal has transactions, we should throw
-   * an exception as it means more transactions exist, we just can't load them.
-   *
-   * @param fromTxnId Transaction id to start from.
-   * @return A edit log input stream with tranactions fromTxId 
-   *         or null if no more exist
+   * In this function, we get a bunch of streams from all of our JournalManager
+   * objects.  Then we add these to the collection one by one.
+   * 
+   * @param streams          The collection to add the streams to.  It may or 
+   *                         may not be sorted-- this is up to the caller.
+   * @param fromTxId         The transaction ID to start looking for streams at
+   * @param inProgressOk     Should we consider unfinalized streams?
    */
   @Override
-  public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
-      throws IOException {
-    JournalManager bestjm = null;
-    long bestjmNumTxns = 0;
-    CorruptionException corruption = null;
-
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) {
+    final TreeMultiset<EditLogInputStream> allStreams =
+        TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (JournalAndStream jas : journals) {
-      if (jas.isDisabled()) continue;
-      
-      JournalManager candidate = jas.getManager();
-      long candidateNumTxns = 0;
-      try {
-        candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId,
-            inProgressOk);
-      } catch (CorruptionException ce) {
-        corruption = ce;
-      } catch (IOException ioe) {
-        LOG.warn("Unable to read input streams from JournalManager " + candidate,
-            ioe);
-        continue; // error reading disk, just skip
-      }
-      
-      if (candidateNumTxns > bestjmNumTxns) {
-        bestjm = candidate;
-        bestjmNumTxns = candidateNumTxns;
-      }
-    }
-    
-    if (bestjm == null) {
-      if (corruption != null) {
-        throw new IOException("No non-corrupt logs for txid " 
-                                        + fromTxnId, corruption);
-      } else {
-        return null;
-      }
-    }
-    return bestjm.getInputStream(fromTxnId, inProgressOk);
-  }
-  
-  @Override
-  public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
-      throws IOException {
-    long num = 0;
-    for (JournalAndStream jas: journals) {
       if (jas.isDisabled()) {
         LOG.info("Skipping jas " + jas + " since it's disabled");
         continue;
+      }
+      jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+    }
+    // We want to group together all the streams that start on the same start
+    // transaction ID.  To do this, we maintain an accumulator (acc) of all
+    // the streams we've seen at a given start transaction ID.  When we see a
+    // higher start transaction ID, we select a stream from the accumulator and
+    // clear it.  Then we begin accumulating streams with the new, higher start
+    // transaction ID.
+    LinkedList<EditLogInputStream> acc =
+        new LinkedList<EditLogInputStream>();
+    for (EditLogInputStream elis : allStreams) {
+      if (acc.isEmpty()) {
+        acc.add(elis);
       } else {
-        long newNum = jas.getManager().getNumberOfTransactions(fromTxnId,
-            inProgressOk);
-        if (newNum > num) {
-          num = newNum;
+        long accFirstTxId = acc.get(0).getFirstTxId();
+        if (accFirstTxId == elis.getFirstTxId()) {
+          acc.add(elis);
+        } else if (accFirstTxId < elis.getFirstTxId()) {
+          streams.add(acc.get(0));
+          acc.clear();
+          acc.add(elis);
+        } else if (accFirstTxId > elis.getFirstTxId()) {
+          throw new RuntimeException("sorted set invariants violated!  " +
+              "Got stream with first txid " + elis.getFirstTxId() +
+              ", but the last firstTxId was " + accFirstTxId);
         }
       }
     }
-    return num;
+    if (!acc.isEmpty()) {
+      streams.add(acc.get(0));
+      acc.clear();
+    }
   }
 
   /**
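
Streams from every journal are merged into a TreeMultiset ordered by EDIT_LOG_INPUT_STREAM_COMPARATOR: ascending first txid, and for equal first txids, descending last txid, so the longest stream for a given start sorts first. The accumulator loop then keeps only the head of each equal-firstTxId group, i.e. one best stream per starting transaction. A self-contained sketch of that sort-then-pick pass over (first, last) pairs:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    class PickBestPerStart {
      // Mirrors EDIT_LOG_INPUT_STREAM_COMPARATOR: first txid ascending,
      // then last txid descending (longest stream for a start wins).
      static final Comparator<long[]> CMP = new Comparator<long[]>() {
        @Override
        public int compare(long[] a, long[] b) {
          if (a[0] != b[0]) {
            return a[0] < b[0] ? -1 : 1;
          }
          if (a[1] != b[1]) {
            return a[1] > b[1] ? -1 : 1;
          }
          return 0;
        }
      };

      static List<long[]> pick(List<long[]> all) {
        List<long[]> sorted = new ArrayList<long[]>(all);
        Collections.sort(sorted, CMP);
        List<long[]> out = new ArrayList<long[]>();
        long lastStart = Long.MIN_VALUE;
        for (long[] s : sorted) {
          if (out.isEmpty() || s[0] != lastStart) {
            out.add(s);             // head of a new firstTxId group
            lastStart = s[0];
          }                         // later group members are dropped
        }
        return out;
      }
    }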

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java

@@ -225,7 +225,7 @@ public class BootstrapStandby implements Tool, Configurable {
     try {
       Collection<EditLogInputStream> streams =
         image.getEditLog().selectInputStreams(
-          firstTxIdInLogs, curTxIdOnOtherNode, true);
+          firstTxIdInLogs, curTxIdOnOtherNode, null, true);
       for (EditLogInputStream stream : streams) {
         IOUtils.closeStream(stream);
       }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -201,7 +201,7 @@ public class EditLogTailer {
       }
       Collection<EditLogInputStream> streams;
       try {
-        streams = editLog.selectInputStreams(lastTxnId + 1, 0, false);
+        streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
       } catch (IOException ioe) {
         // This is acceptable. If we try to tail edits in the middle of an edits
         // log roll, i.e. the last one has been finalized but the new inprogress

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java

@@ -248,7 +248,7 @@ public class TestDFSRollback extends TestCase {
       baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       deleteMatchingFiles(baseDirs, "edits.*");
       startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "No non-corrupt logs for txid ");
+          "Gap in transactions");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with no image file", numDirs);

+ 62 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -22,6 +22,7 @@ import java.io.*;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -740,8 +741,9 @@ public class TestEditLog extends TestCase {
         throw ioe;
       } else {
         GenericTestUtils.assertExceptionContains(
-            "No non-corrupt logs for txid 3",
-            ioe);
+          "Gap in transactions. Expected to be able to read up until " +
+          "at least txid 3 but unable to find any edit logs containing " +
+          "txid 3", ioe);
       }
     } finally {
       cluster.shutdown();
@@ -770,12 +772,12 @@ public class TestEditLog extends TestCase {
     }
   
     @Override
-    public long getFirstTxId() throws IOException {
+    public long getFirstTxId() {
       return HdfsConstants.INVALID_TXID;
     }
     
     @Override
-    public long getLastTxId() throws IOException {
+    public long getLastTxId() {
       return HdfsConstants.INVALID_TXID;
     }
   
@@ -1104,9 +1106,9 @@ public class TestEditLog extends TestCase {
 
     for (EditLogInputStream edits : editStreams) {
       FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits);
-      long read = val.getNumTransactions();
+      long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
       LOG.info("Loading edits " + edits + " read " + read);
-      assertEquals(startTxId, val.getStartTxId());
+      assertEquals(startTxId, edits.getFirstTxId());
       startTxId += read;
       totaltxnread += read;
     }
@@ -1154,7 +1156,9 @@ public class TestEditLog extends TestCase {
       fail("Should have thrown exception");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
-          "No non-corrupt logs for txid " + startGapTxId, ioe);
+          "Gap in transactions. Expected to be able to read up until " +
+          "at least txid 40 but unable to find any edit logs containing " +
+          "txid 11", ioe);
     }
   }
 
@@ -1228,4 +1232,55 @@ public class TestEditLog extends TestCase {
       validateNoCrash(garbage);
     }
   }
+
+  /**
+   * Test creating a directory with lots and lots of edit log segments
+   */
+  @Test
+  public void testManyEditLogSegments() throws IOException {
+    final int NUM_EDIT_LOG_ROLLS = 1000;
+    // start a cluster
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      FSImage fsimage = namesystem.getFSImage();
+      final FSEditLog editLog = fsimage.getEditLog();
+      for (int i = 0; i < NUM_EDIT_LOG_ROLLS; i++){
+        editLog.logSetReplication("fakefile" + i, (short)(i % 3));
+        assertExistsInStorageDirs(
+            cluster, NameNodeDirType.EDITS,
+            NNStorage.getInProgressEditsFileName((i * 3) + 1));
+        editLog.logSync();
+        editLog.rollEditLog();
+        assertExistsInStorageDirs(
+            cluster, NameNodeDirType.EDITS,
+            NNStorage.getFinalizedEditsFileName((i * 3) + 1, (i * 3) + 3));
+      }
+      editLog.close();
+    } finally {
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+
+    // How long does it take to read through all these edit logs?
+    long startTime = System.currentTimeMillis();
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).
+          numDataNodes(NUM_DATA_NODES).build();
+      cluster.waitActive();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+    long endTime = System.currentTimeMillis();
+    double delta = (endTime - startTime) / 1000.0;
+    LOG.info(String.format("loaded %d edit log segments in %.2f seconds",
+        NUM_EDIT_LOG_ROLLS, delta));
+  }
 }
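
Note that with EditLogValidation#getNumTransactions removed, callers now derive a
transaction count from the validated end txid, as the rewritten assertion above
does. A minimal sketch of that counting idiom (the helper name is hypothetical,
not part of the patch):

    // Hypothetical helper: number of txids in the inclusive range
    // [firstTxId, endTxId]. Callers pass edits.getFirstTxId() and the
    // getEndTxId() of FSEditLogLoader.validateEditLog(edits).
    static long countTransactions(long firstTxId, long endTxId) {
      return (endTxId - firstTxId) + 1;  // e.g. [1, 3] holds 3 transactions
    }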

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java

@@ -40,8 +40,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestEditLogFileOutputStream {
-  
-  private final static long PREALLOCATION_LENGTH = (1024 * 1024) + 4;
   private final static int HEADER_LEN = 17;
   private static final File TEST_EDITS =
     new File(System.getProperty("test.build.data","/tmp"),
@@ -51,21 +49,22 @@ public class TestEditLogFileOutputStream {
   public void deleteEditsFile() {
     TEST_EDITS.delete();
   }
-  
+
   @Test
   public void testPreallocation() throws IOException {
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
         .build();
 
+    final long START_TXID = 1;
     StorageDirectory sd = cluster.getNameNode().getFSImage()
       .getStorage().getStorageDir(0);
-    File editLog = NNStorage.getInProgressEditsFile(sd, 1);
+    File editLog = NNStorage.getInProgressEditsFile(sd, START_TXID);
 
     EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
     assertEquals("Edit log should contain a header as valid length",
         HEADER_LEN, validation.getValidLength());
-    assertEquals(1, validation.getNumTransactions());
+    assertEquals(START_TXID, validation.getEndTxId());
     assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " +
         "for the version number",
         EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
@@ -79,7 +78,7 @@ public class TestEditLogFileOutputStream {
     assertTrue("Edit log should have more valid data after writing a txn " +
         "(was: " + oldLength + " now: " + validation.getValidLength() + ")",
         validation.getValidLength() > oldLength);
-    assertEquals(2, validation.getNumTransactions());
+    assertEquals(1, validation.getEndTxId() - START_TXID);
 
     assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number",
         EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
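
With getNumTransactions() gone, EditLogValidation now exposes the validated byte
length, the last readable txid, and a corrupt-header flag. A sketch of inspecting
a log through the patched API (a fragment that would live alongside these tests in
org.apache.hadoop.hdfs.server.namenode; the path is a placeholder):

    // Sketch only: the file path is a placeholder; the accessors are the
    // ones this test exercises.
    File editLog = new File("/path/to/edits_inprogress_0000000000000000001");
    EditLogValidation v = EditLogFileInputStream.validateEditLog(editLog);
    if (v.hasCorruptHeader()) {
      System.err.println("header unreadable; no txns can be trusted");
    } else {
      System.out.println("valid through byte " + v.getValidLength()
          + ", last txid " + v.getEndTxId());
    }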

+ 80 - 103
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -40,16 +41,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
 public class TestFSEditLogLoader {
   
   static {
@@ -153,108 +161,6 @@ public class TestFSEditLogLoader {
     }
   }
   
-  /**
-   * Test that the valid number of transactions can be counted from a file.
-   * @throws IOException 
-   */
-  @Test
-  public void testCountValidTransactions() throws IOException {
-    File testDir = new File(TEST_DIR, "testCountValidTransactions");
-    File logFile = new File(testDir,
-        NNStorage.getInProgressEditsFileName(1));
-    
-    // Create a log file, and return the offsets at which each
-    // transaction starts.
-    FSEditLog fsel = null;
-    final int NUM_TXNS = 30;
-    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
-    try {
-      fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
-      fsel.openForWrite();
-      assertTrue("should exist: " + logFile, logFile.exists());
-      
-      for (int i = 0; i < NUM_TXNS; i++) {
-        long trueOffset = getNonTrailerLength(logFile);
-        long thisTxId = fsel.getLastWrittenTxId() + 1;
-        offsetToTxId.put(trueOffset, thisTxId);
-        System.err.println("txid " + thisTxId + " at offset " + trueOffset);
-        fsel.logDelete("path" + i, i);
-        fsel.logSync();
-      }
-    } finally {
-      if (fsel != null) {
-        fsel.close();
-      }
-    }
-
-    // The file got renamed when the log was closed.
-    logFile = testDir.listFiles()[0];
-    long validLength = getNonTrailerLength(logFile);
-
-    // Make sure that uncorrupted log has the expected length and number
-    // of transactions.
-    EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
-    assertEquals(NUM_TXNS + 2, validation.getNumTransactions());
-    assertEquals(validLength, validation.getValidLength());
-    
-    // Back up the uncorrupted log
-    File logFileBak = new File(testDir, logFile.getName() + ".bak");
-    Files.copy(logFile, logFileBak);
-
-    // Corrupt the log file in various ways for each txn
-    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
-      long txOffset = entry.getKey();
-      long txid = entry.getValue();
-      
-      // Restore backup, truncate the file exactly before the txn
-      Files.copy(logFileBak, logFile);
-      truncateFile(logFile, txOffset);
-      validation = EditLogFileInputStream.validateEditLog(logFile);
-      assertEquals("Failed when truncating to length " + txOffset,
-          txid - 1, validation.getNumTransactions());
-      assertEquals(txOffset, validation.getValidLength());
-
-      // Restore backup, truncate the file with one byte in the txn,
-      // also isn't valid
-      Files.copy(logFileBak, logFile);
-      truncateFile(logFile, txOffset + 1);
-      validation = EditLogFileInputStream.validateEditLog(logFile);
-      assertEquals("Failed when truncating to length " + (txOffset + 1),
-          txid - 1, validation.getNumTransactions());
-      assertEquals(txOffset, validation.getValidLength());
-
-      // Restore backup, corrupt the txn opcode
-      Files.copy(logFileBak, logFile);
-      corruptByteInFile(logFile, txOffset);
-      validation = EditLogFileInputStream.validateEditLog(logFile);
-      assertEquals("Failed when corrupting txn opcode at " + txOffset,
-          txid - 1, validation.getNumTransactions());
-      assertEquals(txOffset, validation.getValidLength());
-
-      // Restore backup, corrupt a byte a few bytes into the txn
-      Files.copy(logFileBak, logFile);
-      corruptByteInFile(logFile, txOffset+5);
-      validation = EditLogFileInputStream.validateEditLog(logFile);
-      assertEquals("Failed when corrupting txn data at " + (txOffset+5),
-          txid - 1, validation.getNumTransactions());
-      assertEquals(txOffset, validation.getValidLength());
-    }
-    
-    // Corrupt the log at every offset to make sure that validation itself
-    // never throws an exception, and that the calculated lengths are monotonically
-    // increasing
-    long prevNumValid = 0;
-    for (long offset = 0; offset < validLength; offset++) {
-      Files.copy(logFileBak, logFile);
-      corruptByteInFile(logFile, offset);
-      EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile);
-      assertTrue(String.format("%d should have been >= %d",
-          val.getNumTransactions(), prevNumValid),
-          val.getNumTransactions() >= prevNumValid);
-      prevNumValid = val.getNumTransactions();
-    }
-  }
-
   /**
    * Corrupt the byte at the given offset in the given file,
    * by subtracting 1 from it.
@@ -318,7 +224,7 @@ public class TestFSEditLogLoader {
       fis.close();
     }
   }
-  
+
   @Test
   public void testStreamLimiter() throws IOException {
     final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test");
@@ -361,4 +267,75 @@ public class TestFSEditLogLoader {
       tracker.close();
     }
   }
+
+  /**
+   * Create an unfinalized edit log for testing purposes
+   *
+   * @param testDir           Directory to create the edit log in
+   * @param numTx             Number of transactions to add to the new edit log
+   * @param offsetToTxId      An output map, filled in with the byte offset
+   *                          at which each transaction begins in the edit
+   *                          log file.
+   * @return                  The in-progress edit log file.
+   * @throws IOException
+   */
+  private static File prepareUnfinalizedTestEditLog(File testDir, int numTx,
+      SortedMap<Long, Long> offsetToTxId) throws IOException {
+    File inProgressFile = new File(testDir, NNStorage.getInProgressEditsFileName(1));
+    FSEditLog fsel = null, spyLog = null;
+    try {
+      fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
+      spyLog = spy(fsel);
+      // Normally, the in-progress edit log would be finalized by
+      // FSEditLog#endCurrentLogSegment.  For testing purposes, we
+      // disable that here.
+      doNothing().when(spyLog).endCurrentLogSegment(true);
+      spyLog.openForWrite();
+      assertTrue("should exist: " + inProgressFile, inProgressFile.exists());
+      
+      for (int i = 0; i < numTx; i++) {
+        long trueOffset = getNonTrailerLength(inProgressFile);
+        long thisTxId = spyLog.getLastWrittenTxId() + 1;
+        offsetToTxId.put(trueOffset, thisTxId);
+        System.err.println("txid " + thisTxId + " at offset " + trueOffset);
+        spyLog.logDelete("path" + i, i);
+        spyLog.logSync();
+      }
+    } finally {
+      if (spyLog != null) {
+        spyLog.close();
+      } else if (fsel != null) {
+        fsel.close();
+      }
+    }
+    return inProgressFile;
+  }
+
+  @Test
+  public void testValidateEditLogWithCorruptHeader() throws IOException {
+    File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptHeader");
+    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+    File logFile = prepareUnfinalizedTestEditLog(testDir, 2, offsetToTxId);
+    RandomAccessFile rwf = new RandomAccessFile(logFile, "rw");
+    try {
+      rwf.seek(0);
+      rwf.writeLong(42); // corrupt header
+    } finally {
+      rwf.close();
+    }
+    EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
+    assertTrue(validation.hasCorruptHeader());
+  }
+
+  @Test
+  public void testValidateEmptyEditLog() throws IOException {
+    File testDir = new File(TEST_DIR, "testValidateEmptyEditLog");
+    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+    File logFile = prepareUnfinalizedTestEditLog(testDir, 0, offsetToTxId);
+    // Truncate the file so that there is nothing except the header
+    truncateFile(logFile, 4);
+    EditLogValidation validation =
+        EditLogFileInputStream.validateEditLog(logFile);
+    assertTrue(!validation.hasCorruptHeader());
+    assertEquals(HdfsConstants.INVALID_TXID, validation.getEndTxId());
+  }
 }
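
prepareUnfinalizedTestEditLog keeps the segment in-progress by routing writes
through a Mockito spy whose finalization method is stubbed to a no-op. A
self-contained sketch of that stubbing pattern, on a hypothetical class rather
than FSEditLog:

    import static org.mockito.Mockito.doNothing;
    import static org.mockito.Mockito.spy;

    public class SpySketch {
      static class Segment {
        void endSegment() { System.out.println("finalized"); }
        void append(String op) { System.out.println("append " + op); }
      }

      public static void main(String[] args) {
        Segment spied = spy(new Segment());
        // Suppress finalization only; other calls still reach the real object.
        doNothing().when(spied).endSegment();
        spied.append("OP_DELETE");  // prints "append OP_DELETE"
        spied.endSegment();         // no-op: prints nothing
      }
    }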

+ 100 - 41
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.junit.Assert.*;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Arrays;
 import java.util.List;
@@ -37,10 +38,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.junit.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
 import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec;
@@ -48,12 +53,54 @@ import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_ROLL;
 import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.TreeMultiset;
 import com.google.common.base.Joiner;
 
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
 public class TestFileJournalManager {
+  static final Log LOG = LogFactory.getLog(TestFileJournalManager.class);
+
+  /**
+   * Find out how many transactions we can read from a
+   * FileJournalManager, starting at a given transaction ID.
+   * 
+   * @param jm              The journal manager
+   * @param fromTxId        Transaction ID to start at
+   * @param inProgressOk    Should we consider edit logs that are not finalized?
+   * @param abortOnGap      Whether to stop counting at the first gap in
+   *                        transaction IDs
+   * @return                The number of transactions
+   * @throws IOException
+   */
+  static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
+      boolean inProgressOk, boolean abortOnGap) throws IOException {
+    long numTransactions = 0, txId = fromTxId;
+    final TreeMultiset<EditLogInputStream> allStreams =
+        TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
+
+    try {
+      for (EditLogInputStream elis : allStreams) {
+        elis.skipUntil(txId);
+        while (true) {
+          FSEditLogOp op = elis.readOp();
+          if (op == null) {
+            break;
+          }
+          if (abortOnGap && (op.getTransactionId() != txId)) {
+            LOG.info("getNumberOfTransactions: detected gap at txId " +
+                txId);
+            return numTransactions;
+          }
+          txId = op.getTransactionId() + 1;
+          numTransactions++;
+        }
+      }
+    } finally {
+      IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+    }
+    return numTransactions;
+  }
 
   /** 
    * Test the normal operation of loading transactions from
@@ -72,7 +119,7 @@ public class TestFileJournalManager {
     long numJournals = 0;
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
       FileJournalManager jm = new FileJournalManager(sd, storage);
-      assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
+      assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
       numJournals++;
     }
     assertEquals(3, numJournals);
@@ -93,7 +140,7 @@ public class TestFileJournalManager {
 
     FileJournalManager jm = new FileJournalManager(sd, storage);
     assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, 
-                 jm.getNumberOfTransactions(1, true));
+                 getNumberOfTransactions(jm, 1, true, false));
   }
 
   /**
@@ -115,16 +162,16 @@ public class TestFileJournalManager {
     Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
     StorageDirectory sd = dirs.next();
     FileJournalManager jm = new FileJournalManager(sd, storage);
-    assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
+    assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
     
     sd = dirs.next();
     jm = new FileJournalManager(sd, storage);
-    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
-        true));
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
+        true, false));
 
     sd = dirs.next();
     jm = new FileJournalManager(sd, storage);
-    assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
+    assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
   }
 
   /** 
@@ -148,18 +195,18 @@ public class TestFileJournalManager {
     Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
     StorageDirectory sd = dirs.next();
     FileJournalManager jm = new FileJournalManager(sd, storage);
-    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
-        true));
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
+        true, false));
     
     sd = dirs.next();
     jm = new FileJournalManager(sd, storage);
-    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
-        true));
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
+        true, false));
 
     sd = dirs.next();
     jm = new FileJournalManager(sd, storage);
-    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
-        true));
+    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
+        true, false));
   }
 
   /** 
@@ -209,24 +256,15 @@ public class TestFileJournalManager {
 
     FileJournalManager jm = new FileJournalManager(sd, storage);
     long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
-    assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true));
+    assertEquals(expectedTotalTxnCount, getNumberOfTransactions(jm, 1,
+        true, false));
 
     long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files
     long startingTxId = skippedTxns + 1; 
 
-    long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId, true);
-    long numLoaded = 0;
-    while (numLoaded < numTransactionsToLoad) {
-      EditLogInputStream editIn = jm.getInputStream(startingTxId, true);
-      FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn);
-      long count = val.getNumTransactions();
-
-      editIn.close();
-      startingTxId += count;
-      numLoaded += count;
-    }
-
-    assertEquals(expectedTotalTxnCount - skippedTxns, numLoaded); 
+    long numLoadable = getNumberOfTransactions(jm, startingTxId,
+        true, false);
+    assertEquals(expectedTotalTxnCount - skippedTxns, numLoadable); 
   }
 
   /**
@@ -247,8 +285,8 @@ public class TestFileJournalManager {
     // 10 rolls, so 11 rolled files, 110 txids total.
     final int TOTAL_TXIDS = 10 * 11;
     for (int txid = 1; txid <= TOTAL_TXIDS; txid++) {
-      assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid,
-          true));
+      assertEquals((TOTAL_TXIDS - txid) + 1, getNumberOfTransactions(jm, txid,
+          true, false));
     }
   }
 
@@ -280,19 +318,13 @@ public class TestFileJournalManager {
     assertTrue(files[0].delete());
     
     FileJournalManager jm = new FileJournalManager(sd, storage);
-    assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true));
+    assertEquals(startGapTxId-1, getNumberOfTransactions(jm, 1, true, true));
 
-    try {
-      jm.getNumberOfTransactions(startGapTxId, true);
-      fail("Should have thrown an exception by now");
-    } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains(
-          "Gap in transactions, max txnid is 110, 0 txns from 31", ioe);
-    }
+    assertEquals(0, getNumberOfTransactions(jm, startGapTxId, true, true));
 
     // rolled 10 times so there should be 11 files.
     assertEquals(11*TXNS_PER_ROLL - endGapTxId, 
-                 jm.getNumberOfTransactions(endGapTxId + 1, true));
+                 getNumberOfTransactions(jm, endGapTxId + 1, true, true));
   }
 
   /** 
@@ -319,7 +351,7 @@ public class TestFileJournalManager {
 
     FileJournalManager jm = new FileJournalManager(sd, storage);
     assertEquals(10*TXNS_PER_ROLL+1, 
-                 jm.getNumberOfTransactions(1, true));
+                 getNumberOfTransactions(jm, 1, true, false));
   }
 
   @Test
@@ -356,6 +388,33 @@ public class TestFileJournalManager {
     FileJournalManager.matchEditLogs(badDir);
   }
   
+  private static EditLogInputStream getJournalInputStream(JournalManager jm,
+      long txId, boolean inProgressOk) throws IOException {
+    final TreeMultiset<EditLogInputStream> allStreams =
+        TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    jm.selectInputStreams(allStreams, txId, inProgressOk);
+    try {
+      for (Iterator<EditLogInputStream> iter = allStreams.iterator();
+          iter.hasNext();) {
+        EditLogInputStream elis = iter.next();
+        if (elis.getFirstTxId() > txId) {
+          break;
+        }
+        if (elis.getLastTxId() < txId) {
+          iter.remove();
+          elis.close();
+          continue;
+        }
+        elis.skipUntil(txId);
+        iter.remove();
+        return elis;
+      }
+    } finally {
+      IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+    }
+    return null;
+  }
+
   /**
    * Make sure that we start reading the correct op when we request a stream
    * with a txid in the middle of an edit log file.
@@ -370,7 +429,7 @@ public class TestFileJournalManager {
     
     FileJournalManager jm = new FileJournalManager(sd, storage);
     
-    EditLogInputStream elis = jm.getInputStream(5, true);
+    EditLogInputStream elis = getJournalInputStream(jm, 5, true);
     FSEditLogOp op = elis.readOp();
     assertEquals("read unexpected op", 5, op.getTransactionId());
   }
@@ -392,9 +451,9 @@ public class TestFileJournalManager {
     FileJournalManager jm = new FileJournalManager(sd, storage);
     
     // If we exclude the in-progress stream, we should only have 100 tx.
-    assertEquals(100, jm.getNumberOfTransactions(1, false));
+    assertEquals(100, getNumberOfTransactions(jm, 1, false, false));
     
-    EditLogInputStream elis = jm.getInputStream(90, false);
+    EditLogInputStream elis = getJournalInputStream(jm, 90, false);
     FSEditLogOp lastReadOp = null;
     while ((lastReadOp = elis.readOp()) != null) {
       assertTrue(lastReadOp.getTransactionId() <= 100);
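
Both test helpers above collect candidate streams into a TreeMultiset ordered by
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR, so the most useful stream for a given
txid is visited first. A stand-alone sketch of that ordering with a stub type; the
comparator below only mirrors the assumed ordering (ascending first txid, ties
broken by the longer stream), it is not the real one:

    import java.util.Comparator;
    import com.google.common.collect.TreeMultiset;

    public class StreamOrderSketch {
      static class StreamStub {
        final long firstTxId, lastTxId;
        StreamStub(long first, long last) { firstTxId = first; lastTxId = last; }
      }

      public static void main(String[] args) {
        Comparator<StreamStub> cmp = new Comparator<StreamStub>() {
          public int compare(StreamStub a, StreamStub b) {
            if (a.firstTxId != b.firstTxId) {
              return a.firstTxId < b.firstTxId ? -1 : 1;   // earliest start first
            }
            if (a.lastTxId != b.lastTxId) {
              return a.lastTxId > b.lastTxId ? -1 : 1;     // longest stream first
            }
            return 0;
          }
        };
        TreeMultiset<StreamStub> streams = TreeMultiset.create(cmp);
        streams.add(new StreamStub(31, 60));
        streams.add(new StreamStub(1, 30));
        streams.add(new StreamStub(1, 100));
        for (StreamStub s : streams) {
          System.out.println("[" + s.firstTxId + ", " + s.lastTxId + "]");
        }
        // Prints [1, 100], then [1, 30], then [31, 60]
      }
    }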

+ 3 - 12
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.mockito.Mockito.mock;
@@ -26,9 +24,9 @@ import static org.junit.Assert.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Writable;
 
 import java.net.URI;
+import java.util.Collection;
 import java.io.IOException;
 
 public class TestGenericJournalConf {
@@ -144,15 +142,8 @@ public class TestGenericJournalConf {
     }
 
     @Override
-    public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
-        throws IOException {
-      return null;
-    }
-
-    @Override
-    public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
-        throws IOException {
-      return 0;
+    public void selectInputStreams(Collection<EditLogInputStream> streams,
+        long fromTxnId, boolean inProgressOk) {
     }
 
     @Override
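
The dummy journal manager tracks the interface change: the pull-style
getInputStream() and getNumberOfTransactions() are gone, and implementations
instead contribute whatever streams they have to a caller-supplied collection.
A hypothetical miniature of that shape (MiniJournalManager is ours; only the
method signature mirrors HDFS, and String stands in for EditLogInputStream):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    public class SelectSketch {
      interface MiniJournalManager {
        void selectInputStreams(Collection<String> streams,
            long fromTxnId, boolean inProgressOk);
      }

      public static void main(String[] args) {
        MiniJournalManager dummy = new MiniJournalManager() {
          public void selectInputStreams(Collection<String> streams,
              long fromTxnId, boolean inProgressOk) {
            // Like the dummy above, contribute nothing; a real manager adds
            // every stream it has that contains txids >= fromTxnId.
          }
        };
        List<String> streams = new ArrayList<String>();
        dummy.selectInputStreams(streams, 1L, true);
        System.out.println("collected " + streams.size() + " streams");
      }
    }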

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java

@@ -333,7 +333,7 @@ public class TestNameNodeRecovery {
   static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
       throws IOException {
     final String TEST_PATH = "/test/path/dir";
-    final int NUM_TEST_MKDIRS = 10;
+    final String TEST_PATH2 = "/second/dir";
     final boolean needRecovery = corruptor.needRecovery(finalize);
 
     // start a cluster
@@ -357,9 +357,8 @@ public class TestNameNodeRecovery {
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();
       FSImage fsimage = namesystem.getFSImage();
-      for (int i = 0; i < NUM_TEST_MKDIRS; i++) {
-        fileSys.mkdirs(new Path(TEST_PATH));
-      }
+      fileSys.mkdirs(new Path(TEST_PATH));
+      fileSys.mkdirs(new Path(TEST_PATH2));
       sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
     } finally {
       if (cluster != null) {
@@ -371,6 +370,7 @@ public class TestNameNodeRecovery {
     assertTrue("Should exist: " + editFile, editFile.exists());
 
     // Corrupt the edit log
+    LOG.info("corrupting edit log file '" + editFile + "'");
     corruptor.corrupt(editFile);
 
     // If needRecovery == true, make sure that we can't start the
@@ -423,6 +423,7 @@ public class TestNameNodeRecovery {
           .format(false).build();
       LOG.debug("successfully recovered the " + corruptor.getName() +
           " corrupted edit log");
+      cluster.waitActive();
       assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH)));
     } catch (IOException e) {
       fail("failed to recover.  Error message: " + e.getMessage());

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.After;
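
The new imports serve the hunk below: the spied FSEditLog#selectInputStreams now
takes a MetaRecoveryContext, and since Mockito requires every argument to be
covered by a matcher once any matcher is used, the stub gains a
(MetaRecoveryContext)anyObject(). A self-contained sketch of that matcher rule on
hypothetical stand-ins (Service, Context, and fetch are ours):

    import static org.mockito.Matchers.anyBoolean;
    import static org.mockito.Matchers.anyLong;
    import static org.mockito.Matchers.anyObject;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class MatcherSketch {
      static class Context {}
      static class Service {
        String fetch(long from, long to, Context ctx, boolean inProgressOk) {
          return "real";
        }
      }

      public static void main(String[] args) {
        Service spied = spy(new Service());
        // One matcher per parameter, with a cast to pin anyObject()'s type.
        doAnswer(new Answer<String>() {
          public String answer(InvocationOnMock invocation) {
            return "stubbed";
          }
        }).when(spied).fetch(
            anyLong(), anyLong(), (Context) anyObject(), anyBoolean());
        System.out.println(spied.fetch(1L, 2L, new Context(), true)); // stubbed
      }
    }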
@@ -278,7 +280,7 @@ public class TestFailureToReadEdits {
         .getEditLog());
     LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); 
     doAnswer(answer).when(spyEditLog).selectInputStreams(
-        anyLong(), anyLong(), anyBoolean());
+        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
     nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
     
     return answer;