Просмотр исходного кода

HDFS-2158. Add JournalSet to manage the set of journals.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177473 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 13 лет назад
Родитель
Сommit
1ae5b5e338

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -53,6 +53,8 @@ Trunk (unreleased changes)
 
     HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
 
+    HDFS-2158. Add JournalSet to manage the set of journals. (jitendra)
+
   BUG FIXES
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
 

+ 0 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java

@@ -54,7 +54,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     this.nnRegistration = nnReg;
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
-    Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
         RPC.getProxy(JournalProtocol.class,
@@ -67,16 +66,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
   
-  @Override // JournalStream
-  public String getName() {
-    return bnRegistration.getAddress();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.BACKUP;
-  }
-
   @Override // EditLogOutputStream
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
@@ -141,16 +130,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     }
   }
 
-  /**
-   * There is no persistent storage. Therefore length is 0.<p>
-   * Length is used to check when it is large enough to start a checkpoint.
-   * This criteria should not be used for backup streams.
-   */
-  @Override // EditLogOutputStream
-  long length() throws IOException {
-    return 0;
-  }
-
   /**
    * Get backup node registration.
    */

+ 5 - 24
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java

@@ -37,9 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
  * stores edits in a local file.
  */
 class EditLogFileOutputStream extends EditLogOutputStream {
-  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
-
-  private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
 
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
@@ -73,16 +71,6 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     fc.position(fc.size());
   }
 
-  @Override // JournalStream
-  public String getName() {
-    return file.getPath();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.FILE;
-  }
-
   /** {@inheritDoc} */
   @Override
   void write(FSEditLogOp op) throws IOException {
@@ -176,7 +164,10 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
-    
+    if (doubleBuf.isFlushed()) {
+      LOG.info("Nothing to flush");
+      return;
+    }
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
@@ -190,16 +181,6 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   public boolean shouldForceSync() {
     return doubleBuf.shouldForceSync();
   }
-  
-  /**
-   * Return the size of the current edit log including buffered data.
-   */
-  @Override
-  long length() throws IOException {
-    // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + 
-      doubleBuf.countBufferedBytes();
-  }
 
   // allocate a big chunk of data
   private void preallocate() throws IOException {

+ 3 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java

@@ -18,23 +18,20 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.util.zip.Checksum;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
  * a persistent storage.
  */
-abstract class EditLogOutputStream implements JournalStream {
+abstract class EditLogOutputStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
 
-  EditLogOutputStream() throws IOException {
+  EditLogOutputStream() {
     numSync = totalTimeSync = 0;
   }
 
@@ -105,12 +102,6 @@ abstract class EditLogOutputStream implements JournalStream {
     totalTimeSync += (end - start);
   }
 
-  /**
-   * Return the size of the current edits log.
-   * Length is used to check when it is large enough to start a checkpoint.
-   */
-  abstract long length() throws IOException;
-
   /**
    * Implement the policy when to automatically sync the buffered edits log
    * The buffered edits can be flushed when the buffer becomes full or
@@ -132,12 +123,7 @@ abstract class EditLogOutputStream implements JournalStream {
   /**
    * Return number of calls to {@link #flushAndSync()}
    */
-  long getNumSync() {
+  protected long getNumSync() {
     return numSync;
   }
-
-  @Override // Object
-  public String toString() {
-    return getName();
-  }
 }

+ 130 - 436
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,25 +34,17 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -62,9 +54,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 @InterfaceStability.Evolving
 public class FSEditLog  {
 
-  static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
-      " File system changes are not persistent. No journal streams.";
-
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
   /**
@@ -82,10 +71,11 @@ public class FSEditLog  {
     CLOSED;
   }  
   private State state = State.UNINITIALIZED;
+  
+  //initialize
+  final private JournalSet journalSet;
+  private EditLogOutputStream editLogStream = null;
 
-
-  private List<JournalAndStream> journals = Lists.newArrayList();
-    
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
 
@@ -137,15 +127,15 @@ public class FSEditLog  {
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
-    
+
+    this.journalSet = new JournalSet();
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
-      journals.add(new JournalAndStream(new FileJournalManager(sd)));
+      journalSet.add(new FileJournalManager(sd));
     }
     
-    if (journals.isEmpty()) {
+    if (journalSet.isEmpty()) {
       LOG.error("No edits directories configured!");
-    }
-    
+    } 
     state = State.BETWEEN_LOG_SEGMENTS;
   }
   
@@ -172,9 +162,8 @@ public class FSEditLog  {
       LOG.warn("Closing log when already closed", new Exception());
       return;
     }
-    
     if (state == State.IN_SEGMENT) {
-      assert !journals.isEmpty();
+      assert editLogStream != null;
       waitForSyncToFinish();
       endCurrentLogSegment(true);
     }
@@ -193,20 +182,14 @@ public class FSEditLog  {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if (journals.isEmpty()) {
-        throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      }
-      
       long start = beginTransaction();
       op.setTransactionId(txid);
 
-      mapJournalsAndReportErrors(new JournalClosure() {
-        @Override 
-        public void apply(JournalAndStream jas) throws IOException {
-          if (!jas.isActive()) return;
-          jas.stream.write(op);
-        }
-      }, "logging edit");
+      try {
+        editLogStream.write(op);
+      } catch (IOException ex) {
+        // All journals failed, it is handled in logSync.
+      }
 
       endTransaction(start);
       
@@ -251,14 +234,7 @@ public class FSEditLog  {
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-
-      if (jas.getCurrentStream().shouldForceSync()) {
-        return true;
-      }
-    }
-    return false;
+    return editLogStream.shouldForceSync();
   }
   
   private long beginTransaction() {
@@ -322,7 +298,7 @@ public class FSEditLog  {
    * NOTE: this should be done while holding the FSNamesystem lock, or
    * else more operations can start writing while this is in progress.
    */
-  void logSyncAll() throws IOException {
+  void logSyncAll() {
     // Record the most recent transaction ID as our own id
     synchronized (this) {
       TransactionId id = myTransactionId.get();
@@ -366,74 +342,73 @@ public class FSEditLog  {
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
     
-    List<JournalAndStream> candidateJournals =
-      Lists.newArrayListWithCapacity(journals.size());
-    List<JournalAndStream> badJournals = Lists.newArrayList();
-    
     boolean sync = false;
     try {
+      EditLogOutputStream logStream = null;
       synchronized (this) {
         try {
-        printStatistics(false);
-  
-        // if somebody is already syncing, then wait
-        while (mytxid > synctxid && isSyncRunning) {
-          try {
-            wait(1000);
-          } catch (InterruptedException ie) { 
+          printStatistics(false);
+
+          // if somebody is already syncing, then wait
+          while (mytxid > synctxid && isSyncRunning) {
+            try {
+              wait(1000);
+            } catch (InterruptedException ie) {
+            }
           }
-        }
   
-        //
-        // If this transaction was already flushed, then nothing to do
-        //
-        if (mytxid <= synctxid) {
-          numTransactionsBatchedInSync++;
-          if (metrics != null) // Metrics is non-null only when used inside name node
-            metrics.incrTransactionsBatchedInSync();
-          return;
-        }
+          //
+          // If this transaction was already flushed, then nothing to do
+          //
+          if (mytxid <= synctxid) {
+            numTransactionsBatchedInSync++;
+            if (metrics != null) {
+              // Metrics is non-null only when used inside name node
+              metrics.incrTransactionsBatchedInSync();
+            }
+            return;
+          }
      
-        // now, this thread will do the sync
-        syncStart = txid;
-        isSyncRunning = true;
-        sync = true;
+          // now, this thread will do the sync
+          syncStart = txid;
+          isSyncRunning = true;
+          sync = true;
   
-        // swap buffers
-        assert !journals.isEmpty() : "no editlog streams";
-        
-        for (JournalAndStream jas : journals) {
-          if (!jas.isActive()) continue;
+          // swap buffers
           try {
-            jas.getCurrentStream().setReadyToFlush();
-            candidateJournals.add(jas);
-          } catch (IOException ie) {
-            LOG.error("Unable to get ready to flush.", ie);
-            badJournals.add(jas);
+            if (journalSet.isEmpty()) {
+              throw new IOException("No journals available to flush");
+            }
+            editLogStream.setReadyToFlush();
+          } catch (IOException e) {
+            LOG.fatal("Could not sync any journal to persistent storage. "
+                + "Unsynced transactions: " + (txid - synctxid),
+                new Exception());
+            runtime.exit(1);
           }
-        }
         } finally {
           // Prevent RuntimeException from blocking other log edit write 
           doneWithAutoSyncScheduling();
         }
+        //editLogStream may become null,
+        //so store a local variable for flush.
+        logStream = editLogStream;
       }
-  
+      
       // do the sync
       long start = now();
-      for (JournalAndStream jas : candidateJournals) {
-        if (!jas.isActive()) continue;
-        try {
-          jas.getCurrentStream().flush();
-        } catch (IOException ie) {
-          LOG.error("Unable to sync edit log.", ie);
-          //
-          // remember the streams that encountered an error.
-          //
-          badJournals.add(jas);
+      try {
+        if (logStream != null) {
+          logStream.flush();
+        }
+      } catch (IOException ex) {
+        synchronized (this) {
+          LOG.fatal("Could not sync any journal to persistent storage. "
+              + "Unsynced transactions: " + (txid - synctxid), new Exception());
+          runtime.exit(1);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnJournals(badJournals);
   
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
@@ -443,13 +418,6 @@ public class FSEditLog  {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
         if (sync) {
-          if (badJournals.size() >= journals.size()) {
-            LOG.fatal("Could not sync any journal to persistent storage. " +
-                "Unsynced transactions: " + (txid - synctxid),
-                new Exception());
-            runtime.exit(1);
-          }
-
           synctxid = syncStart;
           isSyncRunning = false;
         }
@@ -466,9 +434,6 @@ public class FSEditLog  {
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (journals.isEmpty()) {
-      return;
-    }
     lastPrintTime = now;
     StringBuilder buf = new StringBuilder();
     buf.append("Number of transactions: ");
@@ -478,20 +443,9 @@ public class FSEditLog  {
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      buf.append(jas.getCurrentStream().getNumSync());
-      break;
-    }
-
+    buf.append(editLogStream.getNumSync());
     buf.append(" SyncTimes(ms): ");
-
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      EditLogOutputStream eStream = jas.getCurrentStream();
-      buf.append(eStream.getTotalSyncTime());
-      buf.append(" ");
-    }
+    buf.append(journalSet.getSyncTimes());
     LOG.info(buf);
   }
 
@@ -664,7 +618,6 @@ public class FSEditLog  {
    * log delegation token to edit log
    * @param id DelegationTokenIdentifier
    * @param expiryTime of the token
-   * @return
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
@@ -702,25 +655,12 @@ public class FSEditLog  {
     logEdit(op);
   }
   
-  /**
-   * @return the number of active (non-failed) journals
-   */
-  private int countActiveJournals() {
-    int count = 0;
-    for (JournalAndStream jas : journals) {
-      if (jas.isActive()) {
-        count++;
-      }
-    }
-    return count;
-  }
-  
   /**
    * Used only by unit tests.
    */
   @VisibleForTesting
   List<JournalAndStream> getJournals() {
-    return journals;
+    return journalSet.getAllJournalStreams();
   }
   
   /**
@@ -734,62 +674,9 @@ public class FSEditLog  {
   /**
    * Return a manifest of what finalized edit logs are available
    */
-  public synchronized RemoteEditLogManifest getEditLogManifest(
-      long fromTxId) throws IOException {
-    // Collect RemoteEditLogs available from each FileJournalManager
-    List<RemoteEditLog> allLogs = Lists.newArrayList();
-    for (JournalAndStream j : journals) {
-      if (j.getManager() instanceof FileJournalManager) {
-        FileJournalManager fjm = (FileJournalManager)j.getManager();
-        try {
-          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
-        } catch (Throwable t) {
-          LOG.warn("Cannot list edit logs in " + fjm, t);
-        }
-      }
-    }
-    
-    // Group logs by their starting txid
-    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
-      Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
-    long curStartTxId = fromTxId;
-
-    List<RemoteEditLog> logs = Lists.newArrayList();
-    while (true) {
-      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
-      if (logGroup.isEmpty()) {
-        // we have a gap in logs - for example because we recovered some old
-        // storage directory with ancient logs. Clear out any logs we've
-        // accumulated so far, and then skip to the next segment of logs
-        // after the gap.
-        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
-        startTxIds = startTxIds.tailSet(curStartTxId);
-        if (startTxIds.isEmpty()) {
-          break;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
-                "not returning previous logs in manifest.");
-          }
-          logs.clear();
-          curStartTxId = startTxIds.first();
-          continue;
-        }
-      }
-
-      // Find the one that extends the farthest forward
-      RemoteEditLog bestLog = Collections.max(logGroup);
-      logs.add(bestLog);
-      // And then start looking from after that point
-      curStartTxId = bestLog.getEndTxId() + 1;
-    }
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
-          + ret);      
-    }
-    return ret;
+  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
+      throws IOException {
+    return journalSet.getEditLogManifest(fromTxId);
   }
  
   /**
@@ -832,14 +719,9 @@ public class FSEditLog  {
     // See HDFS-2174.
     storage.attemptRestoreRemovedStorage();
     
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.startLogSegment(segmentTxId);
-      }
-    }, "starting log segment " + segmentTxId);
-
-    if (countActiveJournals() == 0) {
+    try {
+      editLogStream = journalSet.startLogSegment(segmentTxId);
+    } catch (IOException ex) {
       throw new IOException("Unable to start log segment " +
           segmentTxId + ": no journals successfully started.");
     }
@@ -873,14 +755,12 @@ public class FSEditLog  {
     
     final long lastTxId = getLastWrittenTxId();
     
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.close(lastTxId);
-        }
-      }
-    }, "ending log segment");
+    try {
+      journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
+      editLogStream = null;
+    } catch (IOException e) {
+      //All journals have failed, it will be handled in logSync.
+    }
     
     state = State.BETWEEN_LOG_SEGMENTS;
   }
@@ -889,14 +769,15 @@ public class FSEditLog  {
    * Abort all current logs. Called from the backup node.
    */
   synchronized void abortCurrentLogSegment() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-      
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.abort();
+    try {
+      //Check for null, as abort can be called any time.
+      if (editLogStream != null) {
+        editLogStream.abort();
+        editLogStream = null;
       }
-    }, "aborting all streams");
-    state = State.BETWEEN_LOG_SEGMENTS;
+    } catch (IOException e) {
+      LOG.warn("All journals failed to abort", e);
+    }
   }
 
   /**
@@ -912,13 +793,12 @@ public class FSEditLog  {
         "cannot purge logs older than txid " + minTxIdToKeep +
         " when current segment starts at " + curSegmentTxId;
     }
-    
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.manager.purgeLogsOlderThan(minTxIdToKeep);
-      }
-    }, "purging logs older than " + minTxIdToKeep);
+
+    try {
+      journalSet.purgeLogsOlderThan(minTxIdToKeep);
+    } catch (IOException ex) {
+      //All journals have failed, it will be handled in logSync.
+    }
   }
 
   
@@ -946,9 +826,7 @@ public class FSEditLog  {
 
   // sets the initial capacity of the flush buffer.
   public void setOutputBufferCapacity(int size) {
-    for (JournalAndStream jas : journals) {
-      jas.manager.setOutputBufferCapacity(size);
-    }
+      journalSet.setOutputBufferCapacity(size);
   }
 
   /**
@@ -969,7 +847,7 @@ public class FSEditLog  {
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
     
-    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    JournalManager jas = findBackupJournal(bnReg);
     if (jas != null) {
       // already registered
       LOG.info("Backup node " + bnReg + " re-registers");
@@ -978,35 +856,29 @@ public class FSEditLog  {
     
     LOG.info("Registering new backup node: " + bnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
-    journals.add(new JournalAndStream(bjm));
+    journalSet.add(bjm);
   }
   
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    for (Iterator<JournalAndStream> iter = journals.iterator();
-         iter.hasNext();) {
-      JournalAndStream jas = iter.next();
-      if (jas.manager instanceof BackupJournalManager &&
-          ((BackupJournalManager)jas.manager).matchesRegistration(
-              registration)) {
-        jas.abort();        
-        LOG.info("Removing backup journal " + jas);
-        iter.remove();
-      }
+  synchronized void releaseBackupStream(NamenodeRegistration registration)
+      throws IOException {
+    BackupJournalManager bjm = this.findBackupJournal(registration);
+    if (bjm != null) {
+      LOG.info("Removing backup journal " + bjm);
+      journalSet.remove(bjm);
     }
   }
   
   /**
    * Find the JournalAndStream associated with this BackupNode.
+   * 
    * @return null if it cannot be found
    */
-  private synchronized JournalAndStream findBackupJournalAndStream(
+  private synchronized BackupJournalManager findBackupJournal(
       NamenodeRegistration bnReg) {
-    for (JournalAndStream jas : journals) {
-      if (jas.manager instanceof BackupJournalManager) {
-        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
-        if (bjm.matchesRegistration(bnReg)) {
-          return jas;
-        }
+    for (JournalManager bjm : journalSet.getJournalManagers()) {
+      if ((bjm instanceof BackupJournalManager)
+          && ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
+        return (BackupJournalManager) bjm;
       }
     }
     return null;
@@ -1018,124 +890,24 @@ public class FSEditLog  {
    */   
   synchronized void logEdit(final int length, final byte[] data) {
     long start = beginTransaction();
-    
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
-        }
-      }      
-    }, "Logging edit");
-
-    endTransaction(start);
-  }
 
-  //// Iteration across journals
-  private interface JournalClosure {
-    public void apply(JournalAndStream jas) throws IOException;
-  }
-
-  /**
-   * Apply the given function across all of the journal managers, disabling
-   * any for which the closure throws an IOException.
-   * @param status message used for logging errors (e.g. "opening journal")
-   */
-  private void mapJournalsAndReportErrors(
-      JournalClosure closure, String status) {
-    List<JournalAndStream> badJAS = Lists.newLinkedList();
-    for (JournalAndStream jas : journals) {
-      try {
-        closure.apply(jas);
-      } catch (Throwable t) {
-        LOG.error("Error " + status + " (journal " + jas + ")", t);
-        badJAS.add(jas);
-      }
-    }
-
-    disableAndReportErrorOnJournals(badJAS);
-  }
-  
-  /**
-   * Called when some journals experience an error in some operation.
-   * This propagates errors to the storage level.
-   */
-  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
-    if (badJournals == null || badJournals.isEmpty()) {
-      return; // nothing to do
-    }
- 
-    for (JournalAndStream j : badJournals) {
-      LOG.error("Disabling journal " + j);
-      j.abort();
-    }
-  }
-
-  /**
-   * Find the best editlog input stream to read from txid. In this case
-   * best means the editlog which has the largest continuous range of 
-   * transactions starting from the transaction id, fromTxId.
-   *
-   * If a journal throws an CorruptionException while reading from a txn id,
-   * it means that it has more transactions, but can't find any from fromTxId. 
-   * If this is the case and no other journal has transactions, we should throw
-   * an exception as it means more transactions exist, we just can't load them.
-   *
-   * @param fromTxId Transaction id to start from.
-   * @return a edit log input stream with tranactions fromTxId 
-   *         or null if no more exist
-   */
-  private EditLogInputStream selectStream(long fromTxId) 
-      throws IOException {
-    JournalManager bestjm = null;
-    long bestjmNumTxns = 0;
-    CorruptionException corruption = null;
-
-    for (JournalAndStream jas : journals) {
-      JournalManager candidate = jas.getManager();
-      long candidateNumTxns = 0;
-      try {
-        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
-      } catch (CorruptionException ce) {
-        corruption = ce;
-      } catch (IOException ioe) {
-        LOG.warn("Error reading number of transactions from " + candidate);
-        continue; // error reading disk, just skip
-      }
-      
-      if (candidateNumTxns > bestjmNumTxns) {
-        bestjm = candidate;
-        bestjmNumTxns = candidateNumTxns;
-      }
-    }
-    
-    
-    if (bestjm == null) {
-      /**
-       * If all candidates either threw a CorruptionException or
-       * found 0 transactions, then a gap exists. 
-       */
-      if (corruption != null) {
-        throw new IOException("Gap exists in logs from " 
-                              + fromTxId, corruption);
-      } else {
-        return null;
-      }
+    try {
+      editLogStream.writeRaw(data, 0, length);
+    } catch (IOException ex) {
+      // All journals have failed, it will be handled in logSync.
     }
-
-    return bestjm.getInputStream(fromTxId);
+    endTransaction(start);
   }
 
   /**
    * Run recovery on all journals to recover any unclosed segments
    */
   void recoverUnclosedStreams() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-        @Override
-        public void apply(JournalAndStream jas) throws IOException {
-          jas.manager.recoverUnfinalizedSegments();
-        }
-      }, "recovering unclosed streams");
+    try {
+      journalSet.recoverUnfinalizedSegments();
+    } catch (IOException ex) {
+      // All journals have failed, it is handled in logSync.
+    }
   }
 
   /**
@@ -1143,23 +915,16 @@ public class FSEditLog  {
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
    */
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId) 
-      throws IOException {
-    List<EditLogInputStream> streams = Lists.newArrayList();
-    
-    boolean gapFound = false;
-    EditLogInputStream stream = selectStream(fromTxId);
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId) throws IOException {
+    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+    EditLogInputStream stream = journalSet.getInputStream(fromTxId);
     while (stream != null) {
       fromTxId = stream.getLastTxId() + 1;
       streams.add(stream);
-      try {
-        stream = selectStream(fromTxId);
-      } catch (IOException ioe) {
-        gapFound = true;
-        break;
-      }
+      stream = journalSet.getInputStream(fromTxId);
     }
-    if (fromTxId <= toAtLeastTxId || gapFound) {
+    if (fromTxId <= toAtLeastTxId) {
       closeAllStreams(streams);
       throw new IOException("No non-corrupt logs for txid " 
                             + fromTxId);
@@ -1176,75 +941,4 @@ public class FSEditLog  {
       IOUtils.closeStream(s);
     }
   }
-
-  /**
-   * Container for a JournalManager paired with its currently
-   * active stream.
-   * 
-   * If a Journal gets disabled due to an error writing to its
-   * stream, then the stream will be aborted and set to null.
-   */
-  static class JournalAndStream {
-    private final JournalManager manager;
-    private EditLogOutputStream stream;
-    private long segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    
-    private JournalAndStream(JournalManager manager) {
-      this.manager = manager;
-    }
-
-    private void startLogSegment(long txId) throws IOException {
-      Preconditions.checkState(stream == null);
-      stream = manager.startLogSegment(txId);
-      segmentStartsAtTxId = txId;
-    }
-
-    private void close(long lastTxId) throws IOException {
-      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
-          "invalid segment: lastTxId %s >= " +
-          "segment starting txid %s", lastTxId, segmentStartsAtTxId);
-          
-      if (stream == null) return;
-      stream.close();
-      manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
-      stream = null;
-    }
-    
-    @VisibleForTesting
-    void abort() {
-      if (stream == null) return;
-      try {
-        stream.abort();
-      } catch (IOException ioe) {
-        LOG.error("Unable to abort stream " + stream, ioe);
-      }
-      stream = null;
-      segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    }
-
-    private boolean isActive() {
-      return stream != null;
-    }
-
-    @VisibleForTesting
-    EditLogOutputStream getCurrentStream() {
-      return stream;
-    }
-    
-    @Override
-    public String toString() {
-      return "JournalAndStream(mgr=" + manager +
-        ", " + "stream=" + stream + ")";
-    }
-
-    @VisibleForTesting
-    void setCurrentStreamForTests(EditLogOutputStream stream) {
-      this.stream = stream;
-    }
-    
-    @VisibleForTesting
-    JournalManager getManager() {
-      return manager;
-    }
-  }
 }

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Comparator;
 import java.util.Collections;
 import java.util.regex.Matcher;

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 
 /**
  * A JournalManager is responsible for managing a single place of storing

+ 549 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java

@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Sets;
+
+/**
+ * Manages a collection of Journals. None of the methods are synchronized, it is
+ * assumed that FSEditLog methods, that use this class, use proper
+ * synchronization.
+ */
+public class JournalSet implements JournalManager {
+
+  static final Log LOG = LogFactory.getLog(FSEditLog.class);
+  
+  /**
+   * Container for a JournalManager paired with its currently
+   * active stream.
+   * 
+   * If a Journal gets disabled due to an error writing to its
+   * stream, then the stream will be aborted and set to null.
+   * 
+   * This should be used outside JournalSet only for testing.
+   */
+  @VisibleForTesting
+  static class JournalAndStream {
+    private final JournalManager journal;
+    private boolean disabled = false;
+    private EditLogOutputStream stream;
+    
+    public JournalAndStream(JournalManager manager) {
+      this.journal = manager;
+    }
+
+    public void startLogSegment(long txId) throws IOException {
+      Preconditions.checkState(stream == null);
+      disabled = false;
+      stream = journal.startLogSegment(txId);
+    }
+
+    /**
+     * Closes the stream, also sets it to null.
+     */
+    public void close() throws IOException {
+      if (stream == null) return;
+      stream.close();
+      stream = null;
+    }
+    
+    /**
+     * Aborts the stream, also sets it to null.
+     */
+    public void abort() {
+      if (stream == null) return;
+      try {
+        stream.abort();
+      } catch (IOException ioe) {
+        LOG.error("Unable to abort stream " + stream, ioe);
+      }
+      stream = null;
+    }
+
+    boolean isActive() {
+      return stream != null;
+    }
+    
+    /**
+     * Should be used outside JournalSet only for testing.
+     */
+    EditLogOutputStream getCurrentStream() {
+      return stream;
+    }
+    
+    @Override
+    public String toString() {
+      return "JournalAndStream(mgr=" + journal +
+        ", " + "stream=" + stream + ")";
+    }
+
+    void setCurrentStreamForTests(EditLogOutputStream stream) {
+      this.stream = stream;
+    }
+    
+    JournalManager getManager() {
+      return journal;
+    }
+
+    private boolean isDisabled() {
+      return disabled;
+    }
+
+    private void setDisabled(boolean disabled) {
+      this.disabled = disabled;
+    }
+  }
+  
+  private List<JournalAndStream> journals = Lists.newArrayList();
+  
+  @Override
+  public EditLogOutputStream startLogSegment(final long txId) throws IOException {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.startLogSegment(txId);
+      }
+    }, "starting log segment " + txId);
+    return new JournalSetOutputStream();
+  }
+  
+  @Override
+  public void finalizeLogSegment(final long firstTxId, final long lastTxId)
+      throws IOException {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        if (jas.isActive()) {
+          jas.close();
+          jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
+        }
+      }
+    }, "finalize log segment " + firstTxId + ", " + lastTxId);
+  }
+  
+  
+  /**
+   * Find the best editlog input stream to read from txid.
+   * If a journal throws an CorruptionException while reading from a txn id,
+   * it means that it has more transactions, but can't find any from fromTxId. 
+   * If this is the case and no other journal has transactions, we should throw
+   * an exception as it means more transactions exist, we just can't load them.
+   *
+   * @param fromTxnId Transaction id to start from.
+   * @return A edit log input stream with tranactions fromTxId 
+   *         or null if no more exist
+   */
+  @Override
+  public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+    JournalManager bestjm = null;
+    long bestjmNumTxns = 0;
+    CorruptionException corruption = null;
+
+    for (JournalAndStream jas : journals) {
+      JournalManager candidate = jas.getManager();
+      long candidateNumTxns = 0;
+      try {
+        candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId);
+      } catch (CorruptionException ce) {
+        corruption = ce;
+      } catch (IOException ioe) {
+        continue; // error reading disk, just skip
+      }
+      
+      if (candidateNumTxns > bestjmNumTxns) {
+        bestjm = candidate;
+        bestjmNumTxns = candidateNumTxns;
+      }
+    }
+    
+    if (bestjm == null) {
+      if (corruption != null) {
+        throw new IOException("No non-corrupt logs for txid " 
+                                        + fromTxnId, corruption);
+      } else {
+        return null;
+      }
+    }
+    return bestjm.getInputStream(fromTxnId);
+  }
+  
+  @Override
+  public long getNumberOfTransactions(long fromTxnId) throws IOException {
+    long num = 0;
+    for (JournalAndStream jas: journals) {
+      if (jas.isActive()) {
+        long newNum = jas.getManager().getNumberOfTransactions(fromTxnId);
+        if (newNum > num) {
+          num = newNum;
+        }
+      }
+    }
+    return num;
+  }
+
+  /**
+   * Returns true if there are no journals or all are disabled.
+   * @return True if no journals or all are disabled.
+   */
+  public boolean isEmpty() {
+    for (JournalAndStream jas : journals) {
+      if (!jas.isDisabled()) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /**
+   * Called when some journals experience an error in some operation.
+   */
+  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
+    if (badJournals == null || badJournals.isEmpty()) {
+      return; // nothing to do
+    }
+ 
+    for (JournalAndStream j : badJournals) {
+      LOG.error("Disabling journal " + j);
+      j.abort();
+      j.setDisabled(true);
+    }
+  }
+
+  /**
+   * Implementations of this interface encapsulate operations that can be
+   * iteratively applied on all the journals. For example see
+   * {@link JournalSet#mapJournalsAndReportErrors}.
+   */
+  private interface JournalClosure {
+    /**
+     * The operation on JournalAndStream.
+     * @param jas Object on which operations are performed.
+     * @throws IOException
+     */
+    public void apply(JournalAndStream jas) throws IOException;
+  }
+  
+  /**
+   * Apply the given operation across all of the journal managers, disabling
+   * any for which the closure throws an IOException.
+   * @param closure {@link JournalClosure} object encapsulating the operation.
+   * @param status message used for logging errors (e.g. "opening journal")
+   * @throws IOException If the operation fails on all the journals.
+   */
+  private void mapJournalsAndReportErrors(
+      JournalClosure closure, String status) throws IOException{
+    List<JournalAndStream> badJAS = Lists.newLinkedList();
+    for (JournalAndStream jas : journals) {
+      try {
+        closure.apply(jas);
+      } catch (Throwable t) {
+        LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
+        badJAS.add(jas);
+      }
+    }
+    disableAndReportErrorOnJournals(badJAS);
+    if (badJAS.size() >= journals.size()) {
+      LOG.error("Error: "+status+" failed for all journals");
+      throw new IOException(status+" failed on all the journals");
+    }
+  }
+  
+  /**
+   * An implementation of EditLogOutputStream that applies a requested method on
+   * all the journals that are currently active.
+   */
+  private class JournalSetOutputStream extends EditLogOutputStream {
+
+    JournalSetOutputStream() throws IOException {
+      super();
+    }
+
+    @Override
+    void write(final FSEditLogOp op)
+        throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          if (jas.isActive()) {
+            jas.getCurrentStream().write(op);
+          }
+        }
+      }, "write op");
+    }
+
+    @Override
+    void writeRaw(final byte[] data, final int offset, final int length)
+        throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          if (jas.isActive()) {
+            jas.getCurrentStream().writeRaw(data, offset, length);
+          }
+        }
+      }, "write bytes");
+    }
+
+    @Override
+    void create() throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          if (jas.isActive()) {
+            jas.getCurrentStream().create();
+          }
+        }
+      }, "create");
+    }
+
+    @Override
+    public void close() throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          jas.close();
+        }
+      }, "close");
+    }
+
+    @Override
+    public void abort() throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          jas.abort();
+        }
+      }, "abort");
+    }
+
+    @Override
+    void setReadyToFlush() throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          if (jas.isActive()) {
+            jas.getCurrentStream().setReadyToFlush();
+          }
+        }
+      }, "setReadyToFlush");
+    }
+
+    @Override
+    protected void flushAndSync() throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          if (jas.isActive()) {
+            jas.getCurrentStream().flushAndSync();
+          }
+        }
+      }, "flushAndSync");
+    }
+    
+    @Override
+    public void flush() throws IOException {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          if (jas.isActive()) {
+            jas.getCurrentStream().flush();
+          }
+        }
+      }, "flush");
+    }
+    
+    @Override
+    public boolean shouldForceSync() {
+      for (JournalAndStream js : journals) {
+        if (js.isActive() && js.getCurrentStream().shouldForceSync()) {
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    @Override
+    protected long getNumSync() {
+      for (JournalAndStream jas : journals) {
+        if (jas.isActive()) {
+          return jas.getCurrentStream().getNumSync();
+        }
+      }
+      return 0;
+    }
+  }
+
+  @Override
+  public void setOutputBufferCapacity(final int size) {
+    try {
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+            jas.getManager().setOutputBufferCapacity(size);
+        }
+      }, "setOutputBufferCapacity");
+    } catch (IOException e) {
+      LOG.error("Error in setting outputbuffer capacity");
+    }
+  }
+  
+  @VisibleForTesting
+  List<JournalAndStream> getAllJournalStreams() {
+    return journals;
+  }
+
+  List<JournalManager> getJournalManagers() {
+    List<JournalManager> jList = new ArrayList<JournalManager>();
+    for (JournalAndStream j : journals) {
+      jList.add(j.getManager());
+    }
+    return jList;
+  }
+
+  void add(JournalManager j) {
+    journals.add(new JournalAndStream(j));
+  }
+  
+  void remove(JournalManager j) {
+    JournalAndStream jasToRemove = null;
+    for (JournalAndStream jas: journals) {
+      if (jas.getManager().equals(j)) {
+        jasToRemove = jas;
+        break;
+      }
+    }
+    if (jasToRemove != null) {
+      jasToRemove.abort();
+      journals.remove(jasToRemove);
+    }
+  }
+
+  @Override
+  public void purgeLogsOlderThan(final long minTxIdToKeep) throws IOException {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.getManager().purgeLogsOlderThan(minTxIdToKeep);
+      }
+    }, "purgeLogsOlderThan " + minTxIdToKeep);
+  }
+
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.getManager().recoverUnfinalizedSegments();
+      }
+    }, "recoverUnfinalizedSegments");
+  }
+  
+  /**
+   * Return a manifest of what finalized edit logs are available. All available
+   * edit logs are returned starting from the transaction id passed.
+   * 
+   * @param fromTxId Starting transaction id to read the logs.
+   * @return RemoteEditLogManifest object.
+   */
+  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
+    // Collect RemoteEditLogs available from each FileJournalManager
+    List<RemoteEditLog> allLogs = Lists.newArrayList();
+    for (JournalAndStream j : journals) {
+      if (j.getManager() instanceof FileJournalManager) {
+        FileJournalManager fjm = (FileJournalManager)j.getManager();
+        try {
+          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
+        } catch (Throwable t) {
+          LOG.warn("Cannot list edit logs in " + fjm, t);
+        }
+      }
+    }
+    
+    // Group logs by their starting txid
+    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
+      Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
+    long curStartTxId = fromTxId;
+
+    List<RemoteEditLog> logs = Lists.newArrayList();
+    while (true) {
+      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
+      if (logGroup.isEmpty()) {
+        // we have a gap in logs - for example because we recovered some old
+        // storage directory with ancient logs. Clear out any logs we've
+        // accumulated so far, and then skip to the next segment of logs
+        // after the gap.
+        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
+        startTxIds = startTxIds.tailSet(curStartTxId);
+        if (startTxIds.isEmpty()) {
+          break;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
+                "not returning previous logs in manifest.");
+          }
+          logs.clear();
+          curStartTxId = startTxIds.first();
+          continue;
+        }
+      }
+
+      // Find the one that extends the farthest forward
+      RemoteEditLog bestLog = Collections.max(logGroup);
+      logs.add(bestLog);
+      // And then start looking from after that point
+      curStartTxId = bestLog.getEndTxId() + 1;
+    }
+    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
+          + ret);      
+    }
+    return ret;
+  }
+
+  /**
+   * Add sync times to the buffer.
+   */
+  String getSyncTimes() {
+    StringBuilder buf = new StringBuilder();
+    for (JournalAndStream jas : journals) {
+      if (jas.isActive()) {
+        buf.append(jas.getCurrentStream().getTotalSyncTime());
+        buf.append(" ");
+      }
+    }
+    return buf.toString();
+  }
+}

+ 27 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,7 +75,7 @@ public class TestEditLogJournalFailures {
   public void testSingleFailedEditsDirOnFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate one edits journal.
-    invalidateEditsDirAtIndex(0, true);
+    invalidateEditsDirAtIndex(0, true, false);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -86,8 +88,22 @@ public class TestEditLogJournalFailures {
   public void testAllEditsDirsFailOnFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate both edits journals.
-    invalidateEditsDirAtIndex(0, true);
-    invalidateEditsDirAtIndex(1, true);
+    invalidateEditsDirAtIndex(0, true, false);
+    invalidateEditsDirAtIndex(1, true, false);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // The previous edit could not be synced to any persistent storage, should
+    // have halted the NN.
+    assertExitInvocations(1);
+  }
+  
+  @Test
+  public void testAllEditsDirFailOnWrite() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate both edits journals.
+    invalidateEditsDirAtIndex(0, true, true);
+    invalidateEditsDirAtIndex(1, true, true);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -100,7 +116,7 @@ public class TestEditLogJournalFailures {
   public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate one edits journal.
-    invalidateEditsDirAtIndex(0, false);
+    invalidateEditsDirAtIndex(0, false, false);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -117,16 +133,18 @@ public class TestEditLogJournalFailures {
    * @return the original <code>EditLogOutputStream</code> of the journal.
    */
   private EditLogOutputStream invalidateEditsDirAtIndex(int index,
-      boolean failOnFlush) throws IOException {
+      boolean failOnFlush, boolean failOnWrite) throws IOException {
     FSImage fsimage = cluster.getNamesystem().getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
-    
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    JournalAndStream jas = editLog.getJournals().get(index);
     EditLogFileOutputStream elos =
       (EditLogFileOutputStream) jas.getCurrentStream();
     EditLogFileOutputStream spyElos = spy(elos);
-    
+    if (failOnWrite) {
+      doThrow(new IOException("fail on write()")).when(spyElos).write(
+          (FSEditLogOp) any());
+    }
     if (failOnFlush) {
       doThrow(new IOException("fail on flush()")).when(spyElos).flush();
     } else {
@@ -151,7 +169,7 @@ public class TestEditLogJournalFailures {
     FSImage fsimage = cluster.getNamesystem().getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    JournalAndStream jas = editLog.getJournals().get(index);
     jas.setCurrentStreamForTests(elos);
   }
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.log4j.Level;
 
@@ -356,7 +357,7 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      FSEditLog.JournalAndStream jas = editLog.getJournals().get(0);
+      JournalAndStream jas = editLog.getJournals().get(0);
       EditLogFileOutputStream spyElos =
           spy((EditLogFileOutputStream)jas.getCurrentStream());
       jas.setCurrentStreamForTests(spyElos);

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Set;
 
-import static org.mockito.Matchers.anyByte;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
@@ -45,7 +44,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
@@ -123,7 +122,7 @@ public class TestStorageRestore {
     // simulate an error
     fi.getStorage().reportErrorsOnDirectories(al);
     
-    for (FSEditLog.JournalAndStream j : fi.getEditLog().getJournals()) {
+    for (JournalAndStream j : fi.getEditLog().getJournals()) {
       if (j.getManager() instanceof FileJournalManager) {
         FileJournalManager fm = (FileJournalManager)j.getManager();
         if (fm.getStorageDirectory().getRoot().equals(path2)