
HDFS-1975. Support for sharing the namenode state from active to standby. Contributed by Jitendra Nath Pandey, Aaron T Myers, and Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1208813 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon authored 13 years ago · commit f87a4b40bc
28 changed files with 904 additions and 66 deletions
  1. +2 -0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
  2. +1 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. +13 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
  4. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
  5. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
  6. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
  7. +5 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
  8. +10 -3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
  9. +11 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
  10. +75 -16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  11. +31 -10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
  12. +128 -11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  13. +6 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
  14. +18 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
  15. +7 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  16. +43 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  17. +201 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
  18. +142 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
  19. +2 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
  20. +12 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  21. +5 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  22. +11 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
  23. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
  24. +3 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
  25. +30 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
  26. +15 -3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
  27. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
  28. +128 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt

@@ -27,3 +27,5 @@ HDFS-2577. NN fails to start since it tries to start secret manager in safemode.
 HDFS-2582. Scope dfs.ha.namenodes config by nameservice (todd)
 
 HDFS-2591. MiniDFSCluster support to mix and match federation with HA (todd)
+
+HDFS-1975. Support for sharing the namenode state from active to standby. (jitendra, atm, todd)

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -162,6 +162,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT;
   public static final String  DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
   public static final String  DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
+  public static final String  DFS_NAMENODE_SHARED_EDITS_DIR_KEY = "dfs.namenode.shared.edits.dir";
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; 
   public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
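
To illustrate the new key: a minimal sketch of pointing a NameNode at a shared edits directory. The NFS-style path is a hypothetical example, not part of this patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SharedEditsConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Both the active and the standby NN must resolve this URI to the same
    // storage, e.g. an NFS mount (path below is made up).
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
        "file:///mnt/filer/shared-edits");
    System.out.println(
        conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY));
  }
}
```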

+ 13 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java

@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
  * - followed by the invalid replica represented with three -1s;
  * - followed by the under-construction replica list where each replica is
  *   represented by 4 longs: three for the block id, length, generation 
- *   stamp, and the forth for the replica state.
+ *   stamp, and the fourth for the replica state.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -304,4 +304,16 @@ public class BlockListAsLongs implements Iterable<Block> {
     blockList[idx+1] = -1;
     blockList[idx+2] = -1;
   }
+
+  public long getMaxGsInBlockList() {
+    long maxGs = -1;
+    Iterator<Block> iter = getBlockReportIterator();
+    while (iter.hasNext()) {
+      Block b = iter.next();
+      if (b.getGenerationStamp() > maxGs) {
+        maxGs = b.getGenerationStamp();
+      }
+    }
+    return maxGs;
+  }
 }
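
The standby uses this maximum generation stamp to decide whether a block report refers to state it has not replayed yet. A rough sketch of that check, built from the public methods this patch adds; the helper class itself is hypothetical.

```java
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

class ReportGatingSketch {
  /** Should the standby defer this block report for later replay? */
  static boolean shouldDefer(FSNamesystem fsn, long[] blocksAsLongs) {
    BlockListAsLongs blist = new BlockListAsLongs(blocksAsLongs);
    // Newest generation stamp mentioned anywhere in the report.
    long maxGs = blist.getMaxGsInBlockList();
    // True when the report is ahead of the edits the standby has applied.
    return fsn.isGenStampInFuture(maxGs);
  }
}
```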

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -568,7 +568,7 @@ public abstract class Storage extends StorageInfo {
      * <p> Locking is not supported by all file systems.
      * E.g., NFS does not consistently support exclusive locks.
      * 
-     * <p> If locking is supported we guarantee exculsive access to the
+     * <p> If locking is supported we guarantee exclusive access to the
      * storage directory. Otherwise, no guarantee is given.
      * 
      * @throws IOException if locking fails

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -345,7 +345,7 @@ public class BackupImage extends FSImage {
   synchronized void namenodeStartedLogSegment(long txid)
       throws IOException {
     LOG.info("NameNode started a new log segment at txid " + txid);
-    if (editLog.isOpen()) {
+    if (editLog.isOpenForWrite()) {
       if (editLog.getLastWrittenTxId() == txid - 1) {
         // We are in sync with the NN, so end and finalize the current segment
         editLog.endCurrentLogSegment(false);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java

@@ -286,7 +286,7 @@ class Checkpointer extends Daemon {
           log.getStartTxId(), log.getEndTxId());
       if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
         editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), 
-                                                    log.getEndTxId()));
+                                                    log.getEndTxId(), true));
        }
     }
     LOG.info("Checkpointer about to load edits from " +

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java

@@ -133,4 +133,9 @@ class EditLogBackupInputStream extends EditLogInputStream {
   public long getLastTxId() throws IOException {
     return HdfsConstants.INVALID_TXID;
   }
+
+  @Override
+  boolean isInProgress() {
+    return true;
+  }
 }

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -41,6 +41,7 @@ class EditLogFileInputStream extends EditLogInputStream {
   private final int logVersion;
   private final FSEditLogOp.Reader reader;
   private final FSEditLogLoader.PositionTrackingInputStream tracker;
+  private final boolean isInProgress;
   
   /**
    * Open an EditLogInputStream for the given file.
@@ -53,7 +54,7 @@ class EditLogFileInputStream extends EditLogInputStream {
    */
   EditLogFileInputStream(File name)
       throws LogHeaderCorruptException, IOException {
-    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
+    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, false);
   }
 
   /**
@@ -66,8 +67,8 @@ class EditLogFileInputStream extends EditLogInputStream {
    * @throws IOException if an actual IO error occurs while reading the
    *         header
    */
-  EditLogFileInputStream(File name, long firstTxId, long lastTxId)
-      throws LogHeaderCorruptException, IOException {
+  EditLogFileInputStream(File name, long firstTxId, long lastTxId,
+      boolean isInProgress) throws LogHeaderCorruptException, IOException {
     file = name;
     fStream = new FileInputStream(name);
 
@@ -84,6 +85,7 @@ class EditLogFileInputStream extends EditLogInputStream {
     reader = new FSEditLogOp.Reader(in, logVersion);
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
+    this.isInProgress = isInProgress;
   }
 
   @Override
@@ -132,6 +134,11 @@ class EditLogFileInputStream extends EditLogInputStream {
     return file.length();
   }
   
+  @Override
+  boolean isInProgress() {
+    return isInProgress;
+  }
+  
   @Override
   public String toString() {
     return getName();

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * A generic abstract class to support reading edits log data from 
  * persistent storage.
@@ -27,7 +30,9 @@ import java.io.IOException;
  * It should stream bytes from the storage exactly as they were written
  * into the #{@link EditLogOutputStream}.
  */
-abstract class EditLogInputStream implements JournalStream, Closeable {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class EditLogInputStream implements JournalStream, Closeable {
   /** 
    * @return the first transaction which will be found in this stream
    */
@@ -75,4 +80,9 @@ abstract class EditLogInputStream implements JournalStream, Closeable {
    * Return the size of the current edits log.
    */
   abstract long length() throws IOException;
+  
+  /**
+   * Return true if this stream is in progress, false if it is finalized.
+   */
+  abstract boolean isInProgress();
 }
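
Because isInProgress() is package-private, only same-package code such as FSEditLog can consult it. A hypothetical same-package helper sketching the filtering that the new selectInputStreams() overload performs:

```java
package org.apache.hadoop.hdfs.server.namenode;

import java.util.ArrayList;
import java.util.List;

class FinalizedStreamFilter {
  /** Keep only finalized streams, dropping in-progress segments. */
  static List<EditLogInputStream> finalizedOnly(
      List<EditLogInputStream> streams) {
    List<EditLogInputStream> out = new ArrayList<EditLogInputStream>();
    for (EditLogInputStream s : streams) {
      if (!s.isInProgress()) {
        out.add(s);
      }
    }
    return out;
  }
}
```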

+ 75 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -60,22 +60,36 @@ public class FSEditLog  {
 
   /**
    * State machine for edit log.
+   * 
+   * In a non-HA setup:
+   * 
    * The log starts in UNITIALIZED state upon construction. Once it's
-   * initialized, it is usually in IN_SEGMENT state, indicating that edits
-   * may be written. In the middle of a roll, or while saving the namespace,
-   * it briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the
-   * previous segment has been closed, but the new one has not yet been opened.
+   * initialized, it is usually in IN_SEGMENT state, indicating that edits may
+   * be written. In the middle of a roll, or while saving the namespace, it
+   * briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the previous
+   * segment has been closed, but the new one has not yet been opened.
+   * 
+   * In an HA setup:
+   * 
+   * The log starts in UNINITIALIZED state upon construction. Once it's
+   * initialized, it sits in the OPEN_FOR_READING state the entire time that the
+   * NN is in standby. Upon the NN transition to active, the log will be CLOSED,
+   * and then move to being BETWEEN_LOG_SEGMENTS, much as if the NN had just
+   * started up, and then will move to IN_SEGMENT so it can begin writing to the
+   * log. The log states will then revert to behaving as they do in a non-HA
+   * setup.
    */
   private enum State {
     UNINITIALIZED,
     BETWEEN_LOG_SEGMENTS,
     IN_SEGMENT,
+    OPEN_FOR_READING,
     CLOSED;
   }  
   private State state = State.UNINITIALIZED;
   
   //initialize
-  final private JournalSet journalSet;
+  private JournalSet journalSet = null;
   private EditLogOutputStream editLogStream = null;
 
   // a monotonically increasing counter that represents transactionIds.
@@ -125,6 +139,11 @@ public class FSEditLog  {
   };
 
   final private Collection<URI> editsDirs;
+  
+  /**
+   * The edit directories that are shared between primary and secondary.
+   */
+  final private Collection<URI> sharedEditsDirs;
 
   /**
    * Construct FSEditLog with default configuration, taking editDirs from NNStorage
@@ -163,9 +182,34 @@ public class FSEditLog  {
     } else {
       this.editsDirs = Lists.newArrayList(editsDirs);
     }
-
+    
+    this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
+  }
+  
+  public void initJournalsForWrite() {
+    Preconditions.checkState(state == State.UNINITIALIZED ||
+        state == State.CLOSED, "Unexpected state: %s", state);
+    
+    initJournals(this.editsDirs);
+    state = State.BETWEEN_LOG_SEGMENTS;
+  }
+  
+  public void initSharedJournalsForRead() {
+    if (state == State.OPEN_FOR_READING) {
+      LOG.warn("Initializing shared journals for READ, already open for READ",
+          new Exception());
+      return;
+    }
+    Preconditions.checkState(state == State.UNINITIALIZED ||
+        state == State.CLOSED);
+    
+    initJournals(this.sharedEditsDirs);
+    state = State.OPEN_FOR_READING;
+  }
+  
+  private void initJournals(Collection<URI> dirs) {
     this.journalSet = new JournalSet();
-    for (URI u : this.editsDirs) {
+    for (URI u : dirs) {
       StorageDirectory sd = storage.getStorageDirectory(u);
       if (sd != null) {
         journalSet.add(new FileJournalManager(sd));
@@ -175,7 +219,6 @@ public class FSEditLog  {
     if (journalSet.isEmpty()) {
       LOG.error("No edits directories configured!");
     } 
-    state = State.BETWEEN_LOG_SEGMENTS;
   }
 
   /**
@@ -190,17 +233,22 @@ public class FSEditLog  {
    * Initialize the output stream for logging, opening the first
    * log segment.
    */
-  synchronized void open() throws IOException {
-    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS);
+  synchronized void openForWrite() throws IOException {
+    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+        "Bad state: %s", state);
 
     startLogSegment(getLastWrittenTxId() + 1, true);
     assert state == State.IN_SEGMENT : "Bad state: " + state;
   }
   
-  synchronized boolean isOpen() {
+  synchronized boolean isOpenForWrite() {
     return state == State.IN_SEGMENT;
   }
 
+  synchronized boolean isOpenForRead() {
+    return state == State.OPEN_FOR_READING;
+  }
+
   /**
    * Shutdown the file store.
    */
@@ -230,7 +278,8 @@ public class FSEditLog  {
    */
   void logEdit(final FSEditLogOp op) {
     synchronized (this) {
-      assert state != State.CLOSED;
+      assert state != State.CLOSED && state != State.OPEN_FOR_READING :
+        "bad state: " + state;
       
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
@@ -317,7 +366,7 @@ public class FSEditLog  {
   /**
    * Return the transaction ID of the last transaction written to the log.
    */
-  synchronized long getLastWrittenTxId() {
+  public synchronized long getLastWrittenTxId() {
     return txid;
   }
   
@@ -962,19 +1011,29 @@ public class FSEditLog  {
       // All journals have failed, it is handled in logSync.
     }
   }
+  
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId, true);
+  }
 
   /**
    * Select a list of input streams to load.
+   * 
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
+   * @param inProgressOk set to true if in-progress streams are OK
    */
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
-      long toAtLeastTxId) throws IOException {
+  public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId, boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
     EditLogInputStream stream = journalSet.getInputStream(fromTxId);
     while (stream != null) {
+      if (inProgressOk || !stream.isInProgress()) {
+        streams.add(stream);
+      }
+      // We're now looking for a higher range, so reset the fromTxId
       fromTxId = stream.getLastTxId() + 1;
-      streams.add(stream);
       stream = journalSet.getInputStream(fromTxId);
     }
     if (fromTxId <= toAtLeastTxId) {
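
Taken together these methods give failover a clean journal handoff. A sketch of the standby-to-active transition, mirroring the calls FSNamesystem.startActiveServices() makes elsewhere in this patch; the wrapper class is hypothetical and must live in the same package because openForWrite() is package-private.

```java
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

class ActiveTransitionSketch {
  /** Mirrors the journal handoff in FSNamesystem.startActiveServices(). */
  static void becomeActive(FSEditLog editLog) throws IOException {
    if (!editLog.isOpenForWrite()) {
      // Assumes the shared read journals were already close()d.
      editLog.initJournalsForWrite();   // CLOSED -> BETWEEN_LOG_SEGMENTS
      editLog.recoverUnclosedStreams(); // finalize what the old active left open
      editLog.openForWrite();           // BETWEEN_LOG_SEGMENTS -> IN_SEGMENT
    }
  }
}
```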

+ 31 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -68,7 +70,7 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class FSImage implements Closeable {
-  protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
+  public static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
@@ -112,7 +114,8 @@ public class FSImage implements Closeable {
    * @throws IOException if directories are invalid.
    */
   protected FSImage(Configuration conf,
-                    Collection<URI> imageDirs, Collection<URI> editsDirs)
+                    Collection<URI> imageDirs,
+                    Collection<URI> editsDirs)
       throws IOException {
     this.conf = conf;
 
@@ -123,6 +126,12 @@ public class FSImage implements Closeable {
     }
 
     this.editLog = new FSEditLog(conf, storage, editsDirs);
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
+      editLog.initJournalsForWrite();
+    } else {
+      editLog.initSharedJournalsForRead();
+    }
     
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
   }
@@ -217,6 +226,7 @@ public class FSImage implements Closeable {
       }
     }
 
+    // TODO(HA): Have to figure out a story for the first 3 of these.
     // 3. Do transitions
     switch(startOpt) {
     case UPGRADE:
@@ -251,6 +261,12 @@ public class FSImage implements Closeable {
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, storage);
+        // TODO(HA): Fix this.
+        String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+        if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) {
+          throw new IOException("Cannot start an HA namenode with name dirs " +
+              "that need recovery. Dir: " + sd + " state: " + curState);
+        }
         // sd is locked but not opened
         switch(curState) {
         case NON_EXISTENT:
@@ -326,7 +342,7 @@ public class FSImage implements Closeable {
         assert curDir.exists() : "Current directory must exist.";
         assert !prevDir.exists() : "prvious directory must not exist.";
         assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
-        assert !editLog.isOpen() : "Edits log must not be open.";
+        assert !editLog.isOpenForWrite() : "Edits log must not be open.";
 
         // rename current to tmp
         NNStorage.rename(curDir, tmpDir);
@@ -519,11 +535,11 @@ public class FSImage implements Closeable {
     return editLog;
   }
 
-  void openEditLog() throws IOException {
+  void openEditLogForWrite() throws IOException {
     assert editLog != null : "editLog must be initialized";
-    Preconditions.checkState(!editLog.isOpen(),
+    Preconditions.checkState(!editLog.isOpenForWrite(),
         "edit log should not yet be open");
-    editLog.open();
+    editLog.openForWrite();
     storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
   };
   
@@ -564,6 +580,7 @@ public class FSImage implements Closeable {
 
     Iterable<EditLogInputStream> editStreams = null;
 
+    // TODO(HA): We shouldn't run this when coming up in standby state
     editLog.recoverUnclosedStreams();
 
     if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
@@ -616,6 +633,8 @@ public class FSImage implements Closeable {
     
     // update the txid for the edit log
     editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
+    // TODO(HA): This should probably always return false when HA is enabled and
+    // we're coming up in standby state.
     return needToSave;
   }
 
@@ -644,7 +663,7 @@ public class FSImage implements Closeable {
    * Load the specified list of edit files into the image.
    * @return the number of transactions loaded
    */
-  protected long loadEdits(Iterable<EditLogInputStream> editStreams,
+  public long loadEdits(Iterable<EditLogInputStream> editStreams,
                            FSNamesystem target) throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
 
@@ -663,10 +682,13 @@ public class FSImage implements Closeable {
         lastAppliedTxId += thisNumLoaded;
       }
     } finally {
+      // TODO(HA): Should this happen when called by the tailer?
       FSEditLog.closeAllStreams(editStreams);
     }
 
     // update the counts
+    // TODO(HA): this may be very slow -- we probably want to
+    // update them as we go for HA.
     target.dir.updateCountForINodeWithQuota();    
     return numLoaded;
   }
@@ -688,8 +710,7 @@ public class FSImage implements Closeable {
   
   /**
    * Load in the filesystem image from file. It's a big list of
-   * filenames and blocks.  Return whether we should
-   * "re-save" and consolidate the edit-logs
+   * filenames and blocks.
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
       FSNamesystem target) throws IOException {
@@ -790,7 +811,7 @@ public class FSImage implements Closeable {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
-    boolean editLogWasOpen = editLog.isOpen();
+    boolean editLogWasOpen = editLog.isOpenForWrite();
     
     if (editLogWasOpen) {
       editLog.endCurrentLogSegment(true);

+ 128 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -46,6 +46,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DAT
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
@@ -108,6 +109,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -144,6 +146,11 @@ import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage;
+import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
+import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
+import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
+import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -170,6 +177,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /***************************************************
@@ -293,6 +301,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   // lock to protect FSNamesystem.
   private ReentrantReadWriteLock fsLock;
 
+  private PendingDataNodeMessages pendingDatanodeMessages = new PendingDataNodeMessages();
+  
+  /**
+   * Used when this NN is in standby state to read from the shared edit log.
+   */
+  private EditLogTailer editLogTailer = null;
+  
+  PendingDataNodeMessages getPendingDataNodeMessages() {
+    return pendingDatanodeMessages;
+  }
   
   /**
    * Instantiates an FSNamesystem loaded from the image and edits
@@ -303,7 +321,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @return an FSNamesystem which contains the loaded namespace
    * @throws IOException if loading fails
    */
-  public static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
+  public static FSNamesystem loadFromDisk(Configuration conf)
+    throws IOException {
     Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
     Collection<URI> namespaceEditsDirs = 
       FSNamesystem.getNamespaceEditsDirs(conf);
@@ -322,7 +341,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
     long loadStart = now();
     StartupOption startOpt = NameNode.getStartupOption(conf);
-    namesystem.loadFSImage(startOpt, fsImage);
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    namesystem.loadFSImage(startOpt, fsImage,
+      HAUtil.isHAEnabled(conf, nameserviceId));
     long timeTakenToLoadFSImage = now() - loadStart;
     LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
     NameNode.getNameNodeMetrics().setFsImageLoadTime(
@@ -368,7 +389,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     this.safeMode = new SafeModeInfo(conf);
   }
 
-  void loadFSImage(StartupOption startOpt, FSImage fsImage)
+  void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
       throws IOException {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
@@ -379,10 +400,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
     boolean success = false;
     try {
-      if (fsImage.recoverTransitionRead(startOpt, this)) {
+      // We shouldn't be calling saveNamespace if we've come up in standby state.
+      if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) {
         fsImage.saveNamespace(this);
       }
-      fsImage.openEditLog();
+      // This will start a new log segment and write to the seen_txid file, so
+      // we shouldn't do it when coming up in standby state
+      if (!haEnabled) {
+        fsImage.openEditLogForWrite();
+      }
       
       success = true;
     } finally {
@@ -449,6 +475,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     LOG.info("Starting services required for active state");
     writeLock();
     try {
+      if (!dir.fsImage.editLog.isOpenForWrite()) {
+        // During startup, we're already open for write during initialization.
+        // TODO(HA): consider adding a startup state?
+        dir.fsImage.editLog.initJournalsForWrite();
+        // May need to recover
+        dir.fsImage.editLog.recoverUnclosedStreams();
+        dir.fsImage.editLog.openForWrite();
+      }
       if (UserGroupInformation.isSecurityEnabled()) {
         startSecretManager();
       }
@@ -459,7 +493,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   
   /** 
-   * Start services required in active state 
+   * Stop services required in active state
    * @throws InterruptedException
    */
   void stopActiveServices() {
@@ -470,6 +504,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (leaseManager != null) {
         leaseManager.stopMonitor();
       }
+      dir.fsImage.editLog.close();
     } finally {
       writeUnlock();
     }
@@ -478,11 +513,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /** Start services required in standby state */
   void startStandbyServices() {
     LOG.info("Starting services required for standby state");
+    if (!dir.fsImage.editLog.isOpenForRead()) {
+      // During startup, we're already open for read.
+      dir.fsImage.editLog.initSharedJournalsForRead();
+    }
+    editLogTailer = new EditLogTailer(this);
+    editLogTailer.start();
   }
 
   /** Stop services required in standby state */
-  void stopStandbyServices() {
+  void stopStandbyServices() throws IOException {
     LOG.info("Stopping services started for standby state");
+    if (editLogTailer != null) {
+      editLogTailer.stop();
+    }
+    dir.fsImage.editLog.close();
   }
   
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
@@ -520,7 +565,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
-    return getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
+    Collection<URI> editsDirs = getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
+    editsDirs.addAll(getSharedEditsDirs(conf));
+    return editsDirs;
+  }
+  
+  /**
+   * Returns edit directories that are shared between primary and secondary.
+   * @param conf
+   * @return Collection of edit directories.
+   */
+  public static Collection<URI> getSharedEditsDirs(Configuration conf) {
+    // don't use getStorageDirs here, because we want an empty default
+    // rather than the dir in /tmp
+    Collection<String> dirNames = conf.getTrimmedStringCollection(
+        DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+    return Util.stringCollectionAsURIs(dirNames);
   }
 
   @Override
@@ -634,6 +694,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     } finally {
       // using finally to ensure we also wait for lease daemon
       try {
+        // TODO: these lines spew lots of warnings about "already stopped" logs, etc
         stopActiveServices();
         stopStandbyServices();
         if (dir != null) {
@@ -1796,12 +1857,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
   private Block allocateBlock(String src, INode[] inodes,
-      DatanodeDescriptor targets[]) throws QuotaExceededException {
+      DatanodeDescriptor targets[]) throws QuotaExceededException,
+      SafeModeException {
     assert hasWriteLock();
     Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(DFSUtil.getRandom().nextLong());
     }
+    // Increment the generation stamp for every new block.
+    nextGenerationStamp();
     b.setGenerationStamp(getGenerationStamp());
     b = dir.addBlock(src, inodes, b, targets);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
@@ -2703,11 +2767,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
   
-  FSImage getFSImage() {
+  public FSImage getFSImage() {
     return dir.fsImage;
   }
 
-  FSEditLog getEditLog() {
+  public FSEditLog getEditLog() {
     return getFSImage().getEditLog();
   }    
 
@@ -3726,6 +3790,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   
   private ObjectName mbeanName;
+
   /**
    * Register the FSNamesystem MBean using the name
    *        "hadoop:service=NameNode,name=FSNamesystemState"
@@ -3766,6 +3831,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void setGenerationStamp(long stamp) {
     generationStamp.setStamp(stamp);
+    notifyGenStampUpdate(stamp);
   }
 
   /**
@@ -4523,4 +4589,55 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       byte[] password) throws InvalidToken {
     getDelegationTokenSecretManager().verifyToken(identifier, password);
   }
+  
+  public boolean isGenStampInFuture(long genStamp) {
+    return (genStamp > getGenerationStamp());
+  }
+  
+  public void notifyGenStampUpdate(long gs) {
+    LOG.info("=> notified of genstamp update for: " + gs);
+    DataNodeMessage msg = pendingDatanodeMessages.take(gs);
+    while (msg != null) {
+      LOG.info("processing message: " + msg);
+      try {
+        switch (msg.getType()) {
+        case BLOCK_RECEIVED_DELETE:
+          BlockReceivedDeleteMessage m = (BlockReceivedDeleteMessage) msg;
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog
+                .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+                    + m.getNodeReg().getName() + " "
+                    + m.getReceivedAndDeletedBlocks().length + " blocks.");
+          }
+          this.getBlockManager().blockReceivedAndDeleted(m.getNodeReg(),
+              m.getPoolId(), m.getReceivedAndDeletedBlocks());
+          break;
+        case BLOCK_REPORT:
+          BlockReportMessage mbr = (BlockReportMessage) msg;
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+                + "from " + mbr.getNodeReg().getName() + " "
+                + mbr.getBlockList().getNumberOfBlocks() + " blocks");
+          }
+          this.getBlockManager().processReport(mbr.getNodeReg(),
+              mbr.getPoolId(), mbr.getBlockList());
+          break;
+        case COMMIT_BLOCK_SYNCHRONIZATION:
+          CommitBlockSynchronizationMessage mcbm = (CommitBlockSynchronizationMessage) msg;
+          this.commitBlockSynchronization(mcbm.getBlock(),
+              mcbm.getNewgenerationstamp(), mcbm.getNewlength(),
+              mcbm.isCloseFile(), mcbm.isDeleteblock(), mcbm.getNewtargets());
+          break;
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not process the message " + msg.getType(), ex);
+      }
+      msg = pendingDatanodeMessages.take(gs);
+    }
+  }
+  
+  @VisibleForTesting
+  public EditLogTailer getEditLogTailer() {
+    return editLogTailer;
+  }
 }
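
The directory plumbing composes as follows: the shared directory is folded into the full edits-dir set that the active writes, while getSharedEditsDirs() alone is what the standby tails. A runnable sketch with hypothetical paths:

```java
import java.net.URI;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

public class EditsDirsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
        "file:///data/nn/edits");          // local dir, hypothetical
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
        "file:///mnt/filer/shared-edits"); // shared dir, hypothetical
    // What the active writes: local dirs plus the shared dir.
    Collection<URI> all = FSNamesystem.getNamespaceEditsDirs(conf);
    // What the standby reads: just the shared dir.
    Collection<URI> shared = FSNamesystem.getSharedEditsDirs(conf);
    System.out.println("write set: " + all + ", read set: " + shared);
  }
}
```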

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -90,7 +90,7 @@ class FileJournalManager implements JournalManager {
 
     File dstFile = NNStorage.getFinalizedEditsFile(
         sd, firstTxId, lastTxId);
-    LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
+    LOG.info("Finalizing edits file " + inprogressFile + " -> " + dstFile);
     
     Preconditions.checkState(!dstFile.exists(),
         "Can't finalize edits file " + inprogressFile + " since finalized file " +
@@ -116,6 +116,7 @@ class FileJournalManager implements JournalManager {
   @Override
   public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
+    LOG.info("Purging logs older than " + minTxIdToKeep);
     File[] files = FileUtil.listFiles(sd.getCurrentDir());
     List<EditLogFile> editLogs = 
       FileJournalManager.matchEditLogs(files);
@@ -169,7 +170,7 @@ class FileJournalManager implements JournalManager {
           LOG.error("Edits file " + f + " has improperly formatted " +
                     "transaction ID");
           // skip
-        }          
+        }
       }
       
       // Check for in-progress edits
@@ -190,7 +191,7 @@ class FileJournalManager implements JournalManager {
   }
 
   @Override
-  synchronized public EditLogInputStream getInputStream(long fromTxId) 
+  synchronized public EditLogInputStream getInputStream(long fromTxId)
       throws IOException {
     for (EditLogFile elf : getLogFiles(fromTxId)) {
       if (elf.getFirstTxId() == fromTxId) {
@@ -201,7 +202,7 @@ class FileJournalManager implements JournalManager {
           LOG.trace("Returning edit stream reading from " + elf);
         }
         return new EditLogFileInputStream(elf.getFile(), 
-            elf.getFirstTxId(), elf.getLastTxId());
+            elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
       }
     }
 
@@ -245,6 +246,7 @@ class FileJournalManager implements JournalManager {
     }
 
     long max = findMaxTransaction();
+    
     // fromTxId should be greater than max, as it points to the next 
     // transaction we should expect to find. If it is less than or equal
     // to max, it means that a transaction with txid == max has not been found

+ 18 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -30,6 +30,7 @@ import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -157,7 +158,8 @@ public class NNStorage extends Storage implements Closeable {
     
     // this may modify the editsDirs, so copy before passing in
     setStorageDirectories(imageDirs, 
-                          Lists.newArrayList(editsDirs));
+                          Lists.newArrayList(editsDirs),
+                          FSNamesystem.getSharedEditsDirs(conf));
   }
 
   @Override // Storage
@@ -245,6 +247,16 @@ public class NNStorage extends Storage implements Closeable {
   List<StorageDirectory> getRemovedStorageDirs() {
     return this.removedStorageDirs;
   }
+  
+  /**
+   * See {@link NNStorage#setStorageDirectories(Collection, Collection, Collection)}
+   */
+  @VisibleForTesting
+  synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
+                                          Collection<URI> fsEditsDirs)
+      throws IOException {
+    setStorageDirectories(fsNameDirs, fsEditsDirs, new ArrayList<URI>());
+  }
 
   /**
    * Set the storage directories which will be used. This should only ever be
@@ -261,7 +273,8 @@ public class NNStorage extends Storage implements Closeable {
    */
   @VisibleForTesting
   synchronized void setStorageDirectories(Collection<URI> fsNameDirs,
-                                          Collection<URI> fsEditsDirs)
+                                          Collection<URI> fsEditsDirs,
+                                          Collection<URI> sharedEditsDirs)
       throws IOException {
     this.storageDirs.clear();
     this.removedStorageDirs.clear();
@@ -285,7 +298,8 @@ public class NNStorage extends Storage implements Closeable {
       if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
           == 0){
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
-            dirType));
+            dirType,
+            !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
       }
     }
 
@@ -297,7 +311,7 @@ public class NNStorage extends Storage implements Closeable {
       if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
           == 0)
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
-                    NameNodeDirType.EDITS));
+                    NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
     }
   }
 

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -947,12 +947,17 @@ public class NameNode {
 
     @Override
     public void startStandbyServices() throws IOException {
-      // TODO:HA Start reading editlog from active
+      namesystem.startStandbyServices();
     }
 
     @Override
     public void stopStandbyServices() throws IOException {
-      // TODO:HA Stop reading editlog from active
+      // TODO(HA): Are we guaranteed to be the only active here?
+      namesystem.stopStandbyServices();
     }
   }
+  
+  public boolean isStandbyState() {
+    return (state.equals(STANDBY_STATE));
+  }
 }

+ 43 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -179,6 +179,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
         RefreshAuthorizationPolicyProtocol.class, this);
     this.clientRpcServer.addProtocol(RefreshUserMappingsProtocol.class, this);
     this.clientRpcServer.addProtocol(GetUserMappingsProtocol.class, this);
+    this.clientRpcServer.addProtocol(HAServiceProtocol.class, this);
     
 
     // set service-level authorization security policy
@@ -538,6 +539,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
       throws IOException {
     nn.checkOperation(OperationCategory.WRITE);
+    if (nn.isStandbyState()) {
+      if (namesystem.isGenStampInFuture(newgenerationstamp)) {
+        LOG.info("Required GS=" + newgenerationstamp
+            + ", Queuing commitBlockSynchronization message");
+        namesystem.getPendingDataNodeMessages().queueMessage(
+            new PendingDataNodeMessages.CommitBlockSynchronizationMessage(
+                block, newgenerationstamp, newlength, closeFile, deleteblock,
+                newtargets, newgenerationstamp));
+        return;
+      }
+    }
     namesystem.commitBlockSynchronization(block,
         newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
   }
@@ -670,7 +682,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
       throws IOException {
-    nn.checkOperation(OperationCategory.READ);
+    // TODO(HA): decide on OperationCategory for this
     DatanodeInfo results[] = namesystem.datanodeReport(type);
     if (results == null ) {
       throw new IOException("Cannot find datanode report");
@@ -859,6 +871,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
       String poolId, long[] blocks) throws IOException {
     verifyRequest(nodeReg);
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
+    if (nn.isStandbyState()) {
+      long maxGs = blist.getMaxGsInBlockList();
+      if (namesystem.isGenStampInFuture(maxGs)) {
+        LOG.info("Required GS="+maxGs+", Queuing blockReport message");
+        namesystem.getPendingDataNodeMessages().queueMessage(
+            new PendingDataNodeMessages.BlockReportMessage(nodeReg, poolId,
+                blist, maxGs));
+        return null;
+      }
+    }
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
            + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
@@ -866,7 +888,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
 
     namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
-    if (nn.getFSImage().isUpgradeFinalized())
+    if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
       return new FinalizeCommand(poolId);
     return null;
   }
@@ -875,6 +897,25 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
       ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
     verifyRequest(nodeReg);
+    if (nn.isStandbyState()) {
+      if (receivedAndDeletedBlocks.length > 0) {
+        long maxGs = receivedAndDeletedBlocks[0].getBlock()
+            .getGenerationStamp();
+        for (ReceivedDeletedBlockInfo binfo : receivedAndDeletedBlocks) {
+          if (binfo.getBlock().getGenerationStamp() > maxGs) {
+            maxGs = binfo.getBlock().getGenerationStamp();
+          }
+        }
+        if (namesystem.isGenStampInFuture(maxGs)) {
+          LOG.info("Required GS=" + maxGs
+              + ", Queuing blockReceivedAndDeleted message");
+          namesystem.getPendingDataNodeMessages().queueMessage(
+              new PendingDataNodeMessages.BlockReceivedDeleteMessage(nodeReg,
+                  poolId, receivedAndDeletedBlocks, maxGs));
+          return;
+        }
+      }
+    }
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
           +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length

+ 201 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java

@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+
+public class PendingDataNodeMessages {
+  
+  PriorityQueue<DataNodeMessage> queue = new PriorityQueue<DataNodeMessage>();
+  
+  enum MessageType {
+    BLOCK_RECEIVED_DELETE,
+    BLOCK_REPORT,
+    COMMIT_BLOCK_SYNCHRONIZATION
+  }
+  
+  static abstract class DataNodeMessage 
+     implements Comparable<DataNodeMessage> {
+    
+    final MessageType type;
+    private final long targetGs;
+    
+    DataNodeMessage(MessageType type, long targetGenStamp) {
+      this.type = type;
+      this.targetGs = targetGenStamp;
+    }
+    
+    protected MessageType getType() {
+      return type;
+    }
+    
+    protected long getTargetGs() {
+      return targetGs;
+    }
+    
+    public int compareTo(DataNodeMessage other) {
+      if (targetGs == other.targetGs) {
+        return 0;
+      } else if (targetGs < other.targetGs) {
+        return -1;
+      }
+      return 1;
+    }
+  }
+  
+  static class BlockReceivedDeleteMessage extends DataNodeMessage {
+    final DatanodeRegistration nodeReg;
+    final String poolId;
+    final ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks;
+    
+    BlockReceivedDeleteMessage(DatanodeRegistration nodeReg, String poolId,
+      ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks, long targetGs) {
+      super(MessageType.BLOCK_RECEIVED_DELETE, targetGs);
+      this.nodeReg = nodeReg;
+      this.poolId = poolId;
+      this.receivedAndDeletedBlocks = receivedAndDeletedBlocks;
+    }
+    
+    DatanodeRegistration getNodeReg() {
+      return nodeReg;
+    }
+    
+    String getPoolId() {
+      return poolId;
+    }
+    
+    ReceivedDeletedBlockInfo[] getReceivedAndDeletedBlocks() {
+      return receivedAndDeletedBlocks;
+    }
+    
+    public String toString() {
+      return "BlockReceivedDeletedMessage with " +
+        receivedAndDeletedBlocks.length + " blocks";
+    }
+  }
+  
+  static class CommitBlockSynchronizationMessage extends DataNodeMessage {
+
+    private final ExtendedBlock block;
+    private final long newgenerationstamp;
+    private final long newlength;
+    private final boolean closeFile;
+    private final boolean deleteblock;
+    private final DatanodeID[] newtargets;
+
+    CommitBlockSynchronizationMessage(ExtendedBlock block,
+        long newgenerationstamp, long newlength, boolean closeFile,
+        boolean deleteblock, DatanodeID[] newtargets, long targetGenStamp) {
+      super(MessageType.COMMIT_BLOCK_SYNCHRONIZATION, targetGenStamp);
+      this.block = block;
+      this.newgenerationstamp = newgenerationstamp;
+      this.newlength = newlength;
+      this.closeFile = closeFile;
+      this.deleteblock = deleteblock;
+      this.newtargets = newtargets;
+    }
+
+    ExtendedBlock getBlock() {
+      return block;
+    }
+
+    long getNewgenerationstamp() {
+      return newgenerationstamp;
+    }
+
+    long getNewlength() {
+      return newlength;
+    }
+
+    boolean isCloseFile() {
+      return closeFile;
+    }
+
+    boolean isDeleteblock() {
+      return deleteblock;
+    }
+
+    DatanodeID[] getNewtargets() {
+      return newtargets;
+    }
+    
+    public String toString() {
+      return "CommitBlockSynchronizationMessage for " + block;
+    }
+  }
+  
+  static class BlockReportMessage extends DataNodeMessage {
+
+    private final DatanodeRegistration nodeReg;
+    private final String poolId;
+    private final BlockListAsLongs blockList;
+
+    BlockReportMessage(DatanodeRegistration nodeReg, String poolId,
+        BlockListAsLongs blist, long targetGenStamp) {
+      super(MessageType.BLOCK_REPORT, targetGenStamp);
+      this.nodeReg = nodeReg;
+      this.poolId = poolId;
+      this.blockList = blist;
+    }
+
+    DatanodeRegistration getNodeReg() {
+      return nodeReg;
+    }
+
+    String getPoolId() {
+      return poolId;
+    }
+
+    BlockListAsLongs getBlockList() {
+      return blockList;
+    }
+
+    public String toString() {
+      return "BlockReport from " + nodeReg + " with " + blockList.getNumberOfBlocks() + " blocks";
+    }
+  }
+
+  synchronized void queueMessage(DataNodeMessage msg) {
+    queue.add(msg);
+  }
+  
+  /**
+   * Returns the next queued message whose target generation stamp is below
+   * the given gs, otherwise returns null.
+   * 
+   * @param gs
+   */
+  synchronized DataNodeMessage take(long gs) {
+    DataNodeMessage m = queue.peek();
+    if (m != null && m.getTargetGs() < gs) {
+      return queue.remove();
+    } else {
+      return null;
+    }
+  }
+  
+  synchronized boolean isEmpty() {
+    return queue.isEmpty();
+  }
+}
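
The draining contract is easiest to see in isolation. A self-contained toy model (names and values are mine) that reproduces take()'s strict less-than test:

```java
import java.util.PriorityQueue;

public class GenStampDeferralDemo {
  static final PriorityQueue<Long> pending = new PriorityQueue<Long>();

  /** Queue a message that needs generation stamp targetGs to be applicable. */
  static void queue(long targetGs) {
    pending.add(targetGs);
  }

  /** Called as the standby's GS advances; drains whatever is now ready. */
  static void notifyGenStampUpdate(long gs) {
    while (!pending.isEmpty() && pending.peek() < gs) {
      System.out.println("replaying message with target GS " + pending.remove());
    }
  }

  public static void main(String[] args) {
    queue(105);                 // arrives while the standby is at GS 100
    notifyGenStampUpdate(101);  // nothing ready: 105 is not < 101
    notifyGenStampUpdate(106);  // drains the message: 105 < 106
  }
}
```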

+ 142 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * EditLogTailer represents a thread which periodically reads from edits
+ * journals and applies the transactions contained within to a given
+ * FSNamesystem.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class EditLogTailer {
+  public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
+  
+  private final EditLogTailerThread tailerThread;
+  
+  public EditLogTailer(FSNamesystem namesystem) {
+    this.tailerThread = new EditLogTailerThread(namesystem);
+  }
+  
+  public void start() {
+    tailerThread.start();
+  }
+  
+  public void stop() throws IOException {
+    tailerThread.setShouldRun(false);
+    tailerThread.interrupt();
+    try {
+      tailerThread.join();
+    } catch (InterruptedException e) {
+      LOG.warn("Edit log tailer thread exited with an exception");
+      throw new IOException(e);
+    }
+  }
+
+  @VisibleForTesting
+  public void setSleepTime(long sleepTime) {
+    tailerThread.setSleepTime(sleepTime);
+  }
+  
+  @VisibleForTesting
+  public void interrupt() {
+    tailerThread.interrupt();
+  }
+
+  /**
+   * The thread which does the actual work of tailing edits journals and
+   * applying the transactions to the FSNS.
+   */
+  private static class EditLogTailerThread extends Thread {
+
+    private FSNamesystem namesystem;
+    private FSImage image;
+    private FSEditLog editLog;
+    
+    private volatile boolean shouldRun = true;
+    private long sleepTime = 60 * 1000;
+    
+    private EditLogTailerThread(FSNamesystem namesystem) {
+      super("Edit log tailer");
+      this.namesystem = namesystem;
+      image = namesystem.getFSImage();
+      editLog = namesystem.getEditLog();
+    }
+    
+    private void setShouldRun(boolean shouldRun) {
+      this.shouldRun = shouldRun;
+    }
+    
+    private void setSleepTime(long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
+    
+    @Override
+    public void run() {
+      while (shouldRun) {
+        try {
+          long lastTxnId = image.getLastAppliedTxId();
+          
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("lastTxnId: " + lastTxnId);
+          }
+          try {
+            // At least one record should be available.
+            Collection<EditLogInputStream> streams = editLog
+                .selectInputStreams(lastTxnId + 1, lastTxnId + 1, false);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("edit streams to load from: " + streams.size());
+            }
+            
+            long editsLoaded = image.loadEdits(streams, namesystem);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("editsLoaded: " + editsLoaded);
+            }
+          } catch (IOException e) {
+            // Will try again
+            LOG.info("Got error, will try again.", e);
+          }
+        } catch (Throwable t) {
+          // TODO(HA): What should we do in this case? Shutdown the standby NN?
+          LOG.error("Edit log tailer received throwable", t);
+        }
+
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          LOG.warn("Edit log tailer interrupted", e);
+        }
+      }
+    }
+  }
+}
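
Usage is deliberately small: construct, optionally shorten the poll interval, start, and stop on shutdown. A sketch along the lines of what TestEditLogTailer below drives; the one-second interval is arbitrary.

```java
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;

class TailerUsageSketch {
  static EditLogTailer startTailer(FSNamesystem fsn) {
    EditLogTailer tailer = new EditLogTailer(fsn);
    tailer.setSleepTime(1000); // poll every second instead of the 60s default
    tailer.start();
    return tailer;             // caller invokes stop() during shutdown
  }
}
```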

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -123,7 +123,8 @@ public interface DatanodeProtocol extends VersionedProtocol {
    * @param registration
    * @param poolId - the block pool ID for the blocks
    * @param blocks - the block list as an array of longs.
-   *     Each block is represented as 2 longs.
+   *     Each finalized block is represented as 3 longs. Each under-
+   *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block reports.
    *     
    * @return - the next command for DN to process.
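
To make the long-array encoding concrete, here is a rough, self-contained
sketch of walking such a report. The two-long header (finalized count, then
under-construction count) and the three -1s delimiting the two lists follow
the layout documented in BlockListAsLongs; this is illustrative only, not
that class's API:

    static void walkBlockReport(long[] report) {
      int numFinalized = (int) report[0];
      int numUnderConstruction = (int) report[1];
      int off = 2;  // skip the two-long header
      for (int i = 0; i < numFinalized; i++, off += 3) {
        System.out.println("finalized: id=" + report[off]
            + " len=" + report[off + 1] + " genstamp=" + report[off + 2]);
      }
      off += 3;  // skip the delimiter block (three -1s)
      for (int i = 0; i < numUnderConstruction; i++, off += 4) {
        System.out.println("under construction: id=" + report[off]
            + " len=" + report[off + 1] + " genstamp=" + report[off + 2]
            + " state=" + report[off + 3]);
      }
    }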

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -226,6 +226,18 @@ creations/deletions), or "all".</description>
       directories, for redundancy. Default value is same as dfs.name.dir
   </description>
 </property>
+
+<property>
+  <name>dfs.namenode.shared.edits.dir</name>
+  <value></value>
+  <description>A directory on shared storage accessible to all of the
+  namenodes in an HA cluster. The active namenode writes to this directory
+  and the standby reads from it in order to keep the namespaces
+  synchronized. This directory does not need to be listed in
+  dfs.namenode.edits.dir above. It should be left empty in a non-HA cluster.
+  </description>
+</property>
+
 <property>
   <name>dfs.web.ugi</name>
   <value>webuser,webgroup</value>
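
Programmatically, enabling the shared edits directory would look roughly
like this (the file:// path is a placeholder, not something this patch
prescribes; DFS_NAMENODE_SHARED_EDITS_DIR_KEY is the constant backing
dfs.namenode.shared.edits.dir):

    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
        "file:///mnt/nfs/hdfs/shared-edits");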

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -606,7 +606,7 @@ public class MiniDFSCluster {
         if (manageNameDfsDirs) {
           URI sharedEditsUri = fileAsURI(new File(base_dir, "shared-edits-" +
               nnCounter + "-through-" + (nnCounter+nnIds.size()-1)));
-          // TODO in HDFS-1971: conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
+          conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
         }
       }
 
@@ -667,7 +667,10 @@ public class MiniDFSCluster {
     FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
     for (URI dstDir : dstDirs) {
       Preconditions.checkArgument(!dstDir.equals(srcDir));
-      Files.deleteRecursively(new File(dstDir));
+      File dstDirF = new File(dstDir);
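+      // deleteRecursively fails if the directory doesn't exist, hence the guard.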
+      if (dstDirF.exists()) {
+        Files.deleteRecursively(dstDirF);
+      }
       LOG.info("Copying namedir from primary node dir "
           + srcDir + " to " + dstDir);
       FileUtil.copy(

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java

@@ -51,6 +51,17 @@ public class MiniDFSNNTopology {
           .setIpcPort(nameNodePort)));
   }
   
+  /**
+   * Set up an HA topology with a single HA nameservice.
+   */
+  public static MiniDFSNNTopology simpleHATopology() {
+    return new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf(null)
+        .addNN(new MiniDFSNNTopology.NNConf("nn1"))
+        .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+  }
+
   /**
    * Set up federated cluster with the given number of nameservices, each
    * of which has only a single NameNode.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java

@@ -193,7 +193,7 @@ public class CreateEditsLog {
 
     FileNameGenerator nameGenerator = new FileNameGenerator(BASE_PATH, 100);
     FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir);
-    editLog.open();
+    editLog.openForWrite();
     addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId,
              nameGenerator);
     editLog.logSync();

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java

@@ -182,9 +182,11 @@ public abstract class FSImageTestUtil {
     Mockito.doReturn(sd).when(storage)
       .getStorageDirectory(Matchers.<URI>anyObject());
 
-    return new FSEditLog(new Configuration(), 
+    FSEditLog editLog = new FSEditLog(new Configuration(), 
                          storage,
                          ImmutableList.of(logDir.toURI()));
+    editLog.initJournalsForWrite();
+    return editLog;
   }
   
   /**

+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -19,13 +19,17 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
 
 /**
  * This is a utility class to expose NameNode functionality for unit tests.
@@ -47,6 +51,32 @@ public class NameNodeAdapter {
         src, offset, length, false, true);
   }
   
+  public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
+      boolean resolveLink) throws AccessControlException, UnresolvedLinkException {
+    return namenode.getNamesystem().getFileInfo(src, resolveLink);
+  }
+  
+  public static boolean mkdirs(NameNode namenode, String src,
+      PermissionStatus permissions, boolean createParent)
+      throws UnresolvedLinkException, IOException {
+    return namenode.getNamesystem().mkdirs(src, permissions, createParent);
+  }
+  
+  public static void saveNamespace(NameNode namenode)
+      throws AccessControlException, IOException {
+    namenode.getNamesystem().saveNamespace();
+  }
+  
+  public static void enterSafeMode(NameNode namenode, boolean resourcesLow)
+      throws IOException {
+    namenode.getNamesystem().enterSafeMode(resourcesLow);
+  }
+  
+  public static void leaveSafeMode(NameNode namenode, boolean checkForUpgrades)
+      throws SafeModeException {
+    namenode.getNamesystem().leaveSafeMode(checkForUpgrades);
+  }
+  
   /**
    * Get the internal RPC server instance.
    * @return rpc server

+ 15 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -580,7 +580,6 @@ public class TestEditLog extends TestCase {
             currentDir.getAbsolutePath());
         assertNotNull("No image found in " + nameDir, imageFile);
         assertEquals(NNStorage.getImageFileName(0), imageFile.getName());
-        
         // Try to start a new cluster
         LOG.info("\n===========================================\n" +
         "Starting same cluster after simulated crash");
@@ -772,6 +771,11 @@ public class TestEditLog extends TestCase {
     public JournalType getType() {
       return JournalType.FILE;
     }
+
+    @Override
+    boolean isInProgress() {
+      return true;
+    }
   }
 
   public void testFailedOpen() throws Exception {
@@ -780,7 +784,7 @@ public class TestEditLog extends TestCase {
     FSEditLog log = FSImageTestUtil.createStandaloneEditLog(logDir);
     try {
       logDir.setWritable(false);
-      log.open();
+      log.openForWrite();
       fail("Did no throw exception on only having a bad dir");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
@@ -805,6 +809,7 @@ public class TestEditLog extends TestCase {
         "[1,100]|[101,200]|[201,]",
         "[1,100]|[101,200]|[201,]");
     log = new FSEditLog(storage);
+    log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200]]",
         log.getEditLogManifest(1).toString());
     assertEquals("[[101,200]]",
@@ -816,6 +821,7 @@ public class TestEditLog extends TestCase {
         "[1,100]|[101,200]",
         "[1,100]|[201,300]|[301,400]"); // nothing starting at 101
     log = new FSEditLog(storage);
+    log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200], [201,300], [301,400]]",
         log.getEditLogManifest(1).toString());
     
@@ -825,6 +831,7 @@ public class TestEditLog extends TestCase {
         "[1,100]|[301,400]", // gap from 101 to 300
         "[301,400]|[401,500]");
     log = new FSEditLog(storage);
+    log.initJournalsForWrite();
     assertEquals("[[301,400], [401,500]]",
         log.getEditLogManifest(1).toString());
     
@@ -834,6 +841,7 @@ public class TestEditLog extends TestCase {
         "[1,100]|[101,150]", // short log at 101
         "[1,50]|[101,200]"); // short log at 1
     log = new FSEditLog(storage);
+    log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200]]",
         log.getEditLogManifest(1).toString());
     assertEquals("[[101,200]]",
@@ -846,6 +854,7 @@ public class TestEditLog extends TestCase {
         "[1,100]|[101,]", 
         "[1,100]|[101,200]"); 
     log = new FSEditLog(storage);
+    log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200]]",
         log.getEditLogManifest(1).toString());
     assertEquals("[[101,200]]",
@@ -938,7 +947,8 @@ public class TestEditLog extends TestCase {
     // open the edit log and add two transactions
     // logGenerationStamp is used, simply because it doesn't 
     // require complex arguments.
-    editlog.open();
+    editlog.initJournalsForWrite();
+    editlog.openForWrite();
     for (int i = 2; i < TXNS_PER_ROLL; i++) {
       editlog.logGenerationStamp((long)0);
     }
@@ -998,6 +1008,7 @@ public class TestEditLog extends TestCase {
                                    new AbortSpec(10, 1));
     long totaltxnread = 0;
     FSEditLog editlog = new FSEditLog(storage);
+    editlog.initJournalsForWrite();
     long startTxId = 1;
     Iterable<EditLogInputStream> editStreams = editlog.selectInputStreams(startTxId, 
                                                                           TXNS_PER_ROLL*11);
@@ -1047,6 +1058,7 @@ public class TestEditLog extends TestCase {
     assertTrue(files[0].delete());
     
     FSEditLog editlog = new FSEditLog(storage);
+    editlog.initJournalsForWrite();
     long startTxId = 1;
     try {
       Iterable<EditLogInputStream> editStreams 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -165,7 +165,7 @@ public class TestFSEditLogLoader {
     SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
     try {
       fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
-      fsel.open();
+      fsel.openForWrite();
       assertTrue("should exist: " + logFile, logFile.exists());
       
       for (int i = 0; i < NUM_TXNS; i++) {

+ 128 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java

@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+public class TestEditLogTailer {
+  
+  private static final String DIR_PREFIX = "/dir";
+  private static final int DIRS_TO_MAKE = 20;
+  private static final long SLEEP_TIME = 1000;
+  private static final long NN_LAG_TIMEOUT = 10 * 1000;
+  
+  static {
+    ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)EditLogTailer.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
+  @Test
+  public void testTailer() throws IOException, InterruptedException,
+      ServiceFailedException {
+    Configuration conf = new HdfsConfiguration();
+    
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(0)
+      .build();
+    cluster.waitActive();
+    
+    cluster.transitionToActive(0);
+    
+    NameNode nn1 = cluster.getNameNode(0);
+    NameNode nn2 = cluster.getNameNode(1);
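+    // Tail aggressively (every 250ms rather than the default one-minute
+    // period) so the test doesn't stall between polls.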
+    nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
+    nn2.getNamesystem().getEditLogTailer().interrupt();
+    try {
+      for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
+        NameNodeAdapter.mkdirs(nn1, getDirPath(i),
+            new PermissionStatus("test","test", new FsPermission((short)00755)),
+            true);
+      }
+      
+      waitForStandbyToCatchUp(nn1, nn2);
+      
+      for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
+        assertTrue(NameNodeAdapter.getFileInfo(nn2,
+            getDirPath(i), false).isDir());
+      }
+      
+      for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
+        NameNodeAdapter.mkdirs(nn1, getDirPath(i),
+            new PermissionStatus("test","test", new FsPermission((short)00755)),
+            true);
+      }
+      
+      waitForStandbyToCatchUp(nn1, nn2);
+      
+      for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
+        assertTrue(NameNodeAdapter.getFileInfo(nn2,
+            getDirPath(i), false).isDir());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  private static String getDirPath(int suffix) {
+    return DIR_PREFIX + suffix;
+  }
+  
+  private static void waitForStandbyToCatchUp(NameNode active,
+      NameNode standby) throws InterruptedException, IOException {
+    
+    long activeTxId = active.getNamesystem().getFSImage().getEditLog()
+      .getLastWrittenTxId();
+    
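+    // saveNamespace rolls the edit log, finalizing the in-progress segment;
+    // the tailer skips in-progress segments, so this makes the edits above
+    // visible to the standby.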
+    doSaveNamespace(active);
+    
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < NN_LAG_TIMEOUT) {
+      long standbyTxId = standby.getNamesystem().getFSImage()
+        .getLastAppliedTxId();
+      if (standbyTxId >= activeTxId) {
+        break;
+      }
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+  
+  private static void doSaveNamespace(NameNode nn)
+      throws IOException {
+    NameNodeAdapter.enterSafeMode(nn, false);
+    NameNodeAdapter.saveNamespace(nn);
+    NameNodeAdapter.leaveSafeMode(nn, false);
+  }
+  
+}