
HDFS-1979. Fix backupnode for new edits/image layout. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1073@1146845 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon, 14 years ago
parent
commit
762663dc86
26 changed files with 1158 additions and 869 deletions
  1. hdfs/CHANGES.HDFS-1073.txt (+1 -0)
  2. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (+249 -269)
  3. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (+65 -0)
  4. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (+48 -37)
  5. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (+0 -1)
  6. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (+79 -69)
  7. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (+17 -51)
  8. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (+5 -0)
  9. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (+3 -15)
  10. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (+24 -1)
  11. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (+129 -139)
  12. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (+46 -32)
  13. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (+1 -1)
  14. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (+78 -29)
  15. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+18 -17)
  16. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (+7 -0)
  17. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (+8 -0)
  18. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+3 -12)
  19. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (+44 -38)
  20. hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (+10 -0)
  21. hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BackupNodeProtocol.java (+58 -0)
  22. hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (+1 -22)
  23. hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java (+25 -0)
  24. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (+41 -1)
  25. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java (+195 -108)
  26. hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (+3 -27)

+ 1 - 0
hdfs/CHANGES.HDFS-1073.txt

@@ -67,3 +67,4 @@ HDFS-2102. Zero-pad edits filename to make them lexically sortable. (Ivan
 HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible. (atm
            via todd)
 HDFS-2123. Checkpoint interval should be based on txn count, not size. (todd)
+HDFS-1979. Fix backupnode for new edits/image layout. (todd)

+ 249 - 269
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -19,31 +19,21 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
-import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Preconditions;
 
 /**
  * Extension of FSImage for the backup node.
@@ -52,30 +42,39 @@ import org.apache.hadoop.conf.Configuration;
  */
 @InterfaceAudience.Private
 public class BackupImage extends FSImage {
-  // Names of the journal spool directory and the spool file
-  private static final String STORAGE_JSPOOL_DIR = "jspool";
-  private static final String STORAGE_JSPOOL_FILE =
-    NNStorage.NameNodeFile.EDITS_NEW.getName();
-
   /** Backup input stream for loading edits into memory */
-  private EditLogBackupInputStream backupInputStream;
-
-  /** Is journal spooling in progress */
-  volatile JSpoolState jsState;
-  private long lastAppliedTxId = 0;
-
-  static enum JSpoolState {
-    OFF,
-    INPROGRESS,
-    WAIT;
+  private EditLogBackupInputStream backupInputStream =
+    new EditLogBackupInputStream("Data from remote NameNode");
+  
+  /**
+   * Current state of the BackupNode. The BackupNode's state
+   * transitions are as follows:
+   * 
+   * Initial: DROP_UNTIL_NEXT_ROLL
+   * - Transitions to JOURNAL_ONLY the next time the log rolls
+   * - Transitions to IN_SYNC in convergeJournalSpool
+   * - Transitions back to JOURNAL_ONLY if the log rolls while
+   *   stopApplyingEditsOnNextRoll is true.
+   */
+  BNState bnState;
+  static enum BNState {
+    // Edits from the NN should be dropped. On the next log roll,
+    // transition to JOURNAL_ONLY state
+    DROP_UNTIL_NEXT_ROLL,
+    // Edits from the NN should be written to the local edits log
+    // but not applied to the namespace.
+    JOURNAL_ONLY,
+    // Edits should be written to the local edits log and applied
+    // to the local namespace.
+    IN_SYNC;
   }
 
-
   /**
-   * Place-holder for a txid that still needs to be addressed
-   * in HDFS-1073 branch before merging into trunk.
+   * Flag to indicate that the next time the NN rolls, the BN
+   * should transition to JOURNAL_ONLY state.
+   * @see #freezeNamespaceAtNextRoll()
    */
-  private static final long TODO_TXID = 0xDEADBEEF;
+  private boolean stopApplyingEditsOnNextRoll = false;
 
   /**
    * Construct a backup image.
@@ -85,7 +84,8 @@ public class BackupImage extends FSImage {
   BackupImage(Configuration conf) throws IOException {
     super(conf);
     storage.setDisablePreUpgradableLayoutCheck(true);
-    jsState = JSpoolState.OFF;
+    bnState = BNState.DROP_UNTIL_NEXT_ROLL;
+    editLog.initJournals();
   }
 
   /**
@@ -130,279 +130,259 @@ public class BackupImage extends FSImage {
   }
 
   /**
-   * Reset storage directories.
-   * <p>
-   * Unlock the storage.
-   * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
-   * and recreate empty <code>current</code>.
-   * @throws IOException
+   * Save meta-data into fsimage files
+   * and create empty edits.
    */
-  synchronized void reset() throws IOException {
-    /* TODO: BackupNode
-    // reset NameSpace tree
-    FSDirectory fsDir = getFSNamesystem().dir;
-    fsDir.reset();
-
-    // unlock, close and rename storage directories
-    storage.unlockAll();
-    
-    // recover from unsuccessful checkpoint if necessary
-    recoverCreateRead();
-    // rename and recreate
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      // rename current to lastcheckpoint.tmp
-      storage.moveCurrent(sd);
-    }
-    */
+  void saveCheckpoint() throws IOException {
+    saveNamespace();
   }
 
   /**
-   * Load checkpoint from local files only if the memory state is empty.<br>
-   * Set new checkpoint time received from the name-node.<br>
-   * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+   * Receive a batch of edits from the NameNode.
+   * 
+   * Depending on bnState, different actions are taken. See
+   * {@link BackupImage.BNState}
+   * 
+   * @param firstTxId first txid in batch
+   * @param numTxns number of transactions
+   * @param data serialized journal records.
    * @throws IOException
+   * @see #convergeJournalSpool()
    */
-  void loadCheckpoint(CheckpointSignature sig) throws IOException {
-    // load current image and journal if it is not in memory already
-    if(!editLog.isOpen())
-      editLog.startLogSegment(TODO_TXID);
-
-    // set storage fields
-    storage.setStorageInfo(sig);
-
-    FSDirectory fsDir = getFSNamesystem().dir;
-    if(fsDir.isEmpty()) {
-      Iterator<StorageDirectory> itImage
-        = storage.dirIterator(NameNodeDirType.IMAGE);
-      Iterator<StorageDirectory> itEdits
-        = storage.dirIterator(NameNodeDirType.EDITS);
-      if(!itImage.hasNext() || ! itEdits.hasNext())
-        throw new IOException("Could not locate checkpoint directories");
-      StorageDirectory sdName = itImage.next();
-      StorageDirectory sdEdits = itEdits.next();
+  synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Got journal, " +
+          "state = " + bnState +
+          "; firstTxId = " + firstTxId +
+          "; numTxns = " + numTxns);
+    }
+    
+    switch(bnState) {
+      case DROP_UNTIL_NEXT_ROLL:
+        return;
 
-      getFSDirectoryRootLock().writeLock();
-      try { // load image under rootDir lock
-        File imageFile = null; // TODO
-        MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
-        loadFSImage(imageFile, expectedMD5);
-      } finally {
-        getFSDirectoryRootLock().writeUnlock();
-      }
-      List<File> editsFiles =
-        FSImageOldStorageInspector.getEditsInStorageDir(sdEdits);
-      loadEdits(editsFiles);
-      lastAppliedTxId = getEditLog().getLastWrittenTxId();
+      case IN_SYNC:
+        // update NameSpace in memory
+        applyEdits(firstTxId, numTxns, data);
+        break;
+      
+      case JOURNAL_ONLY:
+        break;
+      
+      default:
+        throw new AssertionError("Unhandled state: " + bnState);
     }
+    
+    // write to BN's local edit log.
+    logEditsLocally(firstTxId, numTxns, data);
   }
 
   /**
-   * Save meta-data into fsimage files.
-   * and create empty edits.
+   * Write the batch of edits to the local copy of the edit logs.
    */
-  void saveCheckpoint() throws IOException {
-    saveNamespace();
-  }
-
-  private FSDirectory getFSDirectoryRootLock() {
-    return getFSNamesystem().dir;
-  }
-
-  static File getJSpoolDir(StorageDirectory sd) {
-    return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
-  }
-
-  static File getJSpoolFile(StorageDirectory sd) {
-    return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+  private void logEditsLocally(long firstTxId, int numTxns, byte[] data) {
+    long expectedTxId = editLog.getLastWrittenTxId() + 1;
+    Preconditions.checkState(firstTxId == expectedTxId,
+        "received txid batch starting at %s but expected txn %s",
+        firstTxId, expectedTxId);
+    editLog.setNextTxId(firstTxId + numTxns - 1);
+    editLog.logEdit(data.length, data);
+    editLog.logSync();
   }
 
   /**
-   * Journal writer journals new meta-data state.
-   * <ol>
-   * <li> If Journal Spool state is OFF then journal records (edits)
-   * are applied directly to meta-data state in memory and are written
-   * to the edits file(s).</li>
-   * <li> If Journal Spool state is INPROGRESS then records are only
-   * written to edits.new file, which is called Spooling.</li>
-   * <li> Journal Spool state WAIT blocks journaling until the
-   * Journal Spool reader finalizes merging of the spooled data and
-   * switches to applying journal to memory.</li>
-   * </ol>
-   * @param length length of data.
-   * @param data serialized journal records.
-   * @throws IOException
-   * @see #convergeJournalSpool()
+   * Apply the batch of edits to the local namespace.
    */
-  synchronized void journal(int length, byte[] data) throws IOException {
+  private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
+      throws IOException {
+    Preconditions.checkArgument(firstTxId == lastAppliedTxId + 1,
+        "Received txn batch starting at %s but expected %s",
+        firstTxId, lastAppliedTxId + 1);
     assert backupInputStream.length() == 0 : "backup input stream is not empty";
     try {
-      switch(jsState) {
-        case WAIT:
-        case OFF:
-          // wait until spooling is off
-          waitSpoolEnd();
-          // update NameSpace in memory
-          backupInputStream.setBytes(data);
-          FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-          int logVersion = storage.getLayoutVersion();
-          BufferedInputStream bin = new BufferedInputStream(backupInputStream);
-          DataInputStream in = new DataInputStream(bin);
-          Checksum checksum = null;
-          if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
-            checksum = FSEditLog.getChecksum();
-            in = new DataInputStream(new CheckedInputStream(bin, checksum));
-          }
-          logLoader.loadEditRecords(logVersion, in, checksum, true,
-                                    lastAppliedTxId + 1);
-          getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
-          break;
-        case INPROGRESS:
-          break;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("data:" + StringUtils.byteToHexString(data));
+      }
+      backupInputStream.setBytes(data);
+      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+      int logVersion = storage.getLayoutVersion();
+      BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+      DataInputStream in = new DataInputStream(bin);
+      Checksum checksum = FSEditLog.getChecksum();
+      int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true,
+                                lastAppliedTxId + 1);
+      if (numLoaded != numTxns) {
+        throw new IOException("Batch of txns starting at txnid " +
+            firstTxId + " was supposed to contain " + numTxns +
+            " transactions but only was able to apply " + numLoaded);
       }
-      // write to files
-      editLog.logEdit(length, data);
-      editLog.logSync();
+      lastAppliedTxId += numTxns;
+      
+      getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
     } finally {
       backupInputStream.clear();
     }
   }
 
-  private synchronized void waitSpoolEnd() {
-    while(jsState == JSpoolState.WAIT) {
+  /**
+   * Transition the BackupNode from JOURNAL_ONLY state to IN_SYNC state.
+   * This is done by repeated invocations of tryConvergeJournalSpool until
+   * we are caught up to the latest in-progress edits file.
+   */
+  void convergeJournalSpool() throws IOException {
+    Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+        "bad state: %s", bnState);
+
+    while (!tryConvergeJournalSpool()) {
+      // retry: each attempt catches up through newly-finalized segments
+    }
+    assert bnState == BNState.IN_SYNC;
+  }
+  
+  private boolean tryConvergeJournalSpool() throws IOException {
+    Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+        "bad state: %s", bnState);
+    
+    // This section is unsynchronized so we can continue to apply
+    // ahead of where we're reading, concurrently. Since the state
+    // is JOURNAL_ONLY at this point, we know that lastAppliedTxId
+    // doesn't change, and curSegmentTxId only increases
+
+    while (lastAppliedTxId < editLog.getCurSegmentTxId() - 1) {
+      long target = editLog.getCurSegmentTxId();
+      LOG.info("Loading edits into backupnode to try to catch up from txid "
+          + lastAppliedTxId + " to " + target);
+      FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+      
+      storage.inspectStorageDirs(inspector);
+      LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
+          target - 1);
+  
+      logLoadPlan.doRecovery();
+      loadEdits(logLoadPlan.getEditsFiles());
+    }
+    
+    // now, need to load the in-progress file
+    synchronized (this) {
+      if (lastAppliedTxId != editLog.getCurSegmentTxId() - 1) {
+        LOG.debug("Logs rolled while catching up to current segment");
+        return false; // drop lock and try again to load local logs
+      }
+      
+      EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
       try {
-        wait();
-      } catch (InterruptedException  e) {}
+        long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
+        
+        LOG.info("Going to finish converging with remaining " + remainingTxns
+            + " txns from in-progress stream " + stream);
+        
+        FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+        int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
+        lastAppliedTxId += numLoaded;
+        assert numLoaded == remainingTxns :
+          "expected to load " + remainingTxns + " but loaded " +
+          numLoaded + " from " + stream;
+      } finally {
+        IOUtils.closeStream(stream);
+      }
+
+      LOG.info("Successfully synced BackupNode with NameNode at txnid " +
+          lastAppliedTxId);
+      setState(BNState.IN_SYNC);
     }
-    // now spooling should be off, verifying just in case
-    assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+    return true;
   }
 
   /**
-   * Start journal spool.
-   * Switch to writing into edits.new instead of edits.
-   *
-   * edits.new for spooling is in separate directory "spool" rather than in
-   * "current" because the two directories should be independent.
-   * While spooling a checkpoint can happen and current will first
-   * move to lastcheckpoint.tmp and then to previous.checkpoint
-   * spool/edits.new will remain in place during that.
+   * Transition edit log to a new state, logging as necessary.
    */
-  synchronized void startJournalSpool(NamenodeRegistration nnReg)
-  throws IOException {
-    switch(jsState) {
-      case OFF:
-        break;
-      case INPROGRESS:
-        return;
-      case WAIT:
-        waitSpoolEnd();
+  private synchronized void setState(BNState newState) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("State transition " + bnState + " -> " + newState,
+          new Exception("trace"));
     }
+    bnState = newState;
+  }
 
-    // create journal spool directories
-    for (Iterator<StorageDirectory> it
-           = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File jsDir = getJSpoolDir(sd);
-      if (!jsDir.exists() && !jsDir.mkdirs()) {
-        throw new IOException("Mkdirs failed to create "
-                              + jsDir.getCanonicalPath());
+  /**
+   * Receive a notification that the NameNode has begun a new edit log.
+   * This causes the BN to also start the new edit log in its local
+   * directories.
+   */
+  synchronized void namenodeStartedLogSegment(long txid) {
+    LOG.info("NameNode started a new log segment at txid " + txid);
+    if (editLog.isOpen()) {
+      if (editLog.getLastWrittenTxId() == txid - 1) {
+        // We are in sync with the NN, so end and finalize the current segment
+        editLog.endCurrentLogSegment(false);
+      } else {
+        // We appear to have missed some transactions -- the NN probably
+        // lost contact with us temporarily. So, mark the current segment
+        // as aborted.
+        LOG.warn("NN started new log segment at txid " + txid +
+            ", but BN had only written up to txid " +
+            editLog.getLastWrittenTxId() +
+            " in the log segment starting at " +
+            editLog.getCurSegmentTxId() + ". Aborting this " +
+            "log segment.");
+        editLog.abortCurrentLogSegment();
       }
-      // create edit file if missing
-      /*File eFile = storage.getEditFile(sd); TODO
-      if(!eFile.exists()) {
-        editLog.createEditLogFile(eFile);
-      }*/
     }
-
-    if(!editLog.isOpen())
-      editLog.startLogSegment(TODO_TXID);
-
-    // create streams pointing to the journal spool files
-    // subsequent journal records will go directly to the spool
-// TODO    editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
-    // set up spooling
-    if(backupInputStream == null)
-      backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
-    jsState = JSpoolState.INPROGRESS;
+    editLog.setNextTxId(txid);
+    editLog.startLogSegment(txid, false);
+    if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) {
+      setState(BNState.JOURNAL_ONLY);
+    }
+    
+    if (stopApplyingEditsOnNextRoll) {
+      if (bnState == BNState.IN_SYNC) {
+        LOG.info("Stopped applying edits to prepare for checkpoint.");
+        setState(BNState.JOURNAL_ONLY);
+      }
+      stopApplyingEditsOnNextRoll = false;
+      notifyAll();
+    }
   }
 
   /**
-   * Merge Journal Spool to memory.<p>
-   * Journal Spool reader reads journal records from edits.new.
-   * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
-   * This blocks journaling (see {@link #journal(int,byte[])}.
-   * The reader
-   * <ul>
-   * <li> reads remaining journal records if any,</li>
-   * <li> renames edits.new to edits,</li>
-   * <li> sets {@link JSpoolState} to OFF,</li>
-   * <li> and notifies the journaling thread.</li>
-   * </ul>
-   * Journaling resumes with applying new journal records to the memory state,
-   * and writing them into edits file(s).
+   * Request that the next time the BN receives a log roll, it should
+   * stop applying the edit log to the local namespace. This is
+   * typically followed by a call to {@link #waitUntilNamespaceFrozen()}.
    */
-  void convergeJournalSpool() throws IOException {
-    Iterator<StorageDirectory> itEdits
-      = storage.dirIterator(NameNodeDirType.EDITS);
-    if(! itEdits.hasNext())
-      throw new IOException("Could not locate checkpoint directories");
-    StorageDirectory sdEdits = itEdits.next();
-    int numEdits = 0;
-    File jSpoolFile = getJSpoolFile(sdEdits);
-    long startTime = now();
-    if(jSpoolFile.exists()) {
-      // load edits.new
-      EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
-      BufferedInputStream bin = new BufferedInputStream(edits);
-      DataInputStream in = new DataInputStream(bin);
-      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-
-      LogHeader header = FSEditLogOp.LogHeader.read(in);
-      int loaded = logLoader.loadEditRecords(
-          header.logVersion, in, header.checksum, false,
-          lastAppliedTxId + 1);
-
-      lastAppliedTxId += loaded;
-      numEdits += loaded;
-
-      // first time reached the end of spool
-      jsState = JSpoolState.WAIT;
-      loaded = logLoader.loadEditRecords(
-          header.logVersion, in, header.checksum,
-          true, lastAppliedTxId + 1);
-      numEdits += loaded;
-      lastAppliedTxId += loaded;
-
-      getFSNamesystem().dir.updateCountForINodeWithQuota();
-      edits.close();
-    }
-
-    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
-        + " of size " + jSpoolFile.length() + " edits # " + numEdits
-        + " loaded in " + (now()-startTime)/1000 + " seconds.");
-
-    // rename spool edits.new to edits making it in sync with the active node
-    // subsequent journal records will go directly to edits
-    // TODO editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
-    // write version file
-    // TODO resetVersion(storage.getImageDigest());
+  synchronized void freezeNamespaceAtNextRoll() {
+    stopApplyingEditsOnNextRoll = true;
+  }
 
-    // wake up journal writer
-    synchronized(this) {
-      jsState = JSpoolState.OFF;
-      notifyAll();
+  /**
+   * After {@link #freezeNamespaceAtNextRoll()} has been called, wait until
+   * the BN receives notification of the next log roll.
+   */
+  synchronized void waitUntilNamespaceFrozen() throws IOException {
+    if (bnState != BNState.IN_SYNC) return;
+
+    LOG.info("Waiting until the NameNode rolls its edit logs in order " +
+        "to freeze the BackupNode namespace.");
+    while (bnState == BNState.IN_SYNC) {
+      Preconditions.checkState(stopApplyingEditsOnNextRoll,
+        "If still in sync, we should still have the flag set to " +
+        "freeze at next roll");
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted waiting for namespace to freeze", ie);
+        throw new IOException(ie);
+      }
     }
+    LOG.info("BackupNode namespace frozen.");
+  }
 
-    /*
-     * TODO: bn
-    // Rename lastcheckpoint.tmp to previous.checkpoint
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      storage.moveLastCheckpoint(sd);
-    }
-    */
+  /**
+   * Override close() so that we don't finalize edit logs.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    editLog.abortCurrentLogSegment();
+    storage.close();
   }
 }
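
The heart of the new BackupImage is the three-way dispatch in journal(): depending on BNState, an incoming batch is dropped, written to the local log only, or applied to the namespace and then written, and every batch must be contiguous with the last locally written txid. A minimal standalone sketch of that dispatch, assuming only the BNState values and the contiguity rule from the hunk above (all other names are illustrative):

    enum BNState { DROP_UNTIL_NEXT_ROLL, JOURNAL_ONLY, IN_SYNC }

    // Sketch of the BackupImage.journal() dispatch. The fields and the
    // apply step are stand-ins for FSEditLog and FSEditLogLoader.
    class JournalDispatchSketch {
      private BNState bnState = BNState.DROP_UNTIL_NEXT_ROLL;
      private long lastWrittenTxId = 0; // mirrors editLog.getLastWrittenTxId()
      private long lastAppliedTxId = 0; // last txn applied to the namespace

      synchronized void journal(long firstTxId, int numTxns, byte[] data) {
        switch (bnState) {
          case DROP_UNTIL_NEXT_ROLL:
            return;               // not aligned with a segment boundary yet
          case IN_SYNC:
            applyEdits(firstTxId, numTxns, data); // update namespace first
            break;
          case JOURNAL_ONLY:
            break;                // journal locally without applying
        }
        // Batches must be contiguous with the local log (cf. logEditsLocally).
        if (firstTxId != lastWrittenTxId + 1) {
          throw new IllegalStateException("received txid batch starting at "
              + firstTxId + " but expected " + (lastWrittenTxId + 1));
        }
        lastWrittenTxId = firstTxId + numTxns - 1;
      }

      private void applyEdits(long firstTxId, int numTxns, byte[] data) {
        // The real code replays ops via FSEditLogLoader and verifies that
        // exactly numTxns transactions were loaded.
        lastAppliedTxId = firstTxId + numTxns - 1;
      }
    }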

+ 65 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.NNStorageArchivalManager.StorageArchiver;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+
+public class BackupJournalManager implements JournalManager {
+
+  private final NamenodeRegistration nnReg;
+  private final NamenodeRegistration bnReg;
+  
+  BackupJournalManager(NamenodeRegistration bnReg,
+      NamenodeRegistration nnReg) {
+    this.bnReg = bnReg;
+    this.nnReg = nnReg;
+  }
+
+  @Override
+  public EditLogOutputStream startLogSegment(long txId) throws IOException {
+    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+    stm.startLogSegment(txId);
+    return stm;
+  }
+
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+  }
+
+  @Override
+  public void setOutputBufferCapacity(int size) {
+  }
+
+  @Override
+  public void archiveLogsOlderThan(long minTxIdToKeep, StorageArchiver archiver)
+      throws IOException {
+  }
+
+  public boolean matchesRegistration(NamenodeRegistration bnReg) {
+    return bnReg.getAddress().equals(this.bnReg.getAddress());
+  }
+
+  @Override
+  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
+    return null;
+  }
+}
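
With BackupJournalManager in place, a backup node becomes just another pluggable journal: FSEditLog (later in this diff) keeps a list of journals and fans every segment operation out to all of them, file-backed and RPC-backed alike. A toy sketch of that uniform fan-out, using a trimmed-down stand-in for the real JournalManager interface:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Stand-in for JournalManager; just enough to show the fan-out.
    interface MiniJournalManager {
      void startLogSegment(long txId) throws IOException;
    }

    class JournalFanOutSketch {
      private final List<MiniJournalManager> journals = new ArrayList<>();

      // registerBackupNode() in the diff amounts to adding one more entry.
      void register(MiniJournalManager jm) {
        journals.add(jm);
      }

      void startLogSegmentEverywhere(long txId) throws IOException {
        for (MiniJournalManager jm : journals) {
          // FileJournalManager opens a new edits_inprogress file here;
          // BackupJournalManager turns this into a startLogSegment RPC.
          jm.startLogSegment(txId);
        }
      }
    }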

+ 48 - 37
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -51,7 +52,7 @@ import org.apache.hadoop.net.NetUtils;
  * </ol>
  */
 @InterfaceAudience.Private
-public class BackupNode extends NameNode {
+public class BackupNode extends NameNode implements BackupNodeProtocol {
   private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
   private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
   private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
@@ -176,6 +177,17 @@ public class BackupNode extends NameNode {
     super.stop();
   }
 
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (protocol.equals(BackupNodeProtocol.class.getName())) {
+      return BackupNodeProtocol.versionID;
+    } else {
+      return super.getProtocolVersion(protocol, clientVersion);
+    }
+  }
+
   /////////////////////////////////////////////////////
   // NamenodeProtocol implementation for backup node.
   /////////////////////////////////////////////////////
@@ -202,30 +214,36 @@ public class BackupNode extends NameNode {
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
     throw new UnsupportedActionException("endCheckpoint");
-  }
+  }  
 
-  @Override // NamenodeProtocol
+  /////////////////////////////////////////////////////
+  // BackupNodeProtocol implementation for backup node.
+  /////////////////////////////////////////////////////
+
+  @Override
   public void journal(NamenodeRegistration nnReg,
-                      int jAction,
-                      int length,
-                      byte[] args) throws IOException {
+      long firstTxId, int numTxns,
+      byte[] records) throws IOException {
     verifyRequest(nnReg);
     if(!nnRpcAddress.equals(nnReg.getAddress()))
       throw new IOException("Journal request from unexpected name-node: "
           + nnReg.getAddress() + " expecting " + nnRpcAddress);
-    BackupImage bnImage = (BackupImage)getFSImage();
-    switch(jAction) {
-      case (int)JA_IS_ALIVE:
-        return;
-      case (int)JA_JOURNAL:
-        bnImage.journal(length, args);
-        return;
-      case (int)JA_JSPOOL_START:
-        bnImage.startJournalSpool(nnReg);
-        return;
-      default:
-        throw new IOException("Unexpected journal action: " + jAction);
-    }
+    getBNImage().journal(firstTxId, numTxns, records);
+  }
+
+  @Override
+  public void startLogSegment(NamenodeRegistration registration, long txid)
+      throws IOException {
+    verifyRequest(registration);
+  
+    getBNImage().namenodeStartedLogSegment(txid);
+  }
+
+  //////////////////////////////////////////////////////
+  
+  
+  BackupImage getBNImage() {
+    return (BackupImage)getFSImage();
   }
 
   boolean shouldCheckpointAtStartup() {
@@ -234,9 +252,9 @@ public class BackupNode extends NameNode {
       assert fsImage.getStorage().getNumStorageDirs() > 0;
       return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
     }
-    if(namesystem == null || namesystem.dir == null || getFSImage() == null)
-      return true;
-    return false; // TODO fsImage.getEditLog().getNumJournals() == 0;
+    
+    // BN always checkpoints on startup in order to get in sync with namespace
+    return true;
   }
 
   private NamespaceInfo handshake(Configuration conf) throws IOException {
@@ -287,14 +305,15 @@ public class BackupNode extends NameNode {
    */
   private void registerWith(NamespaceInfo nsInfo) throws IOException {
     BackupImage bnImage = (BackupImage)getFSImage();
+    NNStorage storage = bnImage.getStorage();
     // verify namespaceID
-    if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
-      bnImage.getStorage().setStorageInfo(nsInfo);
-    else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
-      throw new IOException("Incompatible namespaceIDs"
-          + ": active node namespaceID = " + nsInfo.getNamespaceID() 
-          + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
-
+    if (storage.getNamespaceID() == 0) { // new backup storage
+      storage.setStorageInfo(nsInfo);
+      storage.setBlockPoolID(nsInfo.getBlockPoolID());
+      storage.setClusterID(nsInfo.getClusterID());
+    } else {
+      nsInfo.validateStorage(storage);
+    }
     setRegistration();
     NamenodeRegistration nnReg = null;
     while(!isStopRequested()) {
@@ -323,14 +342,6 @@ public class BackupNode extends NameNode {
     nnRpcAddress = nnReg.getAddress();
   }
 
-  /**
-   * Reset node namespace state in memory and in storage directories.
-   * @throws IOException
-   */
-  void resetNamespace() throws IOException {
-    ((BackupImage)getFSImage()).reset();
-  }
-
   // TODO: move to a common with DataNode util class
   private static NamespaceInfo handshake(NamenodeProtocol namenode)
   throws IOException, SocketTimeoutException {
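
The old NamenodeProtocol.journal(registration, jAction, length, args) multiplexing over JA_IS_ALIVE/JA_JOURNAL/JA_JSPOOL_START is gone; the BackupNode now speaks a dedicated BackupNodeProtocol. That file's diff (+58 lines) is not shown in this excerpt, but the overrides above pin down its shape; in the sketch below the method signatures come from BackupNode.java, while the versionID value and the VersionedProtocol parent are assumptions:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
    import org.apache.hadoop.ipc.VersionedProtocol;

    // Reconstructed sketch of BackupNodeProtocol; versionID is assumed.
    interface BackupNodeProtocolSketch extends VersionedProtocol {
      long versionID = 1L;

      // Ship a contiguous batch of serialized edits to the backup node.
      void journal(NamenodeRegistration registration, long firstTxId,
          int numTxns, byte[] records) throws IOException;

      // Notify the backup node that the active NN began a new log segment.
      void startLogSegment(NamenodeRegistration registration, long txid)
          throws IOException;
    }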

+ 0 - 1
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java

@@ -110,7 +110,6 @@ public class CheckpointSignature extends StorageInfo
        || cTime != si.getStorage().cTime
        || !clusterID.equals(si.getClusterID())
        || !blockpoolID.equals(si.getBlockPoolID())) {
-      // checkpointTime can change when the image is saved - do not compare
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
           + " cTime = " + cTime

+ 79 - 69
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java

@@ -17,30 +17,27 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.IOException;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.Daemon;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
 
 /**
  * The Checkpointer is responsible for supporting periodic checkpoints 
@@ -80,6 +77,7 @@ class Checkpointer extends Daemon {
     try {
       initialize(conf);
     } catch(IOException e) {
+      LOG.warn("Checkpointer got exception", e);
       shutdown();
       throw e;
     }
@@ -134,8 +132,9 @@ class Checkpointer extends Daemon {
     periodMSec *= 1000;
 
     long lastCheckpointTime = 0;
-    if(!backupNode.shouldCheckpointAtStartup())
+    if (!backupNode.shouldCheckpointAtStartup()) {
       lastCheckpointTime = now();
+    }
     while(shouldRun) {
       try {
         long now = now();
@@ -174,45 +173,16 @@ class Checkpointer extends Daemon {
     return uncheckpointedTxns;
   }
 
-  /**
-   * Download <code>fsimage</code> and <code>edits</code>
-   * files from the remote name-node.
-   */
-  private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
-    String nnHttpAddr = backupNode.nnHttpAddress;
-
-    // Retrieve image file
-    MD5Hash hash = TransferFsImage.downloadImageToStorage(
-        nnHttpAddr, sig.mostRecentCheckpointTxId,
-        getFSImage().getStorage(), true);
-    getFSImage().saveDigestAndRenameCheckpointImage(sig.mostRecentCheckpointTxId, hash);
-    
-    // Retrieve edits file
-    // TODO!
-  }
-
-  /**
-   * Copy the new image into remote name-node.
-   */
-  private void uploadCheckpoint(CheckpointSignature sig) throws IOException {
-    // TODO: checkpoint node disabled in 1073 branch
-/*    // Use the exact http addr as specified in config to deal with ip aliasing
-    InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
-    int httpPort = httpSocAddr.getPort();
-    String fileid = "putimage=1&port=" + httpPort +
-      "&machine=" + infoBindAddress +
-      "&token=" + sig.toString();
-    LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
-    TransferFsImage.getFileClient(backupNode.nnHttpAddress, 
-        fileid, null, false);
-        */
-  }
-
   /**
    * Create a new checkpoint
    */
   void doCheckpoint() throws IOException {
+    BackupImage bnImage = getFSImage();
+    NNStorage bnStorage = bnImage.getStorage();
+
     long startTime = now();
+    bnImage.freezeNamespaceAtNextRoll();
+    
     NamenodeCommand cmd = 
       getNamenode().startCheckpoint(backupNode.getRegistration());
     CheckpointCommand cpCmd = null;
@@ -228,36 +198,76 @@ class Checkpointer extends Daemon {
         throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
     }
 
+    bnImage.waitUntilNamespaceFrozen();
+    
     CheckpointSignature sig = cpCmd.getSignature();
-    assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
-      "Signature should have current layout version. Expected: "
-      + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();
-    assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
-      cpCmd.isImageObsolete() : "checkpoint node should always download image.";
-    if(cpCmd.isImageObsolete()) {
-      // First reset storage on disk and memory state
-      backupNode.resetNamespace();
-      downloadCheckpoint(sig);
-    }
 
-    BackupImage bnImage = getFSImage();
-    bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId());
-    bnImage.getStorage().setClusterID(backupNode.getClusterId());
-    bnImage.loadCheckpoint(sig);
+    // Make sure we're talking to the same NN!
     sig.validateStorageInfo(bnImage);
-    bnImage.saveCheckpoint();
 
-    if(cpCmd.needToReturnImage())
-      uploadCheckpoint(sig);
+    long lastApplied = bnImage.getLastAppliedTxId();
+    LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
+    RemoteEditLogManifest manifest =
+      getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId());
+
+    if (!manifest.getLogs().isEmpty()) {
+      RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
+      // we don't have enough logs to roll forward using only logs. Need
+      // to download and load the image.
+      if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+        LOG.info("Unable to roll forward using only logs. Downloading " +
+            "image with txid " + sig.mostRecentCheckpointTxId);
+        MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
+            backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId,
+            bnStorage, true);
+        bnImage.saveDigestAndRenameCheckpointImage(
+            sig.mostRecentCheckpointTxId, downloadedHash);
+        
+        LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
+        File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
+        bnImage.reloadFromImageFile(file);
+      }
+      
+      lastApplied = bnImage.getLastAppliedTxId();
+      if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+        throw new IOException("No logs to roll forward from " + lastApplied);
+      }
+  
+      // get edits files
+      for (RemoteEditLog log : manifest.getLogs()) {
+        TransferFsImage.downloadEditsToStorage(
+            backupNode.nnHttpAddress, log, bnStorage);
+      }
+  
+      SecondaryNameNode.rollForwardByApplyingLogs(manifest, bnImage);
+    }
+
+    long txid = bnImage.getLastAppliedTxId();
+    bnImage.saveFSImageInAllDirs(txid);
+    bnStorage.writeAll();
+
+    if(cpCmd.needToReturnImage()) {
+      TransferFsImage.uploadImageFromStorage(
+          backupNode.nnHttpAddress, getImageListenAddress(),
+          bnStorage, txid);
+    }
 
     getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
 
-    bnImage.convergeJournalSpool();
+    if (backupNode.getRole() == NamenodeRole.BACKUP) {
+      bnImage.convergeJournalSpool();
+    }
     backupNode.setRegistration(); // keep registration up to date
-    if(backupNode.isRole(NamenodeRole.CHECKPOINT))
-        getFSImage().getEditLog().close();
+    
+    long imageSize = bnImage.getStorage().getFsImageName(txid).length();
     LOG.info("Checkpoint completed in "
         + (now() - startTime)/1000 + " seconds."
-        + " New Image Size: " + bnImage.getStorage().getFsImageName(0 /* TODO */).length());
+        + " New Image Size: " + imageSize);
+  }
+
+  private InetSocketAddress getImageListenAddress() {
+    InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
+    int httpPort = httpSocAddr.getPort();
+    return new InetSocketAddress(infoBindAddress, httpPort);
   }
 }
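
The rewritten doCheckpoint() replaces the unconditional image download with txid arithmetic: if the oldest log in the NameNode's manifest starts beyond lastApplied + 1, the BN first downloads and loads a newer image; if a gap remains even then, it fails hard; otherwise it rolls forward from downloaded edit logs alone. A hedged sketch of just that decision, where the two abstract helpers are hypothetical stand-ins for the TransferFsImage/reloadFromImageFile and rollForwardByApplyingLogs calls in the hunk:

    import java.io.IOException;

    abstract class CheckpointCatchUpSketch {
      abstract long downloadAndLoadImage(long imageTxId) throws IOException;
      abstract void downloadAndApplyLogs(long fromTxId) throws IOException;

      void catchUp(long lastAppliedTxId, long oldestManifestTxId,
          long imageTxId) throws IOException {
        if (oldestManifestTxId > lastAppliedTxId + 1) {
          // Logs alone cannot bridge the gap: load the checkpoint image.
          lastAppliedTxId = downloadAndLoadImage(imageTxId);
        }
        if (oldestManifestTxId > lastAppliedTxId + 1) {
          // Even the image leaves a gap; the diff fails hard here too.
          throw new IOException(
              "No logs to roll forward from " + lastAppliedTxId);
        }
        downloadAndApplyLogs(lastAppliedTxId + 1); // then save a new image
      }
    }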

+ 17 - 51
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java

@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
@@ -42,7 +42,7 @@ import org.apache.hadoop.net.NetUtils;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private NamenodeProtocol backupNode;          // RPC proxy to backup node
+  private BackupNodeProtocol backupNode;        // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
   private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
@@ -60,13 +60,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
       this.args = writables;
     }
 
-    void write(DataOutputStream out) throws IOException {
-      out.write(op);
-      out.writeLong(txid);
-      if(args == null)
-        return;
-      for(Writable w : args)
-        w.write(out);
+    void write(DataOutputBuffer out) throws IOException {
+      writeChecksummedOp(out, op, txid, args);
     }
   }
 
@@ -81,8 +76,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
-        (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration());
+        (BackupNodeProtocol) RPC.getProxy(BackupNodeProtocol.class,
+            BackupNodeProtocol.versionID, bnAddress, new HdfsConfiguration());
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
@@ -91,7 +86,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     this.bufReady = new ArrayList<JournalRecord>();
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
-
+  
   @Override // JournalStream
   public String getName() {
     return bnRegistration.getAddress();
@@ -156,23 +151,14 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   @Override // EditLogOutputStream
   protected void flushAndSync() throws IOException {
     assert out.size() == 0 : "Output buffer is not empty";
-    int bufReadySize = bufReady.size();
-    for(int idx = 0; idx < bufReadySize; idx++) {
-      JournalRecord jRec = null;
-      for(; idx < bufReadySize; idx++) {
-        jRec = bufReady.get(idx);
-        if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
-          break;  // special operation should be sent in a separate call to BN
-        jRec.write(out);
-      }
-      if(out.size() > 0)
-        send(NamenodeProtocol.JA_JOURNAL);
-      if(idx == bufReadySize)
-        break;
-      // operation like start journal spool or increment checkpoint time
-      // is a separate call to BN
+    for (JournalRecord jRec : bufReady) {
       jRec.write(out);
-      send(jRec.op);
+    }
+    if (out.size() > 0) {
+      byte[] data = Arrays.copyOf(out.getData(), out.getLength());
+      backupNode.journal(nnRegistration,
+          bufReady.get(0).txid, bufReady.size(),
+          data);
     }
     bufReady.clear();         // erase all data in the buffer
     out.reset();              // reset buffer to the start position
@@ -188,16 +174,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     return 0;
   }
 
-  private void send(int ja) throws IOException {
-    try {
-      int length = out.getLength();
-      out.write(FSEditLogOpCodes.OP_INVALID.getOpCode());
-      backupNode.journal(nnRegistration, ja, length, out.getData());
-    } finally {
-      out.reset();
-    }
-  }
-
   /**
    * Get backup node registration.
    */
@@ -205,17 +181,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     return bnRegistration;
   }
 
-  /**
-   * Verify that the backup node is alive.
-   */
-  boolean isAlive() {
-    try {
-      send(NamenodeProtocol.JA_IS_ALIVE);
-    } catch(IOException ei) {
-      Storage.LOG.info(bnRegistration.getRole() + " "
-                      + bnRegistration.getAddress() + " is not alive. ", ei);
-      return false;
-    }
-    return true;
+  void startLogSegment(long txId) throws IOException {
+    backupNode.startLogSegment(nnRegistration, txId);
   }
 }
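
Note that flushAndSync() above no longer slices the buffer at special opcodes: the whole ready-buffer travels as one (firstTxId, numTxns, data) batch. The Arrays.copyOf call matters because DataOutputBuffer.getData() exposes the entire backing array rather than just the written prefix; a short demonstration:

    import java.util.Arrays;
    import org.apache.hadoop.io.DataOutputBuffer;

    class TrimSketch {
      public static void main(String[] args) throws Exception {
        DataOutputBuffer out = new DataOutputBuffer(256);
        out.writeLong(42L); // 8 valid bytes in a larger backing array
        // copyOf trims to the written prefix so the RPC payload does
        // not carry trailing garbage from the backing array.
        byte[] exact = Arrays.copyOf(out.getData(), out.getLength());
        System.out.println(exact.length); // prints 8
      }
    }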

+ 5 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -72,5 +72,10 @@ class EditLogFileInputStream extends EditLogInputStream {
     // file size + size of both buffers
     return file.length();
   }
+  
+  @Override
+  public String toString() {
+    return getName();
+  }
 
 }

+ 3 - 15
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java

@@ -111,21 +111,9 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
-    int start = bufCurrent.getLength();
-    write(op);
-    bufCurrent.writeLong(txid);
-    for (Writable w : writables) {
-      w.write(bufCurrent);
-    }
-    // write transaction checksum
-    int end = bufCurrent.getLength();
-    Checksum checksum = FSEditLog.getChecksum();
-    checksum.reset();
-    checksum.update(bufCurrent.getData(), start, end-start);
-    int sum = (int)checksum.getValue();
-    bufCurrent.writeInt(sum);
+    writeChecksummedOp(bufCurrent, op, txid, writables);
   }
-  
+
   @Override
   void write(final byte[] data, int off, int len) throws IOException {
     bufCurrent.write(data, off, len);
@@ -151,7 +139,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
 
     setReadyToFlush();
     flush();
-    
+
     // close should have been called after all pending transactions
     // have been flushed & synced.
     // if already closed, just skip

+ 24 - 1
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java

@@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.zip.Checksum;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -135,5 +138,25 @@ implements JournalStream {
     return getName();
   }
 
-
+  /**
+   * Write the given operation to the specified buffer, including
+   * the transaction ID and checksum.
+   */
+  protected static void writeChecksummedOp(
+      DataOutputBuffer buf, byte op, long txid, Writable... writables)
+      throws IOException {
+    int start = buf.getLength();
+    buf.write(op);
+    buf.writeLong(txid);
+    for (Writable w : writables) {
+      w.write(buf);
+    }
+    // write transaction checksum
+    int end = buf.getLength();
+    Checksum checksum = FSEditLog.getChecksum();
+    checksum.reset();
+    checksum.update(buf.getData(), start, end-start);
+    int sum = (int)checksum.getValue();
+    buf.writeInt(sum);
+  }
 }

+ 129 - 139
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
 
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -151,7 +153,7 @@ public class FSEditLog  {
   /**
    * Initialize the list of edit journals
    */
-  private void initJournals() {
+  synchronized void initJournals() {
     assert journals.isEmpty();
     Preconditions.checkState(state == State.UNINITIALIZED,
         "Bad state: %s", state);
@@ -167,10 +169,6 @@ public class FSEditLog  {
     state = State.BETWEEN_LOG_SEGMENTS;
   }
   
-  private int getNumEditsDirs() {
-   return storage.getNumStorageDirs(NameNodeDirType.EDITS);
-  }
-
   /**
    * Initialize the output stream for logging, opening the first
    * log segment.
@@ -179,7 +177,7 @@ public class FSEditLog  {
     Preconditions.checkState(state == State.UNINITIALIZED);
     initJournals();
 
-    startLogSegment(getLastWrittenTxId() + 1);
+    startLogSegment(getLastWrittenTxId() + 1, true);
     assert state == State.IN_SEGMENT : "Bad state: " + state;
   }
   
@@ -199,7 +197,7 @@ public class FSEditLog  {
     if (state == State.IN_SEGMENT) {
       assert !journals.isEmpty();
       waitForSyncToFinish();
-      endCurrentLogSegment();
+      endCurrentLogSegment(true);
     }
 
     state = State.CLOSED;
@@ -335,11 +333,12 @@ public class FSEditLog  {
    * Set the transaction ID to use for the next transaction written.
    */
   synchronized void setNextTxId(long nextTxId) {
-    assert synctxid <= txid &&
-       nextTxId >= txid : "May not decrease txid." +
-      " synctxid=" + synctxid +
-      " txid=" + txid +
-      " nextTxid=" + nextTxId;
+    Preconditions.checkArgument(synctxid <= txid &&
+       nextTxId >= txid,
+       "May not decrease txid." +
+      " synctxid=%s txid=%s nextTxId=%s",
+      synctxid, txid, nextTxId);
+      
     txid = nextTxId - 1;
   }
     
@@ -470,7 +469,8 @@ public class FSEditLog  {
       
       if (badJournals.size() >= journals.size()) {
         LOG.fatal("Could not sync any journal to persistent storage. " +
-            "Unsynced transactions: " + (txid - synctxid));
+            "Unsynced transactions: " + (txid - synctxid),
+            new Exception());
         runtime.exit(1);
       }
     } finally {
@@ -725,36 +725,6 @@ public class FSEditLog  {
     return new DeprecatedUTF8(Long.toString(timestamp));
   }
 
-  /**
-   * Return the size of the current EditLog
-   */
-  // TODO who uses this, does it make sense with transactions?
-  synchronized long getEditLogSize() throws IOException {
-    assert getNumEditsDirs() <= journals.size() :
-        "Number of edits directories should not exceed the number of streams.";
-    long size = -1;
-        
-    List<JournalAndStream> badJournals = Lists.newArrayList();
-    
-    for (JournalAndStream j : journals) {
-      if (!j.isActive()) continue;
-      EditLogOutputStream es = j.getCurrentStream();
-      try {
-        long curSize = es.length();
-        assert (size == 0 || size == curSize || curSize ==0) :
-          "Wrong streams size";
-        size = Math.max(size, curSize);
-      } catch (IOException e) {
-        LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e);
-        badJournals.add(j);
-      }
-    }
-    disableAndReportErrorOnJournals(badJournals);
-    
-    assert size != -1;
-    return size;
-  }
-  
   /**
    * Used only by unit tests.
    */
@@ -793,10 +763,10 @@ public class FSEditLog  {
    */
   synchronized long rollEditLog() throws IOException {
     LOG.info("Rolling edit logs.");
-    endCurrentLogSegment();
+    endCurrentLogSegment(true);
     
     long nextTxId = getLastWrittenTxId() + 1;
-    startLogSegment(nextTxId);
+    startLogSegment(nextTxId, true);
     
     assert curSegmentTxId == nextTxId;
     return nextTxId;
@@ -806,14 +776,20 @@ public class FSEditLog  {
    * Start writing to the log segment with the given txid.
    * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state. 
    */
-  synchronized void startLogSegment(final long txId) {
-    LOG.info("Starting log segment at " + txId);
+  synchronized void startLogSegment(final long segmentTxId,
+      boolean writeHeaderTxn) {
+    LOG.info("Starting log segment at " + segmentTxId);
+    Preconditions.checkArgument(segmentTxId > 0,
+        "Bad txid: %s", segmentTxId);
     Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
         "Bad state: %s", state);
-    Preconditions.checkState(txId > curSegmentTxId,
-        "Cannot start writing to log segment " + txId +
+    Preconditions.checkState(segmentTxId > curSegmentTxId,
+        "Cannot start writing to log segment " + segmentTxId +
         " when previous log segment started at " + curSegmentTxId);
-    curSegmentTxId = txId;
+    Preconditions.checkArgument(segmentTxId == txid + 1,
+        "Cannot start log segment at txid %s when next expected " +
+        "txid is %s", segmentTxId, txid + 1);
+    curSegmentTxId = segmentTxId;
     
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
@@ -823,26 +799,32 @@ public class FSEditLog  {
     mapJournalsAndReportErrors(new JournalClosure() {
       @Override
       public void apply(JournalAndStream jas) throws IOException {
-        jas.startLogSegment(txId);
+        jas.startLogSegment(segmentTxId);
       }
-    }, "starting log segment " + txId);
+    }, "starting log segment " + segmentTxId);
 
     state = State.IN_SEGMENT;
 
-    logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
-    logSync();    
+    if (writeHeaderTxn) {
+      logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
+      logSync();
+    }
   }
 
   /**
    * Finalize the current log segment.
    * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
    */
-  synchronized void endCurrentLogSegment() {
+  synchronized void endCurrentLogSegment(boolean writeEndTxn) {
     LOG.info("Ending log segment " + curSegmentTxId);
     Preconditions.checkState(state == State.IN_SEGMENT,
         "Bad state: %s", state);
-    logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
-    waitForSyncToFinish();
+    
+    if (writeEndTxn) {
+      logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+      logSync();
+    }
+
     printStatistics(true);
     
     final long lastTxId = getLastWrittenTxId();
@@ -858,6 +840,20 @@ public class FSEditLog  {
     
     state = State.BETWEEN_LOG_SEGMENTS;
   }
+  
+  /**
+   * Abort all current logs. Called from the backup node.
+   */
+  synchronized void abortCurrentLogSegment() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.abort();
+      }
+    }, "aborting all streams");
+    state = State.BETWEEN_LOG_SEGMENTS;
+  }
 
   /**
    * Archive any log files that are older than the given txid.
@@ -910,36 +906,62 @@ public class FSEditLog  {
   /**
    * Create (or find if already exists) an edit output stream, which
    * streams journal records (edits) to the specified backup node.<br>
-   * Send a record, prescribing to start journal spool.<br>
-   * This should be sent via regular stream of journal records so that
-   * the backup node new exactly after which record it should start spooling.
+   * 
+   * The new BackupNode will start receiving edits the next time this
+   * NameNode's logs roll.
    * 
    * @param bnReg the backup node registration information.
    * @param nnReg this (active) name-node registration.
    * @throws IOException
    */
-  synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
-                      NamenodeRegistration nnReg) // active name-node
+  synchronized void registerBackupNode(
+      NamenodeRegistration bnReg, // backup node
+      NamenodeRegistration nnReg) // active name-node
   throws IOException {
-    /*
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
-    if(editStreams == null)
-      editStreams = new ArrayList<EditLogOutputStream>();
-    EditLogOutputStream boStream = null;
-    for(EditLogOutputStream eStream : editStreams) {
-      if(eStream.getName().equals(bnReg.getAddress())) {
-        boStream = eStream; // already there
-        break;
+    
+    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    if (jas != null) {
+      // already registered
+      LOG.info("Backup node " + bnReg + " re-registers");
+      return;
+    }
+    
+    LOG.info("Registering new backup node: " + bnReg);
+    BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
+    journals.add(new JournalAndStream(bjm));
+  }
+  
+  synchronized void releaseBackupStream(NamenodeRegistration registration) {
+    for (Iterator<JournalAndStream> iter = journals.iterator();
+         iter.hasNext();) {
+      JournalAndStream jas = iter.next();
+      if (jas.manager instanceof BackupJournalManager &&
+          ((BackupJournalManager)jas.manager).matchesRegistration(
+              registration)) {
+        jas.abort();        
+        LOG.info("Removing backup journal " + jas);
+        iter.remove();
       }
     }
-    if(boStream == null) {
-      boStream = new EditLogBackupOutputStream(bnReg, nnReg);
-      editStreams.add(boStream);
+  }
+  
+  /**
+   * Find the JournalAndStream associated with this BackupNode.
+   * @return null if it cannot be found
+   */
+  private synchronized JournalAndStream findBackupJournalAndStream(
+      NamenodeRegistration bnReg) {
+    for (JournalAndStream jas : journals) {
+      if (jas.manager instanceof BackupJournalManager) {
+        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
+        if (bjm.matchesRegistration(bnReg)) {
+          return jas;
+        }
+      }
     }
-    logEdit(OP_JSPOOL_START, (Writable[])null);
-    TODO: backupnode is disabled
-    */
+    return null;
   }
 
   /**
@@ -961,62 +983,6 @@ public class FSEditLog  {
     endTransaction(start);
   }
 
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    /*
-    Iterator<EditLogOutputStream> it =
-                                  getOutputStreamIterator(JournalType.BACKUP);
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    NamenodeRegistration backupNode = null;
-    while(it.hasNext()) {
-      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
-      backupNode = eStream.getRegistration();
-      if(backupNode.getAddress().equals(registration.getAddress()) &&
-            backupNode.isRole(registration.getRole())) {
-        errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-        break;
-      }
-    }
-    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
-      "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnJournals(errorStreams);
-    TODO BN currently disabled
-    */
-  }
-
-  synchronized boolean checkBackupRegistration(
-      NamenodeRegistration registration) {
-    /*
-    Iterator<EditLogOutputStream> it =
-                                  getOutputStreamIterator(JournalType.BACKUP);
-    boolean regAllowed = !it.hasNext();
-    NamenodeRegistration backupNode = null;
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    while(it.hasNext()) {
-      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
-      backupNode = eStream.getRegistration();
-      if(backupNode.getAddress().equals(registration.getAddress()) &&
-          backupNode.isRole(registration.getRole())) {
-        regAllowed = true; // same node re-registers
-        break;
-      }
-      if(!eStream.isAlive()) {
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-        regAllowed = true; // previous backup node failed
-      }
-    }
-    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
-      "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnJournals(errorStreams);
-    return regAllowed;
-    
-    TODO BN currently disabled
-    */
-    return false;
-  }
-  
   static BytesWritable toBytesWritable(Options.Rename... options) {
     byte[] bytes = new byte[options.length];
     for (int i = 0; i < options.length; i++) {
@@ -1061,12 +1027,7 @@ public class FSEditLog  {
  
     for (JournalAndStream j : badJournals) {
       LOG.error("Disabling journal " + j);
-      try {
-        j.abort();
-      } catch (IOException ioe) {
-        LOG.warn("Failed to abort faulty journal " + j
-                 + " before removing it (might be OK)", ioe);
-      }
+      j.abort();
     }
   }
 
@@ -1093,13 +1054,17 @@ public class FSEditLog  {
     }
 
     public void close(long lastTxId) throws IOException {
+      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
+          "invalid segment: lastTxId %s >= " +
+          "segment starting txid %s", lastTxId, segmentStartsAtTxId);
+          
       if (stream == null) return;
       stream.close();
       manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
       stream = null;
     }
     
-    public void abort() throws IOException {
+    public void abort() {
       if (stream == null) return;
       try {
         stream.abort();
@@ -1133,5 +1098,30 @@ public class FSEditLog  {
     JournalManager getManager() {
       return manager;
     }
+
+    public EditLogInputStream getInProgressInputStream() throws IOException {
+      return manager.getInProgressInputStream(segmentStartsAtTxId);
+    }
+  }
+
+  /**
+   * @return an EditLogInputStream that reads from the same log that
+   * the edit log is currently writing. This is used from the BackupNode
+   * during edits synchronization.
+   * @throws IOException if no valid logs are available.
+   */
+  synchronized EditLogInputStream getInProgressFileInputStream()
+      throws IOException {
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      try {
+        EditLogInputStream in = jas.getInProgressInputStream();
+        if (in != null) return in;
+      } catch (IOException ioe) {
+        LOG.warn("Unable to get the in-progress input stream from " + jas,
+            ioe);
+      }
+    }
+    throw new IOException("No in-progress stream provided edits");
   }
 }

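To make the new registration flow concrete: a minimal sketch, not part of
this patch, of how an in-package caller might wire a backup node into the
edit log (the helper class and method names are hypothetical):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;

class BackupRegistrationSketch {
  /**
   * Register a backup node with the edit log, then roll so the backup
   * starts receiving edits at the first txid of the new segment.
   */
  static long registerAndRoll(FSEditLog editLog,
      NamenodeRegistration bnReg,
      NamenodeRegistration nnReg) throws IOException {
    // Attaches a BackupJournalManager-backed journal; re-registration of
    // the same node is a no-op, and CHECKPOINT-role nodes are skipped.
    editLog.registerBackupNode(bnReg, nnReg);
    // Ends the current segment and opens the next; the backup node
    // receives edits from the returned txid onward.
    return editLog.rollEditLog();
  }
}
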
+ 46 - 32
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -21,16 +21,13 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +42,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -74,14 +70,17 @@ import com.google.common.collect.Lists;
 public class FSImage implements Closeable {
   protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
-  // checkpoint states
-  enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
-
   protected FSNamesystem namesystem = null;
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
 
   protected NNStorage storage;
+  
+  /**
+   * The last transaction ID that was either loaded from an image
+   * or loaded by loading edits files.
+   */
+  protected long lastAppliedTxId = 0;
 
   /**
    * URIs for importing an image from a checkpoint. In the default case,
@@ -92,10 +91,6 @@ public class FSImage implements Closeable {
 
   final private Configuration conf;
 
-  /**
-   * Can fs-image be rolled?
-   */
-  volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START;
   private final NNStorageArchivalManager archivalManager; 
 
   /**
@@ -559,6 +554,18 @@ public class FSImage implements Closeable {
     editLog.open();
     storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
   };
+  
+  /**
+   * Toss the current image and namesystem, reloading from the specified
+   * file.
+   */
+  void reloadFromImageFile(File file) throws IOException {
+    // TODO: namesystem.close(); ??
+    namesystem.dir.reset();
+
+    LOG.debug("Reloading namespace from " + file);
+    loadFSImage(file);
+  }
 
   /**
    * Choose latest image from one of the directories,
@@ -626,8 +633,12 @@ public class FSImage implements Closeable {
     } catch (IOException ioe) {
       throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
     }
-
-    needToSave |= loadEdits(loadPlan.getEditsFiles());
+    
+    long numLoaded = loadEdits(loadPlan.getEditsFiles());
+    needToSave |= numLoaded > 0;
+    
+    // update the txid for the edit log
+    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
 
     /* TODO(todd) Need to discuss whether we should force a re-save
      * of the image if one of the edits or images has an old format
@@ -640,13 +651,14 @@ public class FSImage implements Closeable {
 
   /**
    * Load the specified list of edit files into the image.
-   * @return true if the image should be re-saved
+   * @return the number of transactions loaded
    */
-  protected boolean loadEdits(List<File> editLogs) throws IOException {
+  protected long loadEdits(List<File> editLogs) throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editLogs));
-      
+
+    long startingTxId = getLastAppliedTxId() + 1;
+    
     FSEditLogLoader loader = new FSEditLogLoader(namesystem);
-    long startingTxId = storage.getMostRecentCheckpointTxId() + 1;
     int numLoaded = 0;
     // Load latest edits
     for (File edits : editLogs) {
@@ -655,17 +667,13 @@ public class FSImage implements Closeable {
       int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
       startingTxId += thisNumLoaded;
       numLoaded += thisNumLoaded;
+      lastAppliedTxId += thisNumLoaded;
       editIn.close();
     }
 
     // update the counts
     getFSNamesystem().dir.updateCountForINodeWithQuota();    
-    
-    // update the txid for the edit log
-    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
-
-    // If we loaded any edits, need to save.
-    return numLoaded > 0;
+    return numLoaded;
   }
 
 
@@ -673,7 +681,7 @@ public class FSImage implements Closeable {
    * Load the image namespace from the given image file, verifying
    * it against the MD5 sum stored in its associated .md5 file.
    */
-  void loadFSImage(File imageFile) throws IOException {
+  private void loadFSImage(File imageFile) throws IOException {
     MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
     if (expectedMD5 == null) {
       throw new IOException("No MD5 file found corresponding to image file "
@@ -687,7 +695,7 @@ public class FSImage implements Closeable {
    * filenames and blocks.  Return whether we should
    * "re-save" and consolidate the edit-logs
    */
-  void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
+  private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, getFSNamesystem());
     loader.load(curFile);
@@ -704,8 +712,9 @@ public class FSImage implements Closeable {
     }
 
     long txId = loader.getLoadedImageTxId();
+    LOG.info("Loaded image for txid " + txId + " from " + curFile);
+    lastAppliedTxId = txId;
     storage.setMostRecentCheckpointTxId(txId);
-    editLog.setNextTxId(txId + 1);
   }
 
 
@@ -718,10 +727,10 @@ public class FSImage implements Closeable {
     
     FSImageFormat.Saver saver = new FSImageFormat.Saver();
     FSImageCompression compression = FSImageCompression.createCompression(conf);
-    saver.save(newFile, getFSNamesystem(), compression);
+    saver.save(newFile, txid, getFSNamesystem(), compression);
     
     MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
-    storage.setMostRecentCheckpointTxId(editLog.getLastWrittenTxId());
+    storage.setMostRecentCheckpointTxId(txid);
   }
 
   /**
@@ -784,7 +793,7 @@ public class FSImage implements Closeable {
     boolean editLogWasOpen = editLog.isOpen();
     
     if (editLogWasOpen) {
-      editLog.endCurrentLogSegment();
+      editLog.endCurrentLogSegment(true);
     }
     long imageTxId = editLog.getLastWrittenTxId();
     try {
@@ -793,7 +802,7 @@ public class FSImage implements Closeable {
       
     } finally {
       if (editLogWasOpen) {
-        editLog.startLogSegment(imageTxId + 1);
+        editLog.startLogSegment(imageTxId + 1, true);
         // Take this opportunity to note the current transaction
         storage.writeTransactionIdFileToStorage(imageTxId + 1);
       }
@@ -951,7 +960,6 @@ public class FSImage implements Closeable {
       // do not return image if there are no image directories
       needToReturnImg = false;
     CheckpointSignature sig = rollEditLog();
-    getEditLog().logJSpoolStart(bnReg, nnReg);
     return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
   }
 
@@ -1003,7 +1011,9 @@ public class FSImage implements Closeable {
   }
 
   synchronized public void close() throws IOException {
-    getEditLog().close();
+    if (editLog != null) { // 2NN doesn't have any edit log
+      getEditLog().close();
+    }
     storage.close();
   }
 
@@ -1054,4 +1064,8 @@ public class FSImage implements Closeable {
   public String getBlockPoolID() {
     return storage.getBlockPoolID();
   }
+
+  public synchronized long getLastAppliedTxId() {
+    return lastAppliedTxId;
+  }
 }

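A small sketch of how the new loadEdits() return value and the
lastAppliedTxId counter fit together; the helper below is hypothetical and
assumes in-package access to the protected loadEdits():

package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;
import java.util.List;

class CatchUpSketch {
  /**
   * Replay a batch of finalized edit files on top of the loaded image.
   * loadEdits() reports how many transactions were applied and advances
   * lastAppliedTxId by the same amount, so the caller decides whether a
   * re-save is worthwhile.
   */
  static boolean catchUp(FSImage image, List<File> editsFiles)
      throws IOException {
    long before = image.getLastAppliedTxId();
    long numLoaded = image.loadEdits(editsFiles);
    assert image.getLastAppliedTxId() == before + numLoaded;
    return numLoaded > 0; // true if the namespace moved forward
  }
}
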
+ 1 - 1
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -539,6 +539,7 @@ class FSImageFormat {
     }
 
     void save(File newFile,
+              long txid,
               FSNamesystem sourceNamesystem,
               FSImageCompression compression)
       throws IOException {
@@ -559,7 +560,6 @@ class FSImageFormat {
                      .getStorage().getNamespaceID()); // TODO bad dependency
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
-        long txid = sourceNamesystem.getEditLog().getLastWrittenTxId();
         out.writeLong(txid);
 
         // write compression info and set up compressed stream

+ 78 - 29
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java

@@ -106,14 +106,15 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
                    "not configured to contain images.");
         }
       }
+    }
+    
 
-      // Check for a seen_txid file, which marks a minimum transaction ID that
-      // must be included in our load plan.
-      try {
-        maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
-      } catch (IOException ioe) {
-        LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
-      }
+    // Check for a seen_txid file, which marks a minimum transaction ID that
+    // must be included in our load plan.
+    try {
+      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    } catch (IOException ioe) {
+      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
     }
     
     List<FoundEditLog> editLogs = matchEditLogs(filesInStorage);
@@ -215,14 +216,45 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
     }
 
     FoundFSImage recoveryImage = getLatestImage();
-    long expectedTxId = recoveryImage.txId + 1;
+    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
+
+    return new TransactionalLoadPlan(recoveryImage,
+        logPlan);
+  }
+  
+  /**
+   * Plan which logs to load in order to bring the namespace up-to-date.
+   * Transactions are considered in the range (sinceTxId, maxStartTxId].
+   * 
+   * @param sinceTxId the highest txid that is already loaded 
+   *                  (eg from the image checkpoint)
+   * @param maxStartTxId ignore any log files that start after this txid
+   */
+  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
+    long expectedTxId = sinceTxId + 1;
     
     List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>();
     
-    SortedMap<Long, LogGroup> usefulGroups = logGroups.tailMap(expectedTxId);
-    LOG.debug("Excluded " + (logGroups.size() - usefulGroups.size()) + 
-        " groups of logs because they start with a txid less than image " +
-        "txid " + recoveryImage.txId);
+    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
+    if (logGroups.size() > tailGroups.size()) {
+      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + 
+          " groups of logs because they start with a txid less than image " +
+          "txid " + sinceTxId);
+    }
+    
+    SortedMap<Long, LogGroup> usefulGroups;
+    if (maxStartTxId > sinceTxId) {
+      usefulGroups = tailGroups.headMap(maxStartTxId);
+    } else {
+      usefulGroups = new TreeMap<Long, LogGroup>();
+    }
+    
+    if (tailGroups.size() > usefulGroups.size()) {
+      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) +
+        " groups of logs because they start with a txid higher than max " +
+        "txid " + maxStartTxId);
+    }
+
 
     for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
       long logStartTxId = entry.getKey();
@@ -251,7 +283,7 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
     
     long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
         0 : usefulGroups.lastKey();
-    if (maxSeenTxId > recoveryImage.txId &&
+    if (maxSeenTxId > sinceTxId &&
         maxSeenTxId > lastLogGroupStartTxId) {
       String msg = "At least one storage directory indicated it has seen a " +
         "log segment starting at txid " + maxSeenTxId;
@@ -263,9 +295,10 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
       }
       throw new IOException(msg);
     }
-
-    return new TransactionalLoadPlan(recoveryImage, recoveryLogs,
+    
+    return new LogLoadPlan(recoveryLogs,
         Lists.newArrayList(usefulGroups.values()));
+
   }
 
   @Override
@@ -595,23 +628,18 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
 
   static class TransactionalLoadPlan extends LoadPlan {
     final FoundFSImage image;
-    final List<FoundEditLog> editLogs;
-    final List<LogGroup> logGroupsToRecover;
+    final LogLoadPlan logPlan;
     
     public TransactionalLoadPlan(FoundFSImage image,
-        List<FoundEditLog> editLogs,
-        List<LogGroup> logGroupsToRecover) {
+        LogLoadPlan logPlan) {
       super();
       this.image = image;
-      this.editLogs = editLogs;
-      this.logGroupsToRecover = logGroupsToRecover;
+      this.logPlan = logPlan;
     }
 
     @Override
     boolean doRecovery() throws IOException {
-      for (LogGroup g : logGroupsToRecover) {
-        g.recover();
-      }
+      logPlan.doRecovery();
       return false;
     }
 
@@ -622,11 +650,7 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
 
     @Override
     List<File> getEditsFiles() {
-      List<File> ret = new ArrayList<File>();
-      for (FoundEditLog log : editLogs) {
-        ret.add(log.getFile());
-      }
-      return ret;
+      return logPlan.getEditsFiles();
     }
 
     @Override
@@ -634,4 +658,29 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
       return image.sd;
     }
   }
+  
+  static class LogLoadPlan {
+    final List<FoundEditLog> editLogs;
+    final List<LogGroup> logGroupsToRecover;
+    
+    LogLoadPlan(List<FoundEditLog> editLogs,
+        List<LogGroup> logGroupsToRecover) {
+      this.editLogs = editLogs;
+      this.logGroupsToRecover = logGroupsToRecover;
+    }
+
+    public void doRecovery() throws IOException {
+      for (LogGroup g : logGroupsToRecover) {
+        g.recover();
+      }
+    }
+
+    public List<File> getEditsFiles() {
+      List<File> ret = new ArrayList<File>();
+      for (FoundEditLog log : editLogs) {
+        ret.add(log.getFile());
+      }
+      return ret;
+    }
+  }
 }

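A hedged sketch of how the refactored planning API might be driven by a
node that has already applied transactions up to lastAppliedTxId; the
helper is hypothetical:

package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;
import java.util.List;

class LogPlanSketch {
  /**
   * Choose the edit files needed to roll a namespace forward from
   * lastAppliedTxId, ignoring segments that start beyond maxStartTxId.
   */
  static List<File> planEdits(FSImageTransactionalStorageInspector inspector,
      long lastAppliedTxId, long maxStartTxId) throws IOException {
    FSImageTransactionalStorageInspector.LogLoadPlan plan =
        inspector.createLogLoadPlan(lastAppliedTxId, maxStartTxId);
    plan.doRecovery(); // finalize any recoverable in-progress segments
    return plan.getEditsFiles();
  }
}
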
+ 18 - 17
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -106,6 +106,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTar
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -5337,31 +5338,29 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
   }
 
   /**
-   * Register a name-node.
-   * <p>
-   * Registration is allowed if there is no ongoing streaming to
-   * another backup node.
-   * We currently allow only one backup node, but multiple chackpointers 
-   * if there are no backups.
+   * Register a Backup name-node, verifying that it belongs
+   * to the correct namespace, and adding it to the set of
+   * active journals if necessary.
    * 
-   * @param registration
-   * @throws IOException
+   * @param bnReg registration of the new BackupNode
+   * @param nnReg registration of this NameNode
+   * @throws IOException if the namespace IDs do not match
    */
-  void registerBackupNode(NamenodeRegistration registration)
-    throws IOException {
+  void registerBackupNode(NamenodeRegistration bnReg,
+      NamenodeRegistration nnReg) throws IOException {
     writeLock();
     try {
       if(getFSImage().getStorage().getNamespaceID() 
-         != registration.getNamespaceID())
+         != bnReg.getNamespaceID())
         throw new IOException("Incompatible namespaceIDs: "
             + " Namenode namespaceID = "
             + getFSImage().getStorage().getNamespaceID() + "; "
-            + registration.getRole() +
-            " node namespaceID = " + registration.getNamespaceID());
-      boolean regAllowed = getEditLog().checkBackupRegistration(registration);
-      if(!regAllowed)
-        throw new IOException("Registration is not allowed. " +
-                              "Another node is registered as a backup.");
+            + bnReg.getRole() +
+            " node namespaceID = " + bnReg.getNamespaceID());
+      if (bnReg.getRole() == NamenodeRole.BACKUP) {
+        getFSImage().getEditLog().registerBackupNode(
+            bnReg, nnReg);
+      }
     } finally {
       writeUnlock();
     }
@@ -5943,4 +5942,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
       }
     }
   }
+
+
 }

+ 7 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -104,4 +104,11 @@ public class FileJournalManager implements JournalManager {
     }
   }
 
+  @Override
+  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
+      throws IOException {
+    File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
+    return new EditLogFileInputStream(f);
+  }
+
 }

+ 8 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java

@@ -52,4 +52,12 @@ public interface JournalManager {
    */
   void archiveLogsOlderThan(long minTxIdToKeep, StorageArchiver archiver)
     throws IOException;
+
+  /**
+   * @return an EditLogInputStream that reads from the same log that
+   * the edit log is currently writing. May return null if this journal
+   * manager does not support this operation.
+   */  
+  EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
+    throws IOException;
 }

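A sketch of how a reader such as the BackupNode might tail the segment the
active edit log is still writing; it assumes loadFSEdits() accepts any
EditLogInputStream, as in FSImage.loadEdits() above, and the helper name
is hypothetical:

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

class TailInProgressSketch {
  /**
   * Replay the in-progress segment, mirroring what the BackupNode does
   * during edits synchronization. getInProgressFileInputStream() tries
   * each active journal in turn, skipping any that return null or throw.
   */
  static int tailCurrentSegment(FSEditLog editLog, FSNamesystem namesystem)
      throws IOException {
    EditLogInputStream in = editLog.getInProgressFileInputStream();
    try {
      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
      // Apply ops starting at the segment's first txid.
      return loader.loadFSEdits(in, editLog.getCurSegmentTxId());
    } finally {
      in.close();
    }
  }
}
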
+ 3 - 12
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -744,8 +744,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
   public NamenodeRegistration register(NamenodeRegistration registration)
   throws IOException {
     verifyVersion(registration.getVersion());
-    namesystem.registerBackupNode(registration);
-    return setRegistration();
+    NamenodeRegistration myRegistration = setRegistration();
+    namesystem.registerBackupNode(registration, myRegistration);
+    return myRegistration;
   }
 
   @Override // NamenodeProtocol
@@ -766,15 +767,6 @@ public class NameNode implements NamenodeProtocols, FSConstants {
     namesystem.endCheckpoint(registration, sig);
   }
 
-  @Override // NamenodeProtocol
-  public void journal(NamenodeRegistration registration,
-                      int jAction,
-                      int length,
-                      byte[] args) throws IOException {
-    // Active name-node cannot journal.
-    throw new UnsupportedActionException("journal");
-  }
-
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
@@ -1151,7 +1143,6 @@ public class NameNode implements NamenodeProtocols, FSConstants {
     return namesystem.getTransactionID();
   }
 
-  @Deprecated
   @Override // NamenodeProtocol
   public CheckpointSignature rollEditLog() throws IOException {
     return namesystem.rollEditLog();

+ 44 - 38
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -28,8 +28,6 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 
-import javax.jws.soap.SOAPBinding.Use;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.namenode.NNStorageArchivalManager.StorageArchiver;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -390,6 +387,20 @@ public class SecondaryNameNode implements Runnable {
       final CheckpointSignature sig,
       final RemoteEditLogManifest manifest
   ) throws IOException {
+    
+    // Sanity check the manifest - these cases could happen if, e.g., someone
+    // on the NN side accidentally removed the storage directories
+    if (manifest.getLogs().isEmpty()) {
+      throw new IOException("Found no edit logs to download on NN since txid " 
+          + sig.mostRecentCheckpointTxId);
+    }
+    
+    long expectedTxId = sig.mostRecentCheckpointTxId + 1;
+    if (manifest.getLogs().get(0).getStartTxId() != expectedTxId) {
+      throw new IOException("Bad edit log manifest (expected txid = " +
+          expectedTxId + ": " + manifest);
+    }
+
     try {
         Boolean b = UserGroupInformation.getCurrentUser().doAs(
             new PrivilegedExceptionAction<Boolean>() {
@@ -469,6 +480,7 @@ public class SecondaryNameNode implements Runnable {
    */
   boolean doCheckpoint() throws IOException {
     checkpointImage.ensureCurrentDirExists();
+    NNStorage dstStorage = checkpointImage.getStorage();
     
     // Tell the namenode to start logging transactions in a new edit file
     // Returns a token that would be used to upload the merged image.
@@ -482,7 +494,9 @@ public class SecondaryNameNode implements Runnable {
     } else {
       // if we're a fresh 2NN, just take the storage info from the server
       // we first talk to.
-      checkpointImage.getStorage().setStorageInfo(sig);
+      dstStorage.setStorageInfo(sig);
+      dstStorage.setClusterID(sig.getClusterID());
+      dstStorage.setBlockPoolID(sig.getBlockpoolID());
     }
 
     // error simulation code for junit test
@@ -494,28 +508,17 @@ public class SecondaryNameNode implements Runnable {
     RemoteEditLogManifest manifest =
       namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
 
-    // Sanity check manifest - these could happen if, eg, someone on the
-    // NN side accidentally rmed the storage directories
-    if (manifest.getLogs().isEmpty()) {
-      throw new IOException("Found no edit logs to download on NN since txid " 
-          + sig.mostRecentCheckpointTxId);
-    }
-    if (manifest.getLogs().get(0).getStartTxId() != sig.mostRecentCheckpointTxId + 1) {
-      throw new IOException("Bad edit log manifest (expected txid = " +
-          (sig.mostRecentCheckpointTxId + 1) + ": " + manifest);
-    }
-    
     boolean loadImage = downloadCheckpointFiles(
         fsName, checkpointImage, sig, manifest);   // Fetch fsimage and edits
-    doMerge(conf, sig, manifest, loadImage, checkpointImage);
+    doMerge(sig, manifest, loadImage, checkpointImage);
     
     //
     // Upload the new image into the NameNode. Then tell the Namenode
     // to make this new uploaded image as the most current image.
     //
-    long txid = checkpointImage.getStorage().getMostRecentCheckpointTxId();
+    long txid = checkpointImage.getLastAppliedTxId();
     TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
-        checkpointImage.getStorage(), txid);
+        dstStorage, txid);
 
     // error simulation code for junit test
     if (ErrorSimulator.getErrorSimulation(1)) {
@@ -524,7 +527,7 @@ public class SecondaryNameNode implements Runnable {
     }
 
     LOG.warn("Checkpoint done. New Image Size: " 
-             + checkpointImage.getStorage().getFsImageName(txid).length());
+             + dstStorage.getFsImageName(txid).length());
     
     // Since we've successfully checkpointed, we can remove some old
     // image files
@@ -678,6 +681,12 @@ public class SecondaryNameNode implements Runnable {
                       Collection<URI> imageDirs,
                       Collection<URI> editsDirs) throws IOException {
       super(conf, (FSNamesystem)null, imageDirs, editsDirs);
+      setFSNamesystem(new FSNamesystem(this, conf));
+      
+      // the 2NN never writes edits -- it only downloads them. So
+      // we shouldn't have any editLog instance. Setting to null
+      // makes sure we don't accidentally depend on it.
+      editLog = null;
     }
 
     /**
@@ -749,45 +758,42 @@ public class SecondaryNameNode implements Runnable {
     }
   }
     
-  static void doMerge(Configuration conf, 
+  static void doMerge(
       CheckpointSignature sig, RemoteEditLogManifest manifest,
       boolean loadImage, FSImage dstImage) throws IOException {   
     NNStorage dstStorage = dstImage.getStorage();
     
     dstStorage.setStorageInfo(sig);
     if (loadImage) {
-      // TODO: dstImage.namesystem.close(); ??
-      dstImage.namesystem = new FSNamesystem(dstImage, conf);
-      dstImage.editLog = new FSEditLog(dstStorage);
-
       File file = dstStorage.findImageFile(sig.mostRecentCheckpointTxId);
       if (file == null) {
         throw new IOException("Couldn't find image file at txid " + 
             sig.mostRecentCheckpointTxId + " even though it should have " +
             "just been downloaded");
       }
-      LOG.debug("2NN loading image from " + file);
-      dstImage.loadFSImage(file);
+      dstImage.reloadFromImageFile(file);
     }
+    
+    rollForwardByApplyingLogs(manifest, dstImage);
+    dstImage.saveFSImageInAllDirs(dstImage.getLastAppliedTxId());
+    dstStorage.writeAll();
+  }
+  
+  static void rollForwardByApplyingLogs(
+      RemoteEditLogManifest manifest,
+      FSImage dstImage) throws IOException {
+    NNStorage dstStorage = dstImage.getStorage();
+
     List<File> editsFiles = Lists.newArrayList();
     for (RemoteEditLog log : manifest.getLogs()) {
       File f = dstStorage.findFinalizedEditsFile(
           log.getStartTxId(), log.getEndTxId());
-      editsFiles.add(f);
+      if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
+        editsFiles.add(f);
+      }
     }
     LOG.info("SecondaryNameNode about to load edits from " +
         editsFiles.size() + " file(s).");
     dstImage.loadEdits(editsFiles);
-    
-    // TODO: why do we need the following two lines? We shouldn't have even
-    // been able to download an image from a NN that had a different
-    // cluster ID or blockpool ID! this should only be done for the
-    // very first checkpoint.
-    dstStorage.setClusterID(sig.getClusterID());
-    dstStorage.setBlockPoolID(sig.getBlockpoolID());
-    
-    sig.validateStorageInfo(dstImage);
-    dstImage.saveFSImageInAllDirs(dstImage.getEditLog().getLastWrittenTxId());
-    dstStorage.writeAll();
   }
 }

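A condensed, hypothetical sketch of the order of operations the new
doMerge() path establishes (everything now keys off lastAppliedTxId rather
than the edit log's last written txid):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

class CheckpointMergeSketch {
  /**
   * Reload the downloaded image if this checkpointer is stale, replay
   * only the manifest logs beyond lastAppliedTxId, then save the new
   * image at exactly that txid.
   */
  static void merge(FSImage dstImage, File downloadedImage,
      RemoteEditLogManifest manifest, boolean loadImage) throws IOException {
    if (loadImage) {
      dstImage.reloadFromImageFile(downloadedImage);
    }
    SecondaryNameNode.rollForwardByApplyingLogs(manifest, dstImage);
    dstImage.saveFSImageInAllDirs(dstImage.getLastAppliedTxId());
  }
}
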
+ 10 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java

@@ -84,6 +84,16 @@ class TransferFsImage implements FSConstants {
 
     List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.EDITS, fileName);
     assert !dstFiles.isEmpty() : "No checkpoint targets.";
+    
+    for (File f : dstFiles) {
+      if (f.exists() && f.canRead()) {
+        LOG.info("Skipping download of remote edit log " +
+            log + " since it already is stored locally at " + f);
+        return;
+      } else {
+        LOG.debug("Dest file: " + f);
+      }
+    }
 
     getFileClient(fsName, fileid, dstFiles, dstStorage, false);
     LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +

+ 58 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BackupNodeProtocol.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface BackupNodeProtocol extends VersionedProtocol {
+  public static final long versionID = 1L;
+
+  /**
+   * Journal edit records.
+   * This message is sent by the active name-node to the backup node
+   * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
+   * changes with the backup namespace image.
+   * 
+   * @param registration active node registration
+   * @param firstTxnId the first transaction of this batch
+   * @param numTxns number of transactions
+   * @param records byte array containing serialized journal records
+   */
+  public void journal(NamenodeRegistration registration,
+                      long firstTxnId,
+                      int numTxns,
+                      byte[] records) throws IOException;
+
+  /**
+   * Notify the BackupNode that the NameNode has rolled its edit logs
+   * and is now writing a new log segment.
+   * @param registration the registration of the active NameNode
+   * @param txid the first txid in the new log
+   */
+  public void startLogSegment(NamenodeRegistration registration,
+      long txid) throws IOException;
+}

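A sketch, not part of the patch, of how the active NameNode side (e.g.
EditLogBackupOutputStream) is expected to drive the two calls; the helper
class is hypothetical:

package org.apache.hadoop.hdfs.server.protocol;

import java.io.IOException;

class BackupStreamSketch {
  /**
   * Each flushed buffer of edits travels with its starting txid and
   * count, so the BackupNode can verify it applies a contiguous stream.
   */
  static void pushBuffer(BackupNodeProtocol backupNode,
      NamenodeRegistration activeReg,
      long firstTxnId, int numTxns, byte[] serializedOps)
      throws IOException {
    backupNode.journal(activeReg, firstTxnId, numTxns, serializedOps);
  }

  /** After a roll, point the BackupNode at the new segment. */
  static void notifyRoll(BackupNodeProtocol backupNode,
      NamenodeRegistration activeReg, long newSegmentTxId)
      throws IOException {
    backupNode.startLogSegment(activeReg, newSegmentTxId);
  }
}
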
+ 1 - 22
hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -50,11 +51,6 @@ public interface NamenodeProtocol extends VersionedProtocol {
   final static int NOTIFY = 0;
   final static int FATAL = 1;
 
-  // Journal action codes. See journal().
-  public static byte JA_IS_ALIVE = 100; // check whether the journal is alive
-  public static byte JA_JOURNAL      = 101; // just journal
-  public static byte JA_JSPOOL_START = 102;  // = FSEditLogOpCodes.OP_JSPOOL_START
-
   public final static int ACT_UNKNOWN = 0;    // unknown action   
   public final static int ACT_SHUTDOWN = 50;   // shutdown node
   public final static int ACT_CHECKPOINT = 51;   // do checkpoint
@@ -167,22 +163,5 @@ public interface NamenodeProtocol extends VersionedProtocol {
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
     throws IOException;
-
-  /**
-   * Journal edit records.
-   * This message is sent by the active name-node to the backup node
-   * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
-   * changes with the backup namespace image.
-   * 
-   * @param registration active node registration
-   * @param jAction journal action
-   * @param length length of the byte array
-   * @param records byte array containing serialized journal records
-   * @throws IOException
-   */
-  public void journal(NamenodeRegistration registration,
-                      int jAction,
-                      int length,
-                      byte[] records) throws IOException;
 }
 

+ 25 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java

@@ -27,12 +27,15 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.collect.ImmutableMap;
+
 /**
  * NamespaceInfo is returned by the name-node in reply 
  * to a data-node handshake.
@@ -98,4 +101,26 @@ public class NamespaceInfo extends StorageInfo {
   public String toString(){
     return super.toString() + ";bpid=" + blockPoolID;
   }
+
+  public void validateStorage(NNStorage storage) throws IOException {
+    if (layoutVersion != storage.getLayoutVersion() ||
+        namespaceID != storage.getNamespaceID() ||
+        cTime != storage.cTime ||
+        !clusterID.equals(storage.getClusterID()) ||
+        !blockPoolID.equals(storage.getBlockPoolID())) {
+      throw new IOException("Inconsistent namespace information:\n" +
+          "NamespaceInfo has:\n" +
+          "LV=" + layoutVersion + ";" +
+          "NS=" + namespaceID + ";" +
+          "cTime=" + cTime + ";" +
+          "CID=" + clusterID + ";" +
+          "BPID=" + blockPoolID +
+          ".\nStorage has:\n" +
+          "LV=" + storage.getLayoutVersion() + ";" +
+          "NS=" + storage.getNamespaceID() + ";" +
+          "cTime=" + storage.getCTime() + ";" +
+          "CID=" + storage.getClusterID() + ";" +
+          "BPID=" + storage.getBlockPoolID() + ".");
+    }
+  }
 }

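A short, hypothetical usage sketch: the BackupNode is expected to call
validateStorage() during its handshake, before applying any edits:

package org.apache.hadoop.hdfs.server.protocol;

import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.NNStorage;

class HandshakeCheckSketch {
  /**
   * Compare the NamespaceInfo returned by the active NameNode against
   * local storage; a mismatch in layout version, namespaceID, cTime,
   * clusterID or blockPoolID raises an IOException.
   */
  static void verifyHandshake(NamespaceInfo nsInfo, NNStorage localStorage)
      throws IOException {
    nsInfo.validateStorage(localStorage);
  }
}
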
+ 41 - 1
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java

@@ -25,8 +25,11 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigInteger;
+import java.net.URI;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -34,6 +37,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundFSImage;
@@ -44,6 +48,7 @@ import org.apache.hadoop.io.MD5Hash;
 import org.mockito.Mockito;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -254,6 +259,31 @@ public abstract class FSImageTestUtil {
     return (latestImage == null) ? null : latestImage.getFile();
   }
 
+  /**
+   * Assert that the NameNode has checkpoints at the expected
+   * transaction IDs.
+   */
+  static void assertNNHasCheckpoints(MiniDFSCluster cluster,
+      List<Integer> txids) {
+
+    for (File nameDir : getNameNodeCurrentDirs(cluster)) {
+      // Should have fsimage_N for each checkpoint txid in the list
+      for (long checkpointTxId : txids) {
+        File image = new File(nameDir,
+                              NNStorage.getImageFileName(checkpointTxId));
+        assertTrue("Expected non-empty " + image, image.length() > 0);
+      }
+    }
+  }
+
+  static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) {
+    List<File> nameDirs = Lists.newArrayList();
+    for (URI u : cluster.getNameDirs(0)) {
+      nameDirs.add(new File(u.getPath(), "current"));
+    }
+    return nameDirs;
+  }
+
   /**
    * @return the latest edits log, finalized or otherwise, from the given
    * storage directory.
@@ -264,7 +294,17 @@ public abstract class FSImageTestUtil {
       new FSImageTransactionalStorageInspector();
     inspector.inspectDirectory(sd);
     
-    return inspector.foundEditLogs.get(inspector.foundEditLogs.size() - 1);
+    List<FoundEditLog> foundEditLogs = Lists.newArrayList(
+        inspector.getFoundEditLogs());
+    return Collections.max(foundEditLogs, new Comparator<FoundEditLog>() {
+      @Override
+      public int compare(FoundEditLog a, FoundEditLog b) {
+        return ComparisonChain.start()
+          .compare(a.getStartTxId(), b.getStartTxId())
+          .compare(a.getLastTxId(), b.getLastTxId())
+          .result();
+      }
+    });
   }
 
   /**

+ 195 - 108
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java

@@ -19,9 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -29,14 +32,28 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.FoundEditLog;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 
 import junit.framework.TestCase;
 
 public class TestBackupNode extends TestCase {
   public static final Log LOG = LogFactory.getLog(TestBackupNode.class);
 
+  
+  static {
+    ((Log4JLogger)Checkpointer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)BackupImage.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
   static final String BASE_DIR = MiniDFSCluster.getBaseDirectory();
 
   protected void setUp() throws Exception {
@@ -53,73 +70,185 @@ public class TestBackupNode extends TestCase {
     dirB.mkdirs();
   }
 
-  protected void tearDown() throws Exception {
-    super.tearDown();
-    File baseDir = new File(BASE_DIR);
-    if(!(FileUtil.fullyDelete(baseDir)))
-      throw new IOException("Cannot remove directory: " + baseDir);
-  }
-
-  static void writeFile(FileSystem fileSys, Path name, int repl)
-  throws IOException {
-    TestCheckpoint.writeFile(fileSys, name, repl);
-  }
-
-
-  static void checkFile(FileSystem fileSys, Path name, int repl)
-  throws IOException {
-    TestCheckpoint.checkFile(fileSys, name, repl);
-  }
-
-  void cleanupFile(FileSystem fileSys, Path name)
-  throws IOException {
-    TestCheckpoint.cleanupFile(fileSys, name);
-  }
-
-  static String getBackupNodeDir(StartupOption t, int i) {
-    return BASE_DIR + "name" + t.getName() + i + "/";
+  static String getBackupNodeDir(StartupOption t, int idx) {
+    return BASE_DIR + "name" + t.getName() + idx + "/";
   }
 
   BackupNode startBackupNode(Configuration conf,
-                             StartupOption t, int i) throws IOException {
+                             StartupOption startupOpt,
+                             int idx) throws IOException {
     Configuration c = new HdfsConfiguration(conf);
-    String dirs = getBackupNodeDir(t, i);
+    String dirs = getBackupNodeDir(startupOpt, idx);
     c.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, dirs);
     c.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         "${" + DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY + "}");
-    return (BackupNode)NameNode.createNameNode(new String[]{t.getName()}, c);
+    c.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY,
+        "127.0.0.1:0");
+
+    return (BackupNode)NameNode.createNameNode(new String[]{startupOpt.getName()}, c);
   }
 
-  void waitCheckpointDone(BackupNode backup) {
-/*    do {
+  void waitCheckpointDone(
+      MiniDFSCluster cluster, BackupNode backup, long txid) {
+    long thisCheckpointTxId;
+    do {
       try {
-        LOG.info("Waiting checkpoint to complete...");
+        LOG.info("Waiting checkpoint to complete... " +
+            "checkpoint txid should increase above " + txid);
         Thread.sleep(1000);
       } catch (Exception e) {}
-    } while(backup.getCheckpointState() != CheckpointStates.START); */
+      thisCheckpointTxId = backup.getFSImage().getStorage()
+        .getMostRecentCheckpointTxId();
+
+    } while (thisCheckpointTxId < txid);
+    
+    // Check that the checkpoint got uploaded to NN successfully
+    FSImageTestUtil.assertNNHasCheckpoints(cluster,
+        Collections.singletonList((int)thisCheckpointTxId));
   }
 
-  public void testCheckpoint() throws IOException {
+  public void testCheckpointNode() throws Exception {
     testCheckpoint(StartupOption.CHECKPOINT);
-    testCheckpoint(StartupOption.BACKUP);
   }
+  
+  /**
+   * Ensure that the backupnode will tail edits from the NN
+   * and keep in sync, even while the NN rolls, checkpoints
+   * occur, etc.
+   */
+  public void testBackupNodeTailsEdits() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    BackupNode backup = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+                                  .numDataNodes(0).build();
+      fileSys = cluster.getFileSystem();
+      backup = startBackupNode(conf, StartupOption.BACKUP, 1);
+      
+      BackupImage bnImage = backup.getBNImage();
+      testBNInSync(cluster, backup, 1);
+      
+      // Force a roll -- BN should roll with NN.
+      NameNode nn = cluster.getNameNode();
+      nn.rollEditLog();
+      assertEquals(bnImage.getEditLog().getCurSegmentTxId(),
+          nn.getFSImage().getEditLog().getCurSegmentTxId());
+      
+      // BN should stay in sync after roll
+      testBNInSync(cluster, backup, 2);
+      
+      long nnImageBefore =
+        nn.getFSImage().getStorage().getMostRecentCheckpointTxId();
+      // BN checkpoint
+      backup.doCheckpoint();
+      
+      // NN should have received a new image
+      long nnImageAfter =
+        nn.getFSImage().getStorage().getMostRecentCheckpointTxId();
+      
+      assertTrue("nn should have received new checkpoint. before: " +
+          nnImageBefore + " after: " + nnImageAfter,
+          nnImageAfter > nnImageBefore);
 
-  void testCheckpoint(StartupOption op) throws IOException {
+      // BN should stay in sync after checkpoint
+      testBNInSync(cluster, backup, 3);
+
+      // Stop BN
+      StorageDirectory sd = bnImage.getStorage().getStorageDir(0);
+      backup.stop();
+      backup = null;
+      
+      // When shutting down the BN, it shouldn't finalize logs that are
+      // still open on the NN
+      FoundEditLog editsLog = FSImageTestUtil.findLatestEditsLog(sd);
+      assertEquals(editsLog.getStartTxId(),
+          nn.getFSImage().getEditLog().getCurSegmentTxId());
+      assertTrue("Should not have finalized " + editsLog,
+          editsLog.isInProgress());
+      
+      // do some edits
+      assertTrue(fileSys.mkdirs(new Path("/edit-while-bn-down")));
+      
+      // start a new backup node
+      backup = startBackupNode(conf, StartupOption.BACKUP, 1);
+
+      testBNInSync(cluster, backup, 4);
+      assertNotNull(backup.getNamesystem().getFileInfo("/edit-while-bn-down", false));
+    } finally {
+      LOG.info("Shutting down...");
+      if (backup != null) backup.stop();
+      if (fileSys != null) fileSys.close();
+      if (cluster != null) cluster.shutdown();
+    }
+    
+    assertStorageDirsMatch(cluster.getNameNode(), backup);
+  }
+
+  private void testBNInSync(MiniDFSCluster cluster, final BackupNode backup,
+      int testIdx) throws Exception {
+    
+    final NameNode nn = cluster.getNameNode();
+    final FileSystem fs = cluster.getFileSystem();
+
+    // Do a bunch of namespace operations, make sure they're replicated
+    // to the BN.
+    for (int i = 0; i < 10; i++) {
+      final String src = "/test_" + testIdx + "_" + i;
+      LOG.info("Creating " + src + " on NN");
+      Path p = new Path(src);
+      assertTrue(fs.mkdirs(p));
+      
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          LOG.info("Checking for " + src + " on BN");
+          try {
+            boolean hasFile = backup.getNamesystem().getFileInfo(src, false) != null;
+            boolean txnIdMatch = backup.getTransactionID() == nn.getTransactionID();
+            return hasFile && txnIdMatch;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }, 30, 10000);
+    }
+    
+    assertStorageDirsMatch(nn, backup);
+  }
+
+  private void assertStorageDirsMatch(final NameNode nn, final BackupNode backup)
+      throws Exception {
+    // Check that the stored files in the name dirs are identical
+    List<File> dirs = Lists.newArrayList(
+        FSImageTestUtil.getCurrentDirs(nn.getFSImage().getStorage(),
+            null));
+    dirs.addAll(FSImageTestUtil.getCurrentDirs(backup.getFSImage().getStorage(),
+        null));
+    FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of("VERSION"));
+  }
+  
+  public void testBackupNode() throws Exception {
+    testCheckpoint(StartupOption.BACKUP);
+  }  
+
+  void testCheckpoint(StartupOption op) throws Exception {
     Path file1 = new Path("checkpoint.dat");
     Path file2 = new Path("checkpoint2.dat");
 
     Configuration conf = new HdfsConfiguration();
-    short replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
     conf.set(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "0");
     conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // disable block scanner
-    int numDatanodes = Math.max(3, replication);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     BackupNode backup = null;
 
     try {
       cluster = new MiniDFSCluster.Builder(conf)
-                                  .numDataNodes(numDatanodes).build();
+                                  .numDataNodes(0).build();
       fileSys = cluster.getFileSystem();
       //
       // verify that 'format' really blew away all pre-existing files
@@ -130,14 +259,14 @@ public class TestBackupNode extends TestCase {
       //
       // Create file1
       //
-      writeFile(fileSys, file1, replication);
-      checkFile(fileSys, file1, replication);
+      assertTrue(fileSys.mkdirs(file1));
 
       //
       // Take a checkpoint
       //
+      long txid = cluster.getNameNode().getTransactionID();
       backup = startBackupNode(conf, op, 1);
-      waitCheckpointDone(backup);
+      waitCheckpointDone(cluster, backup, txid);
     } catch(IOException e) {
       LOG.error("Error in TestBackupNode:", e);
       assertTrue(e.getLocalizedMessage(), false);
@@ -146,39 +275,45 @@ public class TestBackupNode extends TestCase {
       if(fileSys != null) fileSys.close();
       if(cluster != null) cluster.shutdown();
     }
-    File imageFileNN = new File(BASE_DIR, "name1/current/fsimage");
-    File imageFileBN = new File(getBackupNodeDir(op, 1), "/current/fsimage");
-    LOG.info("NameNode fsimage length = " + imageFileNN.length());
-    LOG.info("Backup Node fsimage length = " + imageFileBN.length());
-    assertTrue(imageFileNN.length() == imageFileBN.length());
+    File nnCurDir = new File(BASE_DIR, "name1/current/");
+    File bnCurDir = new File(getBackupNodeDir(op, 1), "/current/");
 
+    FSImageTestUtil.assertParallelFilesAreIdentical(
+        ImmutableList.of(bnCurDir, nnCurDir),
+        ImmutableSet.<String>of("VERSION"));
+
     try {
       //
      // Restart cluster and verify that file1 still exists.
       //
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false).build();
       fileSys = cluster.getFileSystem();
       // check that file1 still exists
-      checkFile(fileSys, file1, replication);
-      cleanupFile(fileSys, file1);
+      assertTrue(fileSys.exists(file1));
+      fileSys.delete(file1, true);
 
       // create new file file2
-      writeFile(fileSys, file2, replication);
-      checkFile(fileSys, file2, replication);
+      fileSys.mkdirs(file2);
 
       //
       // Take a checkpoint
       //
       backup = startBackupNode(conf, op, 1);
-      waitCheckpointDone(backup);
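+      // As before, capture the txid so we can wait for a checkpoint covering it.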
+      long txid = cluster.getNameNode().getTransactionID();
+      waitCheckpointDone(cluster, backup, txid);
 
       for (int i = 0; i < 10; i++) {
-        writeFile(fileSys, new Path("file_" + i), replication);
+        fileSys.mkdirs(new Path("file_" + i));
       }
 
+      txid = cluster.getNameNode().getTransactionID();
       backup.doCheckpoint();
-      waitCheckpointDone(backup);
+      waitCheckpointDone(cluster, backup, txid);
+
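+      // Checkpoint a second time right away; presumably this exercises a
+      // checkpoint with few or no new transactions since the previous one.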
+      txid = cluster.getNameNode().getTransactionID();
+      backup.doCheckpoint();
+      waitCheckpointDone(cluster, backup, txid);
 
     } catch(IOException e) {
       LOG.error("Error in TestBackupNode:", e);
@@ -188,22 +323,22 @@ public class TestBackupNode extends TestCase {
       if(fileSys != null) fileSys.close();
       if(cluster != null) cluster.shutdown();
     }
-    LOG.info("NameNode fsimage length = " + imageFileNN.length());
-    LOG.info("Backup Node fsimage length = " + imageFileBN.length());
-    assertTrue(imageFileNN.length() == imageFileBN.length());
+    FSImageTestUtil.assertParallelFilesAreIdentical(
+        ImmutableList.of(bnCurDir, nnCurDir),
+        ImmutableSet.<String>of("VERSION"));
 
     try {
       //
       // Restart cluster and verify that file2 exists and
       // file1 does not exist.
       //
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
       fileSys = cluster.getFileSystem();
 
       assertTrue(!fileSys.exists(file1));
 
       // verify that file2 exists
-      checkFile(fileSys, file2, replication);
+      assertTrue(fileSys.exists(file2));
     } catch(IOException e) {
       LOG.error("Error in TestBackupNode:", e);
       assertTrue(e.getLocalizedMessage(), false);
@@ -212,52 +347,4 @@ public class TestBackupNode extends TestCase {
       cluster.shutdown();
     }
   }
-
-  /**
-   * Test that only one backup node can register.
-   * @throws IOException
-   */
-  public void testBackupRegistration() throws IOException {
-    Configuration conf1 = new HdfsConfiguration();
-    Configuration conf2 = null;
-    MiniDFSCluster cluster = null;
-    BackupNode backup1 = null;
-    BackupNode backup2 = null;
-    try {
-      // start name-node and backup node 1
-      cluster = new MiniDFSCluster.Builder(conf1).numDataNodes(0).build();
-      conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
-      conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7775");
-      backup1 = startBackupNode(conf1, StartupOption.BACKUP, 1);
-      // try to start backup node 2
-      conf2 = new HdfsConfiguration(conf1);
-      conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7772");
-      conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7776");
-      try {
-        backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);
-        backup2.stop();
-        backup2 = null;
-        assertTrue("Only one backup node should be able to start", false);
-      } catch(IOException e) {
-        assertTrue(
-            e.getLocalizedMessage().contains("Registration is not allowed"));
-        // should fail - doing good
-      }
-      // stop backup node 1; backup node 2 should be able to start
-      backup1.stop();
-      backup1 = null;
-      try {
-        backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);
-      } catch(IOException e) {
-        assertTrue("Backup node 2 should be able to start", false);
-      }
-    } catch(IOException e) {
-      LOG.error("Error in TestBackupNode:", e);
-      assertTrue(e.getLocalizedMessage(), false);
-    } finally {
-      if(backup1 != null) backup1.stop();
-      if(backup2 != null) backup2.stop();
-      if(cluster != null) cluster.shutdown();
-    }
-  }
 }

+ 3 - 27
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -46,8 +46,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -56,7 +54,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
-import org.apache.hadoop.ipc.WritableRpcEngine;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.util.StringUtils;
@@ -71,6 +68,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
+
 /**
  * This class tests the creation and validation of a checkpoint.
  */
@@ -1699,22 +1699,6 @@ public class TestCheckpoint extends TestCase {
     }
   }
 
-  /**
-   * Assert that the NameNode has checkpoints at the expected
-   * transaction IDs.
-   */
-  private void assertNNHasCheckpoints(MiniDFSCluster cluster,
-      List<Integer> txids) {
-
-    for (File nameDir : getNameNodeCurrentDirs(cluster)) {
-      // Should have fsimage_N for the three checkpoints
-      for (long checkpointTxId : txids) {
-        File image = new File(nameDir,
-                              NNStorage.getImageFileName(checkpointTxId));
-        assertTrue("Expected non-empty " + image, image.length() > 0);
-      }
-    }
-  }
 
   /**
    * Assert that if any two files have the same name across the 2NNs
@@ -1732,14 +1716,6 @@ public class TestCheckpoint extends TestCase {
         ImmutableSet.of("VERSION"));    
   }
   
-  private List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) {
-    List<File> nameDirs = Lists.newArrayList();
-    for (URI u : cluster.getNameDirs(0)) {
-      nameDirs.add(new File(u.getPath(), "current"));
-    }
-    return nameDirs;
-  }
-
   @SuppressWarnings("deprecation")
   private List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) {
     List<File> ret = Lists.newArrayList();