
HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.
HDFS-10519. Add a configuration option to enable in-progress edit log tailing. Contributed by Jiayi Zhou.

Erik Krogen, 7 years ago
commit 85f81fe458
32 changed files with 964 additions and 70 deletions
  1. + 13 - 1  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
  2. + 8 - 2  hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
  3. + 4 - 2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  4. + 7 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
  5. + 14 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
  6. + 14 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
  7. + 129 - 10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
  8. + 12 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
  9. + 12 - 6  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
  10. + 1 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
  11. + 44 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
  12. + 19 - 9  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  13. + 8 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
  14. + 8 - 3  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
  15. + 5 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
  16. + 1 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
  17. + 1 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
  18. + 11 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
  19. + 12 - 7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
  20. + 1 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
  21. + 10 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  22. + 1 - 1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
  23. + 155 - 1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
  24. + 98 - 3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
  25. + 2 - 2  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
  26. + 9 - 9  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
  27. + 18 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
  28. + 1 - 1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
  29. + 1 - 1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
  30. + 2 - 2  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
  31. + 4 - 3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
  32. + 339 - 0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java

+ 13 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java

@@ -297,7 +297,19 @@ public class IOUtils {
       cleanupWithLogger(null, stream);
     }
   }
-  
+
+  /**
+   * Closes the streams ignoring {@link Throwable}.
+   * Must only be called in cleaning up from exception handlers.
+   *
+   * @param streams the Streams to close
+   */
+  public static void closeStreams(java.io.Closeable... streams) {
+    if (streams != null) {
+      cleanupWithLogger(null, streams);
+    }
+  }
+
   /**
    * Closes the socket ignoring {@link IOException}
    *

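The closeStreams() helper added above takes any number of Closeables and swallows whatever close() throws, so it is only intended for cleanup inside exception handlers (the new tests below use it to release the selected edit streams). A minimal usage sketch; the file names and copy loop are illustrative and not part of the patch:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.IOUtils;

    public class CloseStreamsSketch {
      // Copy src to dst; on failure, best-effort close both streams and rethrow.
      public static void copy(String src, String dst) throws IOException {
        FileInputStream in = null;
        FileOutputStream out = null;
        try {
          in = new FileInputStream(src);
          out = new FileOutputStream(dst);
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
          }
          out.close();
          in.close();
        } catch (IOException e) {
          // Cleanup from the exception handler, as the javadoc requires;
          // closeStreams() ignores anything thrown while closing.
          IOUtils.closeStreams(in, out);
          throw e;
        }
      }
    }
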
+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java

@@ -533,12 +533,18 @@ public class BookKeeperJournalManager implements JournalManager {
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
       throw new IOException("Error finalising ledger", ie);
-    } 
+    }
+  }
+
+  public void selectInputStreams(
+      Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
   }
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk)
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
       throws IOException {
     List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
         inProgressOk);

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -277,7 +277,8 @@ public class PBHelper {
   public static RemoteEditLogManifestProto convert(
       RemoteEditLogManifest manifest) {
     RemoteEditLogManifestProto.Builder builder = RemoteEditLogManifestProto
-        .newBuilder();
+        .newBuilder()
+        .setCommittedTxnId(manifest.getCommittedTxnId());
     for (RemoteEditLog log : manifest.getLogs()) {
       builder.addLogs(convert(log));
     }
@@ -291,7 +292,8 @@ public class PBHelper {
     for (RemoteEditLogProto l : manifest.getLogsList()) {
       logs.add(convert(l));
     }
-    return new RemoteEditLogManifest(logs);
+    return new RemoteEditLogManifest(logs,
+            manifest.getCommittedTxnId());
   }
 
   public static CheckpointCommandProto convert(CheckpointCommand cmd) {

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java

@@ -22,6 +22,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -106,6 +107,12 @@ interface AsyncLogger {
    * Begin a new epoch on the target node.
    */
   public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch);
+
+  /**
+   * Fetch journaled edits from the cache.
+   */
+  public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
+      long fromTxnId, int maxTransactions);
   
   /**
    * Fetch the list of edit logs available on the remote node.

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java

@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -261,6 +262,19 @@ class AsyncLoggerSet {
     return QuorumCall.create(calls);
   }
 
+  public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto>
+  getJournaledEdits(long fromTxnId, int maxTransactions) {
+    Map<AsyncLogger,
+        ListenableFuture<GetJournaledEditsResponseProto>> calls
+        = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<GetJournaledEditsResponseProto> future =
+          logger.getJournaledEdits(fromTxnId, maxTransactions);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
   public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
       long fromTxnId, boolean inProgressOk) {
     Map<AsyncLogger,

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -547,6 +548,19 @@ public class IPCLoggerChannel implements AsyncLogger {
     });
   }
 
+  @Override
+  public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
+      long fromTxnId, int maxTransactions) {
+    return parallelExecutor.submit(
+        new Callable<GetJournaledEditsResponseProto>() {
+          @Override
+          public GetJournaledEditsResponseProto call() throws IOException {
+            return getProxy().getJournaledEdits(journalId, nameServiceId,
+                fromTxnId, maxTransactions);
+          }
+        });
+  }
+
   @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId, final boolean inProgressOk) {

+ 129 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -69,6 +71,14 @@ import com.google.protobuf.TextFormat;
 public class QuorumJournalManager implements JournalManager {
   static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
 
+  // This config is not publicly exposed
+  static final String QJM_RPC_MAX_TXNS_KEY =
+      "dfs.ha.tail-edits.qjm.rpc.max-txns";
+  static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;
+
+  // Maximum number of transactions to fetch at a time when using the
+  // RPC edit fetch mechanism
+  private final int maxTxnsPerRpc;
   // Timeouts for which the QJM will wait for each of the following actions.
   private final int startSegmentTimeoutMs;
   private final int prepareRecoveryTimeoutMs;
@@ -111,6 +121,10 @@ public class QuorumJournalManager implements JournalManager {
     this.connectionFactory = URLConnectionFactory
         .newDefaultURLConnectionFactory(conf);
 
+    this.maxTxnsPerRpc =
+        conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
+    Preconditions.checkArgument(maxTxnsPerRpc > 0,
+        "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -404,8 +418,11 @@ public class QuorumJournalManager implements JournalManager {
         layoutVersion);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    return new QuorumOutputStream(loggers, txId,
-        outputBufferCapacity, writeTxnsTimeoutMs);
+    boolean updateCommittedTxId = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
+    return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
+        writeTxnsTimeoutMs, updateCommittedTxId);
   }
 
   @Override
@@ -464,32 +481,134 @@ public class QuorumJournalManager implements JournalManager {
     loggers.close();
   }
 
-  @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
+  @Override
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk,
+      boolean onlyDurableTxns) throws IOException {
+    if (inProgressOk) {
+      LOG.info("Tailing edits starting from txn ID " + fromTxnId +
+          " via RPC mechanism");
+      try {
+        Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
+        selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
+        streams.addAll(rpcStreams);
+        return;
+      } catch (IOException ioe) {
+        LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
+            " via RPC; falling back to streaming.", ioe);
+      }
+    }
+    selectStreamingInputStreams(streams, fromTxnId, inProgressOk,
+        onlyDurableTxns);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the RPC
+   * mechanism optimized for low latency.
+   *
+   * @param streams The collection to store the return streams into.
+   * @param fromTxnId Select edits starting from this transaction ID
+   * @param onlyDurableTxns Iff true, only include transactions which have been
+   *                        committed to a quorum of the journals.
+   * @throws IOException Upon issues, including cache misses on the journals.
+   */
+  private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean onlyDurableTxns) throws IOException {
+    QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> q =
+        loggers.getJournaledEdits(fromTxnId, maxTxnsPerRpc);
+    Map<AsyncLogger, GetJournaledEditsResponseProto> responseMap =
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectRpcInputStreams");
+    assert responseMap.size() >= loggers.getMajoritySize() :
+        "Quorum call returned without a majority";
+
+    List<Integer> responseCounts = new ArrayList<>();
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      responseCounts.add(resp.getTxnCount());
+    }
+    Collections.sort(responseCounts);
+    int highestTxnCount = responseCounts.get(responseCounts.size() - 1);
+    if (LOG.isDebugEnabled() || highestTxnCount < 0) {
+      StringBuilder msg = new StringBuilder("Requested edits starting from ");
+      msg.append(fromTxnId).append("; got ").append(responseMap.size())
+          .append(" responses: <");
+      for (Map.Entry<AsyncLogger, GetJournaledEditsResponseProto> ent :
+          responseMap.entrySet()) {
+        msg.append("[").append(ent.getKey()).append(", ")
+            .append(ent.getValue().getTxnCount()).append("],");
+      }
+      msg.append(">");
+      if (highestTxnCount < 0) {
+        throw new IOException("Did not get any valid JournaledEdits " +
+            "responses: " + msg);
+      } else {
+        LOG.debug(msg.toString());
+      }
+    }
 
+    int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
+        responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
+    if (maxAllowedTxns == 0) {
+      LOG.debug("No new edits available in logs; requested starting from " +
+          "ID " + fromTxnId);
+      return;
+    }
+    LOG.info("Selected loggers with >= " + maxAllowedTxns +
+        " transactions starting from " + fromTxnId);
+    PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
+        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      long endTxnId = fromTxnId - 1 +
+          Math.min(maxAllowedTxns, resp.getTxnCount());
+      allStreams.add(EditLogFileInputStream.fromByteString(
+          resp.getEditLog(), fromTxnId, endTxnId, true));
+    }
+    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the streaming
+   * mechanism optimized for resiliency / bulk load.
+   */
+  private void selectStreamingInputStreams(
+      Collection<EditLogInputStream> streams, long fromTxnId,
+      boolean inProgressOk, boolean onlyDurableTxns) throws IOException {
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId, inProgressOk);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
         loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
-            "selectInputStreams");
-    
-    LOG.debug("selectInputStream manifests:\n" +
+            "selectStreamingInputStreams");
+
+    LOG.debug("selectStreamingInputStream manifests:\n" +
         Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
-    
-    final PriorityQueue<EditLogInputStream> allStreams = 
+
+    final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
       AsyncLogger logger = e.getKey();
       RemoteEditLogManifest manifest = e.getValue();
-      
+      long committedTxnId = manifest.getCommittedTxnId();
+
       for (RemoteEditLog remoteLog : manifest.getLogs()) {
         URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
 
+        long endTxId = remoteLog.getEndTxId();
+
+        // If it's bounded by durable Txns, endTxId could not be larger
+        // than committedTxnId. This ensures the consistency.
+        if (onlyDurableTxns && inProgressOk) {
+          endTxId = Math.min(endTxId, committedTxnId);
+        }
+
         EditLogInputStream elis = EditLogFileInputStream.fromUrl(
             connectionFactory, url, remoteLog.getStartTxId(),
-            remoteLog.getEndTxId(), remoteLog.isInProgress());
+            endTxId, remoteLog.isInProgress());
         allStreams.add(elis);
       }
     }

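When the RPC fast path is taken, selectRpcInputStreams() sorts the per-JournalNode transaction counts; with onlyDurableTxns set, the read is capped at the count that at least a write quorum of JournalNodes can serve, otherwise at the single highest count. A standalone sketch of that arithmetic, assuming three JournalNodes and a majority size of two (illustrative only, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DurableTxnCountSketch {
      // Mirrors the maxAllowedTxns selection in selectRpcInputStreams().
      static int maxAllowedTxns(List<Integer> txnCounts, int majoritySize,
          boolean onlyDurableTxns) {
        List<Integer> sorted = new ArrayList<>(txnCounts);
        Collections.sort(sorted);
        int highest = sorted.get(sorted.size() - 1);
        // For durable reads, take the largest count that a majority of
        // JournalNodes reported; otherwise take the best single response.
        return onlyDurableTxns
            ? sorted.get(sorted.size() - majoritySize)
            : highest;
      }

      public static void main(String[] args) {
        // JournalNodes reported 5, 5 and 8 available transactions.
        List<Integer> counts = Arrays.asList(5, 5, 8);
        System.out.println(maxAllowedTxns(counts, 2, true));   // 5: durable on a quorum
        System.out.println(maxAllowedTxns(counts, 2, false));  // 8: best single response
      }
    }
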
+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java

@@ -33,15 +33,17 @@ class QuorumOutputStream extends EditLogOutputStream {
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
   private final int writeTimeoutMs;
+  private final boolean updateCommittedTxId;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
       long txId, int outputBufferCapacity,
-      int writeTimeoutMs) throws IOException {
+      int writeTimeoutMs, boolean updateCommittedTxId) throws IOException {
     super();
     this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
     this.writeTimeoutMs = writeTimeoutMs;
+    this.updateCommittedTxId = updateCommittedTxId;
   }
 
   @Override
@@ -110,6 +112,15 @@ class QuorumOutputStream extends EditLogOutputStream {
       // RPCs will thus let the loggers know of the most recent transaction, even
       // if a logger has fallen behind.
       loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
+
+      // If we don't have this dummy send, committed TxId might be one-batch
+      // stale on the Journal Nodes
+      if (updateCommittedTxId) {
+        QuorumCall<AsyncLogger, Void> fakeCall = loggers.sendEdits(
+            segmentTxId, firstTxToFlush,
+            0, new byte[0]);
+        loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits");
+      }
     }
   }
 

+ 12 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -268,8 +268,8 @@ public class Journal implements Closeable {
     checkFormatted();
     return lastWriterEpoch.get();
   }
-  
-  synchronized long getCommittedTxnIdForTests() throws IOException {
+
+  synchronized long getCommittedTxnId() throws IOException {
     return committedTxnId.get();
   }
 
@@ -375,6 +375,12 @@ public class Journal implements Closeable {
     checkFormatted();
     checkWriteRequest(reqInfo);
 
+    // If numTxns is 0, it's actually a fake send which aims at updating
+    // committedTxId only. So we can return early.
+    if (numTxns == 0) {
+      return;
+    }
+
     checkSync(curSegment != null,
         "Can't write, no segment open" + " ; journal id: " + journalId);
     if (curSegmentTxId != segmentTxId) {
@@ -704,12 +710,12 @@ public class Journal implements Closeable {
         }
       }
       if (log != null && log.isInProgress()) {
-        logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
-            true));
+        logs.add(new RemoteEditLog(log.getStartTxId(),
+            getHighestWrittenTxId(), true));
       }
     }
-    
-    return new RemoteEditLogManifest(logs);
+
+    return new RemoteEditLogManifest(logs, getCommittedTxnId());
   }
 
   /**

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java

@@ -80,7 +80,7 @@ class BackupJournalManager implements JournalManager {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) {
+      long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
   }

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.protobuf.ByteString;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
@@ -120,6 +121,23 @@ public class EditLogFileInputStream extends EditLogInputStream {
     return new EditLogFileInputStream(new URLLog(connectionFactory, url),
         startTxId, endTxId, inProgress);
   }
+
+  /**
+   * Create an EditLogInputStream from a {@link ByteString}, i.e. an in-memory
+   * collection of bytes.
+   *
+   * @param bytes The byte string to read from
+   * @param startTxId the expected starting transaction ID
+   * @param endTxId the expected ending transaction ID
+   * @param inProgress whether the log is in-progress
+   * @return An edit stream to read from
+   */
+  public static EditLogInputStream fromByteString(ByteString bytes,
+      long startTxId, long endTxId, boolean inProgress) {
+    return new EditLogFileInputStream(new ByteStringLog(bytes,
+        String.format("ByteStringEditLog[%d, %d]", startTxId, endTxId)),
+        startTxId, endTxId, inProgress);
+  }
   
   private EditLogFileInputStream(LogSource log,
       long firstTxId, long lastTxId,
@@ -377,6 +395,32 @@ public class EditLogFileInputStream extends EditLogInputStream {
     public long length();
     public String getName();
   }
+
+  private static class ByteStringLog implements LogSource {
+    private final ByteString bytes;
+    private final String name;
+
+    public ByteStringLog(ByteString bytes, String name) {
+      this.bytes = bytes;
+      this.name = name;
+    }
+
+    @Override
+    public InputStream getInputStream() {
+      return bytes.newInput();
+    }
+
+    @Override
+    public long length() {
+      return bytes.size();
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+  }
   
   private static class FileLog implements LogSource {
     private final File file;

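fromByteString() lets the tailer wrap the edits carried in a GetJournaledEditsResponseProto straight into an EditLogInputStream without touching disk; selectRpcInputStreams() above does this for every quorum response. A hedged sketch of that conversion for a single response (the helper class and its txn-ID arithmetic are illustrative; the real caller also caps the end transaction at maxAllowedTxns):

    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
    import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
    import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;

    public class ByteStringEditsSketch {
      // Wrap one JournalNode's RPC response into an in-memory edit stream.
      static EditLogInputStream toStream(GetJournaledEditsResponseProto resp,
          long fromTxnId) {
        long endTxnId = fromTxnId + resp.getTxnCount() - 1;
        return EditLogFileInputStream.fromByteString(
            resp.getEditLog(), fromTxnId, endTxnId, true /* in-progress */);
      }
    }
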
+ 19 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -320,7 +320,7 @@ public class FSEditLog implements LogsPurgeable {
     // Safety check: we should never start a segment if there are
     // newer txids readable.
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, segmentTxId, true);
+    journalSet.selectInputStreams(streams, segmentTxId, true, false);
     if (!streams.isEmpty()) {
       String error = String.format("Cannot start writing at txid %s " +
         "when there is a stream available for read: %s",
@@ -1586,15 +1586,23 @@ public class FSEditLog implements LogsPurgeable {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException {
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException {
+    journalSet.selectInputStreams(streams, fromTxId,
+            inProgressOk, onlyDurableTxns);
   }
 
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
-    return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
+    return selectInputStreams(fromTxId, toAtLeastTxId, null, true, false);
+  }
+
+  public Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
+      boolean inProgressOK) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId,
+        recovery, inProgressOK, false);
   }
-  
   /**
    * Select a list of input streams.
    * 
@@ -1602,16 +1610,18 @@ public class FSEditLog implements LogsPurgeable {
    * @param toAtLeastTxId the selected streams must contain this transaction
    * @param recovery recovery context
    * @param inProgressOk set to true if in-progress streams are OK
+   * @param onlyDurableTxns set to true if streams are bounded
+   *                        by the durable TxId
    */
-  public Collection<EditLogInputStream> selectInputStreams(
-      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
-      boolean inProgressOk) throws IOException {
+  public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
+      boolean onlyDurableTxns) throws IOException {
 
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
     synchronized(journalSetLock) {
       Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
           "selectInputStreams() on closed FSEditLog");
-      selectInputStreams(streams, fromTxId, inProgressOk);
+      selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
     }
 
     try {

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java

@@ -334,10 +334,17 @@ public class FileJournalManager implements JournalManager {
     return ret;
   }
 
+  synchronized public void selectInputStreams(
+      Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
   @Override
   synchronized public void selectInputStreams(
       Collection<EditLogInputStream> streams, long fromTxId,
-      boolean inProgressOk) throws IOException {
+      boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException {
     List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
     if (LOG.isDebugEnabled()) {
       LOG.debug(this + ": selecting input streams starting at " + fromTxId +

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java

@@ -262,10 +262,13 @@ public class JournalSet implements JournalManager {
    *                         may not be sorted-- this is up to the caller.
    * @param fromTxId         The transaction ID to start looking for streams at
    * @param inProgressOk     Should we consider unfinalized streams?
+   * @param onlyDurableTxns  Set to true if streams are bounded by the durable
+   *                         TxId. A durable TxId is the committed txid in QJM
+   *                         or the largest txid written into file in FJM
    */
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException {
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) throws IOException {
     final PriorityQueue<EditLogInputStream> allStreams = 
         new PriorityQueue<EditLogInputStream>(64,
             EDIT_LOG_INPUT_STREAM_COMPARATOR);
@@ -275,7 +278,8 @@ public class JournalSet implements JournalManager {
         continue;
       }
       try {
-        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+        jas.getManager().selectInputStreams(allStreams, fromTxId,
+            inProgressOk, onlyDurableTxns);
       } catch (IOException ioe) {
         LOG.warn("Unable to determine input streams from " + jas.getManager() +
             ". Skipping.", ioe);
@@ -682,7 +686,8 @@ public class JournalSet implements JournalManager {
       // And then start looking from after that point
       curStartTxId = bestLog.getEndTxId() + 1;
     }
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs,
+        curStartTxId - 1);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated manifest for logs since " + fromTxId + ":"

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java

@@ -42,10 +42,14 @@ interface LogsPurgeable {
    * 
    * @param fromTxId the first transaction id we want to read
    * @param inProgressOk whether or not in-progress streams should be returned
+   * @param onlyDurableTxns whether or not streams should be bounded by durable
+   *                        TxId. A durable TxId is the committed txid in QJM
+   *                        or the largest txid written into file in FJM
    * @throws IOException if the underlying storage has an error or is otherwise
    * inaccessible
    */
   void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException;
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException;
   
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java

@@ -134,7 +134,7 @@ public class NNStorageRetentionManager {
     long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
     
     ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
-    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
     Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
       @Override
       public int compare(EditLogInputStream a, EditLogInputStream b) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -914,7 +914,7 @@ public class SecondaryNameNode implements Runnable,
 
       @Override
       public void selectInputStreams(Collection<EditLogInputStream> streams,
-          long fromTxId, boolean inProgressOk) {
+          long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
         Iterator<StorageDirectory> iter = storage.dirIterator();
         while (iter.hasNext()) {
           StorageDirectory dir = iter.next();

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -135,6 +135,11 @@ public class EditLogTailer {
    */
   private int maxRetries;
 
+  /**
+   * Whether the tailer should tail the in-progress edit log segments.
+   */
+  private final boolean inProgressOk;
+
   public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
     this.conf = conf;
@@ -182,6 +187,10 @@ public class EditLogTailer {
       maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT;
     }
 
+    inProgressOk = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
+
     nnCount = nns.size();
     // setup the iterator to endlessly loop the nns
     this.nnLookup = Iterators.cycle(nns);
@@ -263,7 +272,8 @@ public class EditLogTailer {
       }
       Collection<EditLogInputStream> streams;
       try {
-        streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
+        streams = editLog.selectInputStreams(lastTxnId + 1, 0,
+            null, inProgressOk, true);
       } catch (IOException ioe) {
         // This is acceptable. If we try to tail edits in the middle of an edits
         // log roll, i.e. the last one has been finalized but the new inprogress

+ 12 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java

@@ -29,12 +29,15 @@ import com.google.common.base.Preconditions;
 public class RemoteEditLogManifest {
 
   private List<RemoteEditLog> logs;
-  
+
+  private long committedTxnId = -1;
+
   public RemoteEditLogManifest() {
   }
-  
-  public RemoteEditLogManifest(List<RemoteEditLog> logs) {
+
+  public RemoteEditLogManifest(List<RemoteEditLog> logs, long committedTxnId) {
     this.logs = logs;
+    this.committedTxnId = committedTxnId;
     checkState();
   }
   
@@ -46,7 +49,7 @@ public class RemoteEditLogManifest {
    */
   private void checkState()  {
     Preconditions.checkNotNull(logs);
-    
+
     RemoteEditLog prev = null;
     for (RemoteEditLog log : logs) {
       if (prev != null) {
@@ -56,7 +59,6 @@ public class RemoteEditLogManifest {
               + this);
         }
       }
-      
       prev = log;
     }
   }
@@ -65,10 +67,13 @@ public class RemoteEditLogManifest {
     return Collections.unmodifiableList(logs);
   }
 
+  public long getCommittedTxnId() {
+    return committedTxnId;
+  }
 
-  
   @Override
   public String toString() {
-    return "[" + Joiner.on(", ").join(logs) + "]";
+    return "[" + Joiner.on(", ").join(logs) + "]" + " CommittedTxId: "
+        + committedTxnId;
   }
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto

@@ -92,6 +92,7 @@ message RemoteEditLogProto {
  */
 message RemoteEditLogManifestProto {
   repeated RemoteEditLogProto logs = 1;
+  required uint64 committedTxnId = 2;
 }
 
 /**

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2793,6 +2793,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.ha.tail-edits.in-progress</name>
+  <value>false</value>
+  <description>
+    Whether to enable the standby NameNode to tail in-progress edit logs.
+    Clients might want to turn this on when they want the Standby NN to
+    have more up-to-date data.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.lease-recheck-interval-ms</name>
   <value>2000</value>

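dfs.ha.tail-edits.in-progress defaults to false; enabling it switches the standby tailer to in-progress segments and the RPC fast path, and also turns on the committed-txid "dummy send" on the write path. A minimal sketch of enabling it programmatically, the way the tests in this commit do (in a deployment the same property would go into hdfs-site.xml; the max-txns knob is the internal QJM setting introduced above, shown here set to its default):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class InProgressTailingConfSketch {
      static Configuration enableInProgressTailing() {
        Configuration conf = new Configuration();
        // Standby tails in-progress segments via the JournalNode RPC path.
        conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
        // Internal QJM knob added by this patch; 5000 is its default.
        conf.setInt("dfs.ha.tail-edits.qjm.rpc.max-txns", 5000);
        return conf;
      }
    }
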
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -308,7 +308,7 @@ public class TestPBHelper {
     List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
     logs.add(new RemoteEditLog(1, 10));
     logs.add(new RemoteEditLog(11, 20));
-    RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifest m = new RemoteEditLogManifest(logs, 20);
     RemoteEditLogManifestProto mProto = PBHelper.convert(m);
     RemoteEditLogManifest m1 = PBHelper.convert(mProto);
 

+ 155 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

@@ -44,6 +44,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -92,6 +93,10 @@ public class TestQuorumJournalManager {
     conf = new Configuration();
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    // Turn off IPC client caching to handle daemon restarts.
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     
     cluster = new MiniJournalCluster.Builder(conf)
         .baseDir(GenericTestUtils.getRandomizedTestDir().getAbsolutePath())
@@ -935,7 +940,156 @@ public class TestQuorumJournalManager {
     
     verifyEdits(streams, 25, 50);
   }
-  
+
+  @Test
+  public void testInProgressRecovery() throws Exception {
+    // Test the case when in-progress edit log tailing is on, and
+    // new active performs recovery when the old active crashes
+    // without closing the last log segment.
+    // See HDFS-13145 for more details.
+
+    // Write two batches of edits. After these, the commitId on the
+    // journals should be 5, and endTxnId should be 8.
+    EditLogOutputStream stm = qjm.startLogSegment(1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 5);
+    writeTxns(stm, 6, 3);
+
+    // Do recovery from a separate QJM, just like in failover.
+    QuorumJournalManager qjm2 = createSpyingQJM();
+    qjm2.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 8);
+
+    // When selecting input stream, we should see all txns up to 8.
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm2.selectInputStreams(streams, 1, true, true);
+    verifyEdits(streams, 1, 8);
+  }
+
+  @Test
+  public void testSelectViaRpcWithDurableTransactions() throws Exception {
+    // Two loggers will have up to ID 5, one will have up to ID 6
+    failLoggerAtTxn(spies.get(0), 6);
+    failLoggerAtTxn(spies.get(1), 6);
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 5);
+    try {
+      writeTxns(stm, 6, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3", qe);
+    }
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    verifyEdits(streams, 1, 5);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcWithoutDurableTransactions() throws Exception {
+    setupLoggers345();
+    futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, false);
+    verifyEdits(streams, 1, 5);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcOneDeadJN() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+
+    cluster.getJournalNode(0).stopAndJoin(0);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, false);
+    verifyEdits(streams, 1, 10);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+  }
+
+  @Test
+  public void testSelectViaRpcTwoDeadJNs() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+
+    cluster.getJournalNode(0).stopAndJoin(0);
+    cluster.getJournalNode(1).stopAndJoin(0);
+
+    try {
+      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+      fail("");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3", qe);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcTwoJNsError() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 1);
+
+    futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    // This should still succeed as the QJM should fall back to the streaming
+    // mechanism for fetching edits
+    verifyEdits(streams, 1, 11);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcAfterJNRestart() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    qjm.finalizeLogSegment(1, 10);
+
+    // Close to avoid connections hanging around after the JNs are restarted
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      cluster.restartJournalNode(i);
+    }
+    cluster.waitActive();
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    // This should still succeed as the QJM should fall back to the streaming
+    // mechanism for fetching edits
+    verifyEdits(streams, 1, 10);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
+    }
+  }
   
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {

+ 98 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java

@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
@@ -28,11 +31,11 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -44,11 +47,15 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
 
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
 
 /**
  * True unit tests for QuorumJournalManager
@@ -215,6 +222,94 @@ public class TestQuorumJournalManagerUnit {
     Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L);
   }
 
+  @Test
+  public void testReadRpcInputStreams() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(getJournaledEditsReponse(1, 3))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 3);
+  }
+
+  @Test
+  public void testReadRpcMismatchedInputStreams() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(getJournaledEditsReponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 2);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsOneSlow() throws Exception {
+    for (int jn = 0; jn < 2; jn++) {
+      futureReturns(getJournaledEditsReponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+    Mockito.doReturn(SettableFuture.create())
+        .when(spyLoggers.get(2)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 1);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsOneException() throws Exception {
+    for (int jn = 0; jn < 2; jn++) {
+      futureReturns(getJournaledEditsReponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+    futureThrows(new IOException()).when(spyLoggers.get(2))
+        .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 1);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsNoNewEdits() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(0).setEditLog(ByteString.EMPTY).build())
+          .when(spyLoggers.get(jn))
+          .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(0, streams.size());
+  }
+
+  private GetJournaledEditsResponseProto getJournaledEditsReponse(
+      int startTxn, int numTxns) throws Exception {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+        new DataOutputStream(byteStream));
+    byteStream.write(createTxnData(startTxn, numTxns));
+    return GetJournaledEditsResponseProto.newBuilder()
+        .setTxnCount(numTxns)
+        .setEditLog(ByteString.copyFrom(byteStream.toByteArray()))
+        .build();
+  }
+
   private EditLogOutputStream createLogSegment() throws IOException {
     futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
         Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java

@@ -167,12 +167,12 @@ public class TestJournal {
     // Send txids 1-3, with a request indicating only 0 committed
     journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
         QJMTestUtil.createTxnData(1, 3));
-    assertEquals(0, journal.getCommittedTxnIdForTests());
+    assertEquals(0, journal.getCommittedTxnId());
     
     // Send 4-6, with request indicating that through 3 is committed.
     journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
         QJMTestUtil.createTxnData(4, 6));
-    assertEquals(3, journal.getCommittedTxnIdForTests());    
+    assertEquals(3, journal.getCommittedTxnId());
   }
   
   @Test (timeout = 10000)

+ 9 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -1037,9 +1037,9 @@ public class TestEditLog {
         "[1,100]|[101,200]|[201,]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
 
     // Another simple case, different directories have different
@@ -1049,8 +1049,8 @@ public class TestEditLog {
         "[1,100]|[201,300]|[301,400]"); // nothing starting at 101
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200], [201,300], [301,400]]",
-        log.getEditLogManifest(1).toString());
+    assertEquals("[[1,100], [101,200], [201,300], [301,400]]" +
+            " CommittedTxId: 400", log.getEditLogManifest(1).toString());
     
     // Case where one directory has an earlier finalized log, followed
     // by a gap. The returned manifest should start after the gap.
@@ -1059,7 +1059,7 @@ public class TestEditLog {
         "[301,400]|[401,500]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[301,400], [401,500]]",
+    assertEquals("[[301,400], [401,500]] CommittedTxId: 500",
         log.getEditLogManifest(1).toString());
     
     // Case where different directories have different length logs
@@ -1069,9 +1069,9 @@ public class TestEditLog {
         "[1,50]|[101,200]"); // short log at 1
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
 
     // Case where the first storage has an inprogress while
@@ -1082,9 +1082,9 @@ public class TestEditLog {
         "[1,100]|[101,200]"); 
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
   }
   

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java

@@ -33,6 +33,7 @@ import java.util.EnumMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -79,6 +80,23 @@ public class TestEditLogFileInputStream {
     elis.close();
   }
 
+  @Test
+  public void testByteStringLog() throws Exception {
+    ByteString bytes = ByteString.copyFrom(FAKE_LOG_DATA);
+    EditLogInputStream elis = EditLogFileInputStream.fromByteString(bytes,
+        HdfsServerConstants.INVALID_TXID, HdfsServerConstants.INVALID_TXID,
+        true);
+    // Read the edit log and verify that all of the data is present
+    EnumMap<FSEditLogOpCodes, Holder<Integer>> counts = FSImageTestUtil
+        .countEditLogOpTypes(elis);
+    assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
+
+    assertEquals(FAKE_LOG_DATA.length, elis.length());
+    elis.close();
+  }
+
   /**
    * Regression test for HDFS-8965 which verifies that
    * FSEditLogFileInputStream#scanOp verifies Op checksums.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java

@@ -399,7 +399,7 @@ public class TestFileJournalManager {
     FileJournalManager.matchEditLogs(badDir);
   }
   
-  private static EditLogInputStream getJournalInputStream(JournalManager jm,
+  private static EditLogInputStream getJournalInputStream(FileJournalManager jm,
       long txId, boolean inProgressOk) throws IOException {
     final PriorityQueue<EditLogInputStream> allStreams = 
         new PriorityQueue<EditLogInputStream>(64,

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java

@@ -176,7 +176,7 @@ public class TestGenericJournalConf {
 
     @Override
     public void selectInputStreams(Collection<EditLogInputStream> streams,
-        long fromTxnId, boolean inProgressOk) {
+        long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) {
     }
 
     @Override

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java

@@ -368,11 +368,11 @@ public class TestNNStorageRetentionManager {
         public Void answer(InvocationOnMock invocation) throws Throwable {
           Object[] args = invocation.getArguments();
           journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
-              (Long)args[1], (Boolean)args[2]);
+              (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
           return null;
         }
       }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
-          Mockito.anyLong(), Mockito.anyBoolean());
+          Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
       return mockLog;
     }
   }

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java

@@ -198,7 +198,7 @@ public class TestFailureToReadEdits {
     
     // This op should get applied just fine.
     assertTrue(fs.mkdirs(new Path(TEST_DIR2)));
-    
+
     // This is the op the mocking will cause to fail to be read.
     assertTrue(fs.mkdirs(new Path(TEST_DIR3)));
     
@@ -220,7 +220,7 @@ public class TestFailureToReadEdits {
     // Null because it hasn't been created yet.
     assertNull(NameNodeAdapter.getFileInfo(nn1,
         TEST_DIR3, false));
-    
+
     // Now let the standby read ALL the edits.
     answer.setThrowExceptionOnRead(false);
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
@@ -340,7 +340,8 @@ public class TestFailureToReadEdits {
     FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
     LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); 
     doAnswer(answer).when(spyEditLog).selectInputStreams(
-        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
+        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean(),
+        anyBoolean());
     return answer;
   }
   

+ 339 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java

@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Test cases for in-progress edit log tailing by
+ * the standby node.
+ */
+public class TestStandbyInProgressTail {
+  private static final Log LOG =
+          LogFactory.getLog(TestStandbyInProgressTail.class);
+  private Configuration conf;
+  private MiniQJMHACluster qjmhaCluster;
+  private MiniDFSCluster cluster;
+  private NameNode nn0;
+  private NameNode nn1;
+
+  @Before
+  public void startUp() throws IOException {
+    conf = new Configuration();
+    // Set period of tail edits to a large value (20 mins) for test purposes
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    HAUtil.setAllowStandbyReads(conf, true);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+
+    // Get the NameNodes from the cluster for manual control later in the tests
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDefault() throws Exception {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+    conf = new Configuration();
+    // Set period of tail edits to a large value (20 mins) for test purposes
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    HAUtil.setAllowStandbyReads(conf, true);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+
+    try {
+      // During HA startup, both nodes should be in
+      // standby and we shouldn't have any edits files
+      // in any edits directory!
+      List<URI> allDirs = Lists.newArrayList();
+      allDirs.addAll(cluster.getNameDirs(0));
+      allDirs.addAll(cluster.getNameDirs(1));
+      assertNoEditFiles(allDirs);
+
+      // Set the first NN to active, make sure it creates edits
+      // in its own dirs and the shared dir. The standby
+      // should still have no edits!
+      cluster.transitionToActive(0);
+
+      assertEditFiles(cluster.getNameDirs(0),
+              NNStorage.getInProgressEditsFileName(1));
+      assertNoEditFiles(cluster.getNameDirs(1));
+
+      cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+              FsPermission.createImmutable((short) 0755), true);
+
+      cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
+
+      // With in-progress tailing disabled, the standby should not have
+      // applied the in-progress edits
+      assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+              "/test", true));
+
+      // Restarting the standby should not finalize any edits files
+      // in the shared directory when it starts up!
+      cluster.restartNameNode(1);
+
+      assertEditFiles(cluster.getNameDirs(0),
+              NNStorage.getInProgressEditsFileName(1));
+      assertNoEditFiles(cluster.getNameDirs(1));
+
+      // Additionally it should not have applied any in-progress logs
+      // at start-up -- otherwise, it would have read half-way into
+      // the current log segment, and on the next roll, it would have to
+      // either replay starting in the middle of the segment (not allowed)
+      // or double-replay the edits (incorrect).
+      assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+              "/test", true));
+
+      cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+              FsPermission.createImmutable((short) 0755), true);
+
+      // If we restart NN0, it'll come back as standby, and we can
+      // transition NN1 to active and make sure it reads edits correctly.
+      cluster.restartNameNode(0);
+      cluster.transitionToActive(1);
+
+      // NN1 should have both the edits that came before its restart,
+      // and the edits that came after its restart.
+      assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+              "/test", true));
+      assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+              "/test2", true));
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testSetup() throws Exception {
+    // During HA startup, both nodes should be in
+    // standby and we shouldn't have any edits files
+    // in any edits directory!
+    List<URI> allDirs = Lists.newArrayList();
+    allDirs.addAll(cluster.getNameDirs(0));
+    allDirs.addAll(cluster.getNameDirs(1));
+    assertNoEditFiles(allDirs);
+
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+            NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+            FsPermission.createImmutable((short) 0755), true);
+
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // After manually triggering edit tailing, the StandbyNameNode should
+    // have applied the in-progress log
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+            "/test", true));
+
+    // Restarting the standby should not finalize any edits files
+    // in the shared directory when it starts up!
+    cluster.restartNameNode(1);
+
+    assertEditFiles(cluster.getNameDirs(0),
+            NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    // Because in-progress tailing is enabled, this should not be null
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+            "/test", true));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+            FsPermission.createImmutable((short) 0755), true);
+
+    // If we restart NN0, it'll come back as standby, and we can
+    // transition NN1 to active and make sure it reads edits correctly.
+    cluster.restartNameNode(0);
+    cluster.transitionToActive(1);
+
+    // NN1 should have both the edits that came before its restart,
+    // and the edits that came after its restart.
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+            "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+            "/test2", true));
+  }
+
+  @Test
+  public void testHalfStartInProgressTail() throws Exception {
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+            NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+            FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the in-progress edit
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+
+    // Create a new edit and finalize it by rolling the edit log
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+            FsPermission.createImmutable((short) 0755), true);
+    nn0.getRpcServer().rollEditLog();
+
+    // The standby shouldn't have applied the edit since doTailEdits
+    // has not been called again
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+
+    // Create a new in-progress edit and let the standby tail it
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+            FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail both the finalized edit and the new
+    // in-progress segment
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  @Test
+  public void testInitStartInProgressTail() throws Exception {
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+            NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+            FsPermission.createImmutable((short) 0755), true);
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+            FsPermission.createImmutable((short) 0755), true);
+    nn0.getRpcServer().rollEditLog();
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+            FsPermission.createImmutable((short) 0755), true);
+
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail both the finalized edits and the new
+    // in-progress segment
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  @Test
+  public void testNewStartInProgressTail() throws Exception {
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+            NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+            FsPermission.createImmutable((short) 0755), true);
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+            FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+    nn0.getRpcServer().rollEditLog();
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+            FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail both the finalized edits and the new
+    // in-progress segment
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  /**
+   * Check that no edits files are present in the given storage dirs.
+   */
+  private static void assertNoEditFiles(Iterable<URI> dirs) throws IOException {
+    assertEditFiles(dirs);
+  }
+
+  /**
+   * Check that the given list of edits files are present in the given storage
+   * dirs.
+   */
+  private static void assertEditFiles(Iterable<URI> dirs, String... files)
+          throws IOException {
+    for (URI u : dirs) {
+      File editDirRoot = new File(u.getPath());
+      File editDir = new File(editDirRoot, "current");
+      GenericTestUtils.assertExists(editDir);
+      if (files.length == 0) {
+        LOG.info("Checking no edit files exist in " + editDir);
+      } else {
+        LOG.info("Checking for following edit files in " + editDir
+                + ": " + Joiner.on(",").join(files));
+      }
+
+      GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
+    }
+  }
+}
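For context on the setup used throughout these tests, here is a minimal sketch of enabling in-progress edit tailing programmatically, using the same DFSConfigKeys constants as setUp() above. The assumption that the flag is off by default is inferred from testDefault; the class name is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class InProgressTailingConfSketch {
  static Configuration enableInProgressTailing() {
    Configuration conf = new Configuration();
    // Allow the standby to tail the segment the active is still writing.
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
    // Tail frequently (value is in seconds) so the standby stays close
    // to the active.
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    return conf;
  }
}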